Skip to main content

ant_core/data/client/
file.rs

1//! File operations using streaming self-encryption.
2//!
3//! Upload files directly from disk without loading them entirely into memory.
4//! Uses `stream_encrypt` to process files in 8KB chunks, encrypting and
5//! uploading each piece as it's produced.
6//!
7//! Encrypted chunks are spilled to a temporary directory during encryption
8//! so that peak memory usage is bounded to one wave (~256 MB for 64 × 4 MB
9//! chunks) regardless of file size.
10//!
11//! For in-memory data uploads, see the `data` module.
12
13use crate::data::client::adaptive::{observe_op, rebucketed_unordered};
14use crate::data::client::batch::{
15    finalize_batch_payment, PaymentIntent, PreparedChunk, WaveAggregateStats,
16};
17use crate::data::client::chunk::ChunkPeerGetResult;
18use crate::data::client::classify_error;
19use crate::data::client::merkle::{
20    chunk_contents_for_upload_addresses, finalize_merkle_batch, merkle_deferred_retry,
21    merkle_store_with_retry, should_use_merkle, MerkleBatchPaymentResult, PaymentMode,
22    PreparedMerkleBatch, DEFERRED_ROUND_DELAYS_SECS,
23};
24use crate::data::client::Client;
25use crate::data::error::{Error, PartialUploadSpend, Result};
26use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
27use ant_protocol::transport::{MultiAddr, PeerId};
28use ant_protocol::{compute_address, XorName as ChunkAddress, DATA_TYPE_CHUNK};
29use bytes::Bytes;
30use fs2::FileExt;
31use futures::stream::{self, StreamExt};
32use self_encryption::{
33    get_root_data_map_parallel, stream_decrypt_batch_size, stream_encrypt,
34    streaming_decrypt_with_batch_size, DataMap,
35};
36use std::collections::{HashMap, HashSet};
37use std::io::Write;
38use std::num::NonZeroUsize;
39use std::path::{Path, PathBuf};
40use std::sync::{Arc, Mutex};
41use tokio::runtime::Handle;
42use tokio::sync::mpsc;
43use tracing::{debug, info, warn};
44use xor_name::XorName;
45
/// Progress events emitted during file upload for UI feedback.
///
/// Every variant carries only `usize` counters, so an event is cheap to
/// clone.
#[derive(Debug, Clone)]
pub enum UploadEvent {
    /// A chunk has been encrypted and spilled to disk.
    Encrypting { chunks_done: usize },
    /// File encryption complete.
    Encrypted { total_chunks: usize },
    /// Starting quote collection for a wave.
    ///
    /// NOTE(review): whether `wave` is 0- or 1-based is not visible from this
    /// declaration — confirm at the emit site.
    QuotingChunks {
        wave: usize,
        total_waves: usize,
        chunks_in_wave: usize,
    },
    /// A chunk has been quoted (peer discovery + price received).
    /// This is the slow phase — each quote involves network round-trips.
    ChunkQuoted { quoted: usize, total: usize },
    /// A chunk has been stored on the network.
    ChunkStored { stored: usize, total: usize },
}
65
/// Progress events emitted during file download for UI feedback.
///
/// The download fetch context in this file holds an optional
/// `mpsc::Sender<DownloadEvent>`, so events travel over a tokio mpsc channel
/// when one is supplied.
#[derive(Debug, Clone)]
pub enum DownloadEvent {
    /// Resolving hierarchical DataMap to discover real chunk count.
    ResolvingDataMap { total_map_chunks: usize },
    /// A DataMap chunk has been fetched during resolution.
    MapChunkFetched { fetched: usize },
    /// DataMap resolved — total data chunk count now known.
    DataMapResolved { total_chunks: usize },
    /// Data chunks are being fetched from the network.
    ChunksFetched { fetched: usize, total: usize },
}
78
/// File download result when peer-health diagnostics are enabled.
#[derive(Debug, Clone)]
pub struct FileDownloadWithPeerReport {
    /// Number of plaintext bytes written to the destination.
    pub bytes_written: u64,
    /// Per-file-chunk closest-peer GET results collected during the actual download.
    ///
    /// NOTE(review): presumably built by
    /// `file_chunk_reports_from_recorded_sweeps` (one entry per chunk index,
    /// ascending) — confirm at the construction site.
    pub chunk_reports: Vec<FileChunkPeerReport>,
}
87
/// Closest-peer GET results for one file chunk.
#[derive(Debug, Clone)]
pub struct FileChunkPeerReport {
    /// 1-based chunk index in the resolved file DataMap.
    pub index: usize,
    /// Chunk address.
    pub address: ChunkAddress,
    /// All diagnostic GET sweeps attempted for this chunk.
    ///
    /// When built by `file_chunk_reports_from_recorded_sweeps`, sweeps are
    /// ordered by ascending attempt number.
    pub sweeps: Vec<FileChunkPeerSweepReport>,
}
98
/// One all-peer diagnostic GET sweep for a file chunk.
///
/// The two constructors in this file produce mutually exclusive shapes:
/// `file_chunk_sweep_report_from_peer_results` sets `error` to `None` and
/// fills `peers`, while `file_chunk_sweep_report_from_error` sets `error` and
/// leaves `peers` empty.
#[derive(Debug, Clone)]
pub struct FileChunkPeerSweepReport {
    /// 1-based attempt number for this chunk.
    pub attempt: usize,
    /// Whether this sweep happened during a deferred retry round.
    pub deferred_retry: bool,
    /// DHT lookup / sweep-level error, if the closest-peer group could not be queried.
    pub error: Option<String>,
    /// Per-peer results, sorted closest first.
    ///
    /// NOTE(review): the constructor keeps the order of the
    /// `ChunkPeerGetResult` slice it receives; the closest-first ordering is
    /// assumed to come from the caller — confirm.
    pub peers: Vec<FileChunkPeerReportPeer>,
}
111
/// One peer result in a [`FileChunkPeerSweepReport`] (its `peers` list).
#[derive(Debug, Clone)]
pub struct FileChunkPeerReportPeer {
    /// Peer queried for the chunk.
    pub peer_id: PeerId,
    /// Known network addresses used for the peer.
    pub peer_addrs: Vec<MultiAddr>,
    /// XOR distance from `peer_id` to the chunk address.
    pub xor_distance: ChunkAddress,
    /// Whether this peer returned the chunk or why it did not.
    pub status: FileChunkPeerStatus,
}
124
/// Peer-level file chunk GET diagnostic status.
///
/// Produced by `file_chunk_peer_status` from a per-peer
/// `Result<Option<DataChunk>, Error>`.
#[derive(Debug, Clone)]
pub enum FileChunkPeerStatus {
    /// The peer returned the chunk. `bytes` is the length of the returned
    /// chunk content.
    Found { bytes: usize },
    /// The peer responded authoritatively that it does not store the chunk.
    NotFound,
    /// The peer did not respond before the timeout (mapped from
    /// `Error::Timeout`).
    Timeout { message: String },
    /// The transport/network path to the peer failed (mapped from
    /// `Error::Network`).
    NetworkError { message: String },
    /// Any other per-peer error; `message` is the error's `to_string()`.
    Error { message: String },
}
139
/// One entry in the per-chunk quote list returned by
/// [`Client::get_store_quotes`]: the responding peer, its addresses, the
/// signed quote it returned, and the payment amount it is demanding.
type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);

/// One slot of a download batch: a `usize` paired with either the fetched
/// bytes or an `XorName`.
///
/// NOTE(review): presumably the `usize` is the chunk's position in the batch
/// and the `Err` side is the address of a chunk that could not be fetched —
/// confirm against the code that builds these entries.
type DownloadBatchEntry = (usize, std::result::Result<Bytes, XorName>);
146
/// One diagnostic sweep tagged with the chunk it belongs to.
///
/// Flat records of this type are grouped per chunk index by
/// `file_chunk_reports_from_recorded_sweeps` to form the public
/// [`FileChunkPeerReport`] list.
#[derive(Debug, Clone)]
struct RecordedFileChunkPeerSweep {
    /// Chunk index the sweep was recorded for (becomes `FileChunkPeerReport::index`).
    index: usize,
    /// Address of that chunk (becomes `FileChunkPeerReport::address`).
    address: ChunkAddress,
    /// The sweep itself.
    sweep: FileChunkPeerSweepReport,
}
153
/// State bundle for file-download chunk fetches.
///
/// Derives `Clone`; the counter and the peer-report buffer sit behind `Arc`,
/// so clones share them rather than copying them.
#[derive(Clone)]
struct FileDownloadFetchContext {
    /// NOTE(review): presumably the number of data chunks in the file — confirm.
    total_chunks: usize,
    /// NOTE(review): presumably how many peers a fetch may query — confirm.
    peer_count: usize,
    /// Shared atomic counter. NOTE(review): presumably the number of chunks
    /// fetched so far — confirm.
    fetched_ref: Arc<std::sync::atomic::AtomicUsize>,
    /// Optional channel for [`DownloadEvent`] progress updates.
    progress_ref: Option<mpsc::Sender<DownloadEvent>>,
    /// Optional shared buffer of recorded diagnostic sweeps.
    /// NOTE(review): presumably `Some` only when peer-health diagnostics are
    /// enabled — confirm.
    peer_reports: Option<Arc<Mutex<Vec<RecordedFileChunkPeerSweep>>>>,
}
162
/// Number of chunks per upload wave (matches batch.rs PAYMENT_WAVE_SIZE).
const UPLOAD_WAVE_SIZE: usize = 64;

/// Hard ceiling on chunk bodies held in memory at once by the merkle whole-file
/// store fan-out (`upload_merkle_from_spill`). Each in-flight store holds one
/// spilled body (≤ `MAX_CHUNK_SIZE` = 4 MiB), so this bounds peak resident store
/// memory at ~256 MiB — the same bound the old fixed 64-chunk waves gave. The
/// adaptive store cap can legitimately exceed this (`AdaptiveConfig::sanitize`
/// permits `adaptive.max.store` above 64), so the fan-out clamps its cap here to
/// keep a high configured max from pinning gigabytes of chunk bodies (PR #137
/// review). Throughput is unaffected at the default cap, which is already 64.
const MERKLE_STORE_MAX_IN_FLIGHT: usize = 64;

/// Concurrency for the merkle whole-file store fan-out.
///
/// Takes the adaptive limiter's current store cap, raises it to at least 1,
/// then caps it at [`MERKLE_STORE_MAX_IN_FLIGHT`] so resident chunk bodies stay
/// bounded.
fn merkle_store_cap(limiter_current: usize) -> usize {
    let at_least_one = limiter_current.max(1);
    at_least_one.min(MERKLE_STORE_MAX_IN_FLIGHT)
}
181
/// Stream decrypt batches should be larger than fetch fan-out so
/// the rolling fetch scheduler can keep launching new chunk GETs as earlier
/// ones complete, instead of stopping at each self-encryption batch boundary.
///
/// `adaptive_stream_decrypt_batch_size` multiplies the fetch cap (floored at
/// 1) by this factor to get its fetch-driven batch target.
const DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER: usize = 4;

/// Divisor applied to currently usable RAM to get the byte budget of one
/// decrypt batch: with a value of 4, a batch may use at most 1/4 of usable RAM.
const DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR: u64 = 4;

/// A decrypt batch briefly holds encrypted chunk bytes, decrypted chunk bytes,
/// and Vec/Bytes overhead. Use a conservative multiplier rather than assuming
/// payload bytes alone.
///
/// `stream_decrypt_batch_memory_cap` budgets `MAX_CHUNK_SIZE` times this value
/// per chunk.
const DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER: u64 = 3;

/// Maximum number of distinct chunk addresses to sample when probing for a
/// representative quote in [`Client::estimate_upload_cost`].
///
/// Bounded small so we never spend more than a couple of round-trips on the
/// `AlreadyStored` retry path, which only matters when many leading chunks
/// of a file already live on the network.
const ESTIMATE_SAMPLE_CAP: usize = 5;

/// First diagnostic all-peer fetch attempt for a file chunk.
const FIRST_DIAGNOSTIC_FETCH_ATTEMPT: usize = 1;

/// Deferred retry attempt number for retry round 0.
///
/// NOTE(review): presumably retry round `r` is reported as attempt
/// `r + DEFERRED_RETRY_ATTEMPT_OFFSET`, following the first attempt (1) —
/// confirm at the call site.
const DEFERRED_RETRY_ATTEMPT_OFFSET: usize = 2;
208
/// Choose chunk indices spread evenly over `[0, total)`, anchored on the first
/// and last chunk.
///
/// A probe that only looks at the *leading* chunks is biased: files that share
/// a prefix with an earlier upload (compressed archives, similar headers)
/// report those chunks as `AlreadyStored` even when the tail is new. Spreading
/// the sample lets a single new chunk anywhere in the file surface a real
/// price.
///
/// Results by input:
/// - `total == 0` → empty vector.
/// - `min(total, cap) <= 1` → `[0]`.
/// - `total <= cap` → every index, so [`Client::estimate_upload_cost`] can
///   still detect the "whole file sampled" case.
/// - otherwise → `cap` indices, strictly increasing, from `0` to `total - 1`.
fn distributed_sample_indices(total: usize, cap: usize) -> Vec<usize> {
    let picks = total.min(cap);
    match (total, picks) {
        (0, _) => Vec::new(),
        (_, 0 | 1) => vec![0],
        _ => {
            let last_index = total - 1;
            let gaps = picks - 1;
            let mut chosen: Vec<usize> = Vec::with_capacity(picks);
            for slot in 0..picks {
                let index = slot * last_index / gaps;
                // Defensive: skip a repeat of the previous index (the sequence
                // is already strictly increasing when `picks >= 2`).
                if chosen.last() != Some(&index) {
                    chosen.push(index);
                }
            }
            chosen
        }
    }
}
235
236fn file_chunk_sweep_report_from_peer_results(
237    attempt: usize,
238    deferred_retry: bool,
239    results: &[ChunkPeerGetResult],
240) -> (Option<Bytes>, FileChunkPeerSweepReport) {
241    let mut content = None;
242    let peers = results
243        .iter()
244        .map(|result| {
245            if content.is_none() {
246                if let Ok(Some(chunk)) = &result.chunk_result {
247                    content = Some(chunk.content.clone());
248                }
249            }
250
251            FileChunkPeerReportPeer {
252                peer_id: result.peer_id,
253                peer_addrs: result.peer_addrs.clone(),
254                xor_distance: result.xor_distance,
255                status: file_chunk_peer_status(&result.chunk_result),
256            }
257        })
258        .collect();
259
260    (
261        content,
262        FileChunkPeerSweepReport {
263            attempt,
264            deferred_retry,
265            error: None,
266            peers,
267        },
268    )
269}
270
271fn file_chunk_sweep_report_from_error(
272    attempt: usize,
273    deferred_retry: bool,
274    error: &Error,
275) -> FileChunkPeerSweepReport {
276    FileChunkPeerSweepReport {
277        attempt,
278        deferred_retry,
279        error: Some(error.to_string()),
280        peers: Vec::new(),
281    }
282}
283
284fn file_chunk_reports_from_recorded_sweeps(
285    mut sweeps: Vec<RecordedFileChunkPeerSweep>,
286) -> Vec<FileChunkPeerReport> {
287    sweeps.sort_by_key(|record| (record.index, record.sweep.attempt));
288
289    let mut reports: Vec<FileChunkPeerReport> = Vec::new();
290    for record in sweeps {
291        if let Some(report) = reports
292            .last_mut()
293            .filter(|report| report.index == record.index)
294        {
295            report.sweeps.push(record.sweep);
296            continue;
297        }
298
299        reports.push(FileChunkPeerReport {
300            index: record.index,
301            address: record.address,
302            sweeps: vec![record.sweep],
303        });
304    }
305
306    reports
307}
308
309fn file_chunk_peer_status(
310    chunk_result: &std::result::Result<Option<ant_protocol::DataChunk>, Error>,
311) -> FileChunkPeerStatus {
312    match chunk_result {
313        Ok(Some(chunk)) => FileChunkPeerStatus::Found {
314            bytes: chunk.content.len(),
315        },
316        Ok(None) => FileChunkPeerStatus::NotFound,
317        Err(Error::Timeout(e)) => FileChunkPeerStatus::Timeout { message: e.clone() },
318        Err(Error::Network(e)) => FileChunkPeerStatus::NetworkError { message: e.clone() },
319        Err(e) => FileChunkPeerStatus::Error {
320            message: e.to_string(),
321        },
322    }
323}
324
/// Gas used by one `pay_for_quotes` transaction that packs up to
/// `UPLOAD_WAVE_SIZE` (quote_hash, rewards_address, amount) entries.
///
/// `batch_pay` in `batch.rs` flattens every chunk's close-group quotes into a
/// single EVM call, so the dominant cost is the SSTOREs for each entry plus
/// the base tx overhead. On Arbitrum that is roughly
/// `21_000 + 64 × (20_000 + small)` ≈ 1.3M; we round up to 1.5M as a
/// conservative per-wave upper bound.
///
/// Unit: gas (not wei).
const GAS_PER_WAVE_TX: u128 = 1_500_000;

/// Gas used by one merkle batch payment transaction.
///
/// One on-chain tx per merkle sub-batch, but each tx verifies a merkle tree
/// and posts a pool commitment, so budget higher than a plain transfer.
///
/// Unit: gas (not wei).
const GAS_PER_MERKLE_TX: u128 = 500_000;

/// Advisory gas price (wei/gas) used to turn the gas estimate into an ETH
/// figure when no live gas oracle is consulted.
///
/// Arbitrum One typically settles around 0.1 gwei on quiet blocks; we use
/// that as the default so the CLI prints a sensible order-of-magnitude
/// number. Users should treat the reported gas cost as an estimate, not a
/// commitment — real gas is bid at submission time.
///
/// `100_000_000` wei is exactly 0.1 gwei (1 gwei = 10^9 wei).
const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;

/// Extra headroom percentage for disk space check.
///
/// Encrypted chunks are slightly larger than the source data due to padding
/// and self-encryption overhead. We require file_size + 10% free space in
/// the temp directory to account for this.
const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;
356
/// Grace period (in seconds) before a spill dir is eligible for stale cleanup.
///
/// This is a small TOCTOU guard covering the sub-millisecond window inside
/// [`ChunkSpill::new`] between `create_dir` and `try_lock_exclusive`. Once a
/// dir is older than this and its lockfile is releasable, the owning process
/// is gone and the dir is safe to reap — regardless of how old it is.
///
/// The previous policy waited 24 h before reaping any orphan, which meant
/// that any non-graceful exit (SIGKILL, kernel OOM, panic abort) leaked its
/// spill dir until the next day's upload — and on a host being restart-looped
/// by systemd, orphans could fill the disk well within that window.
const SPILL_STALE_GRACE_SECS: u64 = 30;

/// Prefix for spill directory names to distinguish from user files.
const SPILL_DIR_PREFIX: &str = "spill_";

/// Lockfile name inside each spill dir to signal active use.
const SPILL_LOCK_NAME: &str = ".lock";

/// Temporary on-disk buffer for encrypted chunks.
///
/// During file encryption, chunks are written to a temp directory so that
/// only their 32-byte addresses stay in memory. At upload time chunks are
/// read back one wave at a time, keeping peak RAM at ~`UPLOAD_WAVE_SIZE × 4 MB`.
///
/// The directory is removed again when the value is dropped (see the `Drop`
/// impl, which calls `cleanup`).
struct ChunkSpill {
    /// Directory holding spilled chunk files (named by hex address).
    dir: PathBuf,
    /// Lockfile held for the lifetime of this spill (prevents stale cleanup).
    _lock: std::fs::File,
    /// Deduplicated list of chunk addresses.
    addresses: Vec<[u8; 32]>,
    /// Tracks seen addresses for deduplication.
    seen: HashSet<[u8; 32]>,
    /// Byte size per spilled chunk address.
    sizes: HashMap<[u8; 32], u64>,
    /// Running total of unique chunk byte sizes (for average-size calculation).
    total_bytes: u64,
}
395
396impl ChunkSpill {
397    /// Return the parent directory for all spill dirs: `<data_dir>/spill/`.
398    fn spill_root() -> Result<PathBuf> {
399        use crate::config;
400        let root = config::data_dir()
401            .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
402            .join("spill");
403        Ok(root)
404    }
405
406    /// Create a new spill directory under `<data_dir>/spill/`.
407    ///
408    /// Directory name is `spill_<timestamp>_<random>` so orphans can be
409    /// identified by prefix and cleaned up by age. A lockfile inside the
410    /// dir prevents concurrent cleanup from deleting an active spill.
411    fn new() -> Result<Self> {
412        let root = Self::spill_root()?;
413        std::fs::create_dir_all(&root)?;
414
415        // Clean up stale spill dirs from previous crashed runs.
416        Self::cleanup_stale(&root);
417
418        let now = std::time::SystemTime::now()
419            .duration_since(std::time::UNIX_EPOCH)
420            .unwrap_or_default()
421            .as_secs();
422        let unique: u64 = rand::random();
423        let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
424        std::fs::create_dir(&dir)?;
425
426        // Create and hold a lockfile for the lifetime of this spill.
427        // cleanup_stale() will skip dirs with locked files.
428        let lock_path = dir.join(SPILL_LOCK_NAME);
429        let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
430            Error::Io(std::io::Error::new(
431                e.kind(),
432                format!("failed to create spill lockfile: {e}"),
433            ))
434        })?;
435        lock_file.try_lock_exclusive().map_err(|e| {
436            Error::Io(std::io::Error::new(
437                e.kind(),
438                format!("failed to lock spill lockfile: {e}"),
439            ))
440        })?;
441
442        Ok(Self {
443            dir,
444            _lock: lock_file,
445            addresses: Vec::new(),
446            seen: HashSet::new(),
447            sizes: HashMap::new(),
448            total_bytes: 0,
449        })
450    }
451
452    /// Clean up stale spill directories. Best-effort, errors are logged.
453    ///
454    /// A spill dir is reaped when:
455    /// 1. Its name starts with `SPILL_DIR_PREFIX` (ignores unrelated files)
456    /// 2. It is an actual directory, not a symlink (prevents symlink attacks)
457    /// 3. Its timestamp is older than `SPILL_STALE_GRACE_SECS` (TOCTOU guard)
458    /// 4. Its lockfile is releasable — i.e. no live process holds it
459    ///
460    /// The lockfile is the primary correctness gate: a releasable lock means
461    /// the owning `ChunkSpill` has been dropped or the process is gone, so
462    /// the dir is fair game. The grace period covers only the brief window
463    /// inside [`Self::new`] between `create_dir` and `try_lock_exclusive`.
464    ///
465    /// Safe to call concurrently from multiple processes.
466    fn cleanup_stale(root: &Path) {
467        let now = std::time::SystemTime::now()
468            .duration_since(std::time::UNIX_EPOCH)
469            .unwrap_or_default()
470            .as_secs();
471
472        if now == 0 {
473            // Clock is broken (before Unix epoch). Skip cleanup to avoid
474            // misidentifying dirs as stale.
475            warn!("System clock before Unix epoch, skipping spill cleanup");
476            return;
477        }
478
479        let entries = match std::fs::read_dir(root) {
480            Ok(entries) => entries,
481            Err(_) => return,
482        };
483
484        for entry in entries.flatten() {
485            let name = entry.file_name();
486            let name_str = name.to_string_lossy();
487
488            // Only process dirs with our prefix.
489            let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
490                Some(s) => s,
491                None => continue,
492            };
493
494            // Parse timestamp: "spill_<timestamp>_<random>"
495            let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
496                Some(ts) => ts,
497                None => continue,
498            };
499
500            if now.saturating_sub(timestamp) < SPILL_STALE_GRACE_SECS {
501                continue;
502            }
503
504            // Safety: only delete actual directories, not symlinks.
505            let file_type = match entry.file_type() {
506                Ok(ft) => ft,
507                Err(_) => continue,
508            };
509            if !file_type.is_dir() {
510                continue;
511            }
512
513            let path = entry.path();
514
515            // Check lockfile: if locked, the dir is in active use -- skip it.
516            let lock_path = path.join(SPILL_LOCK_NAME);
517            if let Ok(lock_file) = std::fs::File::open(&lock_path) {
518                use fs2::FileExt;
519                if lock_file.try_lock_exclusive().is_err() {
520                    // Lock held by another process -- dir is active.
521                    debug!("Skipping active spill dir: {}", path.display());
522                    continue;
523                }
524                // We acquired the lock, so no one else holds it.
525                // Drop it before deleting.
526                drop(lock_file);
527            }
528
529            info!("Cleaning up stale spill dir: {}", path.display());
530            if let Err(e) = std::fs::remove_dir_all(&path) {
531                warn!("Failed to clean up stale spill dir {}: {e}", path.display());
532            }
533        }
534    }
535
536    /// Run stale spill cleanup. Call at client startup or periodically.
537    #[allow(dead_code)]
538    pub(crate) fn run_cleanup() {
539        if let Ok(root) = Self::spill_root() {
540            Self::cleanup_stale(&root);
541        }
542    }
543
544    /// Write one encrypted chunk to disk and record its address.
545    ///
546    /// Deduplicates by content address: if the same chunk was already
547    /// spilled, the write and accounting are skipped. This prevents
548    /// double-uploads and inflated quoting metrics.
549    fn push(&mut self, content: &[u8]) -> Result<()> {
550        let address = compute_address(content);
551        if !self.seen.insert(address) {
552            return Ok(());
553        }
554        let path = self.dir.join(hex::encode(address));
555        std::fs::write(&path, content)?;
556        let content_len = content.len() as u64;
557        self.sizes.insert(address, content_len);
558        self.total_bytes += content_len;
559        self.addresses.push(address);
560        Ok(())
561    }
562
    /// Number of chunks stored.
    ///
    /// Counts unique chunks only: `push` skips content whose address was
    /// already spilled.
    fn len(&self) -> usize {
        self.addresses.len()
    }
567
    /// Total bytes of all spilled chunks.
    ///
    /// Each unique chunk is counted once; `push` adds a chunk's length only
    /// the first time its address is seen.
    fn total_bytes(&self) -> u64 {
        self.total_bytes
    }
572
573    /// Address and byte-size pairs for all spilled chunks.
574    fn chunk_entries(&self) -> Result<Vec<([u8; 32], u64)>> {
575        self.addresses
576            .iter()
577            .map(|address| {
578                self.sizes
579                    .get(address)
580                    .copied()
581                    .map(|size| (*address, size))
582                    .ok_or_else(|| {
583                        Error::Storage(format!(
584                            "missing size for spilled chunk {}",
585                            hex::encode(address)
586                        ))
587                    })
588            })
589            .collect()
590    }
591
592    /// Read a single chunk back from disk by address.
593    fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
594        let path = self.dir.join(hex::encode(address));
595        let data = std::fs::read(&path).map_err(|e| {
596            Error::Io(std::io::Error::new(
597                e.kind(),
598                format!("reading spilled chunk {}: {e}", hex::encode(address)),
599            ))
600        })?;
601        Ok(Bytes::from(data))
602    }
603
604    /// Clean up the spill directory.
605    fn cleanup(&self) {
606        if let Err(e) = std::fs::remove_dir_all(&self.dir) {
607            warn!(
608                "Failed to clean up chunk spill dir {}: {e}",
609                self.dir.display()
610            );
611        }
612    }
613}
614
/// Removes the spill directory when a [`ChunkSpill`] goes out of scope.
impl Drop for ChunkSpill {
    /// Delete the spill directory via `cleanup`.
    ///
    /// Best-effort: `cleanup` logs a warning instead of panicking when the
    /// removal fails. Struct fields — including the `_lock` file handle — are
    /// dropped only after this body returns, so the directory is removed
    /// while the lockfile handle is still open.
    fn drop(&mut self) {
        self.cleanup();
    }
}
620
621fn cached_merkle_covers_addresses(
622    cached: &MerkleBatchPaymentResult,
623    addresses: &[[u8; 32]],
624) -> bool {
625    addresses
626        .iter()
627        .all(|addr| cached.proofs.contains_key(addr))
628}
629
/// Sort `addresses` into `(to_store, missing_proof)` according to whether
/// `proofs` contains a merkle proof for each one.
///
/// A partial [`MerkleBatchPaymentResult`] (from a `pay_for_merkle_multi_batch`
/// where a later sub-batch's payment failed) carries proofs only for the
/// already-paid sub-batches, so unpaid chunks reach the upload path with no
/// proof. `upload_merkle_from_spill` reports those as failed via
/// [`Error::PartialUpload`] rather than aborting the whole file.
///
/// Both output vectors keep the relative order of `addresses`; duplicates in
/// the input are kept.
fn partition_addresses_by_proof(
    addresses: &[[u8; 32]],
    proofs: &HashMap<[u8; 32], Vec<u8>>,
) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
    let mut to_store = Vec::new();
    let mut missing_proof = Vec::new();
    for addr in addresses {
        if proofs.contains_key(addr) {
            to_store.push(*addr);
        } else {
            missing_proof.push(*addr);
        }
    }
    (to_store, missing_proof)
}
648
649/// Build a `PartialUpload` after a fatal merkle store error, with accurate
650/// counts.
651///
652/// A fatal abort can leave chunks in three states: confirmed stored (in
653/// `stored_addresses`), known-failed (in `known_failed` — missing proofs, the
654/// quorum shortfalls and the fatal chunk seen so far), and "in flight when the
655/// abort hit" (neither). Rather than trust the helpers to enumerate the last
656/// group, this derives the failed set authoritatively as *every* `addresses`
657/// entry not in `stored_addresses`, preferring a known per-chunk message and
658/// falling back to the fatal `reason`. That guarantees
659/// `stored_count + failed_count` accounts for the whole file — fixing the
660/// under-reporting where a fatal wave could surface `failed_count = 0` and omit
661/// same-pass successes.
662fn partial_upload_after_fatal(
663    addresses: &[[u8; 32]],
664    stored_addresses: Vec<[u8; 32]>,
665    stored_count: usize,
666    total_chunks: usize,
667    known_failed: Vec<([u8; 32], String)>,
668    spend: PartialUploadSpend,
669    reason: String,
670) -> Error {
671    let stored_set: HashSet<[u8; 32]> = stored_addresses.iter().copied().collect();
672    let mut failed_map: HashMap<[u8; 32], String> = HashMap::new();
673    for (addr, msg) in known_failed {
674        if !stored_set.contains(&addr) {
675            failed_map.entry(addr).or_insert(msg);
676        }
677    }
678    for addr in addresses {
679        if !stored_set.contains(addr) {
680            failed_map.entry(*addr).or_insert_with(|| reason.clone());
681        }
682    }
683    let failed: Vec<([u8; 32], String)> = failed_map.into_iter().collect();
684    let failed_count = failed.len();
685    Error::PartialUpload {
686        stored: stored_addresses,
687        stored_count,
688        failed,
689        failed_count,
690        total_chunks,
691        spend: Box::new(spend),
692        reason,
693    }
694}
695
/// One wave's contribution to a single-node upload, distilled from its
/// `batch_upload_chunks_with_events` result.
///
/// Built by `fold_single_wave` from either a successful wave or a
/// recoverable `Error::PartialUpload`.
#[derive(Debug)]
struct SingleWaveOutcome {
    /// Addresses confirmed stored in this wave.
    stored: Vec<[u8; 32]>,
    /// Chunks that failed after retries in this wave. Always empty for a wave
    /// that returned `Ok`.
    failed: Vec<([u8; 32], String)>,
    /// Storage cost paid on-chain for this wave, in atto-tokens. Parsed from
    /// the wave's cost string; `fold_single_wave` falls back to zero when that
    /// string does not parse.
    storage_atto: Amount,
    /// Gas paid on-chain for this wave, in wei.
    gas_wei: u128,
    /// Per-wave store/retry statistics. Empty for a quorum-short wave, whose
    /// `PartialUpload` carries no stats.
    stats: WaveAggregateStats,
}
712
713/// Fold one wave's batch-upload result for the single-node path.
714///
715/// A `PartialUpload` (chunks short of quorum after retries) is **recoverable**:
716/// its stored/failed chunks and on-chain spend are returned so the caller
717/// records them and continues to the next wave, making the file make maximum
718/// progress exactly like `upload_merkle_from_spill`. Every other error is **fatal**
719/// (wallet/payment-infrastructure failures, missing proofs, spill reads) and is
720/// returned via `Err` to abort the file. Because `UPLOAD_WAVE_SIZE ==
721/// PAYMENT_WAVE_SIZE`, each batch call is exactly one payment wave, so folding a
722/// `PartialUpload` leaves nothing un-attempted within the wave.
723fn fold_single_wave(
724    result: Result<(Vec<[u8; 32]>, String, u128, WaveAggregateStats)>,
725) -> Result<SingleWaveOutcome> {
726    match result {
727        Ok((stored, storage, gas, stats)) => Ok(SingleWaveOutcome {
728            stored,
729            failed: Vec::new(),
730            storage_atto: storage.parse().unwrap_or(Amount::ZERO),
731            gas_wei: gas,
732            stats,
733        }),
734        Err(Error::PartialUpload {
735            stored,
736            failed,
737            spend,
738            ..
739        }) => Ok(SingleWaveOutcome {
740            stored,
741            failed,
742            storage_atto: spend.storage_cost_atto.parse().unwrap_or(Amount::ZERO),
743            gas_wei: spend.gas_cost_wei,
744            stats: WaveAggregateStats::default(),
745        }),
746        Err(e) => Err(e),
747    }
748}
749
750/// Check that the spill directory has enough free space for the spilled chunks.
751///
752/// `file_size` is the source file's byte count. We require
753/// `file_size + 10%` free space to account for self-encryption overhead.
754fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
755    let spill_root = ChunkSpill::spill_root()?;
756
757    // Ensure the root exists so fs2 can query it.
758    std::fs::create_dir_all(&spill_root)?;
759
760    let available = fs2::available_space(&spill_root).map_err(|e| {
761        Error::Io(std::io::Error::new(
762            e.kind(),
763            format!(
764                "failed to query disk space on {}: {e}",
765                spill_root.display()
766            ),
767        ))
768    })?;
769
770    // Use integer arithmetic to avoid f64 precision loss on large file sizes.
771    let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
772    let required = file_size.saturating_add(headroom);
773
774    if available < required {
775        let avail_mb = available / (1024 * 1024);
776        let req_mb = required / (1024 * 1024);
777        return Err(Error::InsufficientDiskSpace(format!(
778            "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
779            spill_root.display()
780        )));
781    }
782
783    debug!(
784        "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
785        spill_root.display()
786    );
787    Ok(())
788}
789
790fn usable_memory_bytes() -> Option<u64> {
791    let mut system = sysinfo::System::new();
792    system.refresh_memory();
793
794    let available_memory = system.available_memory();
795    let free_memory = system.free_memory();
796    let used_memory = system.used_memory();
797    let total_memory = system.total_memory();
798    let unused_memory = total_memory.saturating_sub(used_memory);
799
800    let mut usable = [available_memory, free_memory, unused_memory]
801        .into_iter()
802        .filter(|bytes| *bytes > 0)
803        .max();
804
805    let cgroup_free_memory = system
806        .cgroup_limits()
807        .filter(|limits| limits.total_memory > 0)
808        .map(|limits| limits.free_memory);
809    if let Some(cgroup_free_memory) = cgroup_free_memory {
810        usable = Some(usable.unwrap_or(u64::MAX).min(cgroup_free_memory));
811    }
812
813    debug!(
814        available_memory,
815        free_memory,
816        used_memory,
817        total_memory,
818        cgroup_free_memory,
819        usable_memory = ?usable,
820        "Detected usable memory for stream decrypt batch sizing"
821    );
822
823    usable
824}
825
826fn stream_decrypt_batch_memory_cap(usable_memory_bytes: u64) -> usize {
827    let budget = usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR;
828    let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
829        .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
830        .max(1);
831    let cap = (budget / estimated_bytes_per_chunk).max(1);
832
833    usize::try_from(cap).unwrap_or(usize::MAX)
834}
835
836fn adaptive_stream_decrypt_batch_size(
837    total_chunks: usize,
838    fetch_cap: usize,
839    configured_batch_floor: usize,
840    usable_memory_bytes: Option<u64>,
841) -> usize {
842    let fetch_target = fetch_cap
843        .max(1)
844        .saturating_mul(DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
845    let requested = match usable_memory_bytes {
846        Some(bytes) => {
847            let memory_cap = stream_decrypt_batch_memory_cap(bytes);
848            configured_batch_floor
849                .max(fetch_target)
850                .max(1)
851                .min(memory_cap)
852        }
853        None => configured_batch_floor.max(1),
854    };
855
856    requested.min(total_chunks.max(1)).max(1)
857}
858
/// Whether the data map is published to the network for address-based retrieval.
///
/// A private upload stores only the data chunks and returns the `DataMap` to
/// the caller — only someone holding that `DataMap` can reconstruct the file.
/// A public upload additionally stores the serialized `DataMap` as a chunk on
/// the network, yielding a single chunk address that anyone can use to
/// retrieve the `DataMap` (via [`Client::data_map_fetch`]) and then the file.
///
/// [`Visibility::Private`] is the `Default`, so `Visibility::default()` never
/// publishes a data map.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Visibility {
    /// Keep the data map local; only the holder can retrieve the file.
    /// This is the default variant.
    #[default]
    Private,
    /// Publish the data map as a network chunk so anyone with the returned
    /// address can retrieve and decrypt the file.
    Public,
}
875
/// Confidence attached to an [`UploadCostEstimate`]'s `storage_cost_atto`.
///
/// `estimate_upload_cost` prices a file by sampling a few of its chunk
/// addresses and extrapolating. When every sampled chunk is already stored
/// there is no live price to extrapolate from, so a `"0"` cost can mean either
/// "provably free" (the whole file was sampled) or only "probably free" (the
/// tail was unsampled). This lets callers tell those apart instead of treating
/// every `"0"` as unconditionally free.
///
/// Serialized with serde's `snake_case` renaming, i.e. as `priced_sample`,
/// `verified_all_already_stored` and `all_samples_already_stored_incomplete`.
/// The `Default` is [`CostEstimateConfidence::PricedSample`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CostEstimateConfidence {
    /// At least one sampled chunk returned a live quote; `storage_cost_atto`
    /// is extrapolated from a real per-chunk price. The normal case.
    #[default]
    PricedSample,
    /// Every chunk in the file was sampled and every one was already stored.
    /// `storage_cost_atto` is exactly `"0"` — the upload is genuinely free.
    VerifiedAllAlreadyStored,
    /// Every *sampled* chunk was already stored, but not all chunks were
    /// sampled. `storage_cost_atto` is `"0"` as a best-effort guess; the real
    /// upload reconciles the true cost at payment time. Render this as "likely
    /// already stored", not a guaranteed-free price.
    AllSamplesAlreadyStoredIncomplete,
}
900
/// Estimated cost of uploading a file, returned by
/// [`Client::estimate_upload_cost`].
///
/// Both cost fields are carried as strings rather than numeric types; when
/// every sampled chunk was already stored they are both `"0"`.
///
/// Marked `#[non_exhaustive]` so adding a field later is not a breaking change
/// for downstream consumers that construct or pattern-match on this struct.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct UploadCostEstimate {
    /// Original file size in bytes.
    pub file_size: u64,
    /// Number of chunks the file would be split into (data chunks only,
    /// does not include the DataMap chunk added during public uploads).
    pub chunk_count: usize,
    /// Estimated total storage cost in atto (token smallest unit).
    pub storage_cost_atto: String,
    /// Estimated gas cost in wei as a string. This is a rough heuristic
    /// based on chunk count and payment mode, NOT a live gas price query.
    pub estimated_gas_cost_wei: String,
    /// Payment mode that would be used.
    pub payment_mode: PaymentMode,
    /// How much to trust `storage_cost_atto`. See [`CostEstimateConfidence`].
    ///
    /// `#[serde(default)]`: input that lacks this field deserializes to
    /// [`CostEstimateConfidence::PricedSample`] instead of failing.
    #[serde(default)]
    pub confidence: CostEstimateConfidence,
}
925
/// Result of a file upload: the `DataMap` needed to retrieve the file,
/// together with payment totals and per-chunk store statistics.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers that construct or pattern-match
/// on this struct.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FileUploadResult {
    /// The data map containing chunk metadata for reconstruction.
    pub data_map: DataMap,
    /// Number of chunks stored on the network.
    pub chunks_stored: usize,
    /// Number of chunks that failed to store. Always 0 for a successful
    /// upload — partial-failure information is conveyed via
    /// [`crate::data::Error::PartialUpload`] instead.
    pub chunks_failed: usize,
    /// Total number of chunks in the upload, including chunks that were
    /// already stored and skipped. On full success this equals `chunks_stored`.
    pub total_chunks: usize,
    /// Which payment mode was actually used (not just requested).
    pub payment_mode_used: PaymentMode,
    /// Total storage cost paid in token units (atto). "0" if all chunks already existed.
    pub storage_cost_atto: String,
    /// Total gas cost in wei. 0 if no on-chain transactions were made.
    pub gas_cost_wei: u128,
    /// Chunk address of the serialized `DataMap`, set only for
    /// [`Visibility::Public`] uploads. **`Some` means this address is
    /// retrievable from the network (via [`Client::data_map_fetch`])**, not
    /// necessarily that *this* upload paid to store it — if the serialized
    /// `DataMap` hashed to a chunk that was already on the network (same
    /// file uploaded before; deterministic via self-encryption), the address
    /// is still returned but no storage payment was made for it.
    pub data_map_address: Option<[u8; 32]>,
    /// Sum of chunk-store RPC attempts across the upload
    /// (`>= chunks_stored` on full success; more if any chunk retried).
    /// `0` for paths that don't run the wave store loop.
    pub chunk_attempts_total: usize,
    /// Per-chunk store wall-clock in ms (length == `chunks_stored` on full
    /// success, empty for paths that don't run the wave store loop).
    pub store_durations_ms: Vec<u64>,
    /// Count of stored chunks that succeeded on each retry round
    /// (index 0 = first attempt, 1 = first retry, etc.). All zeros for
    /// paths that don't run the wave store loop. The array has exactly four
    /// slots, so only rounds 0 through 3 are distinguishable.
    pub retries_histogram: [usize; 4],
}
971
/// Payment information for external signing — either wave-batch or merkle.
///
/// Carried inside [`PreparedUpload::payment_info`]. Derives only `Debug`:
/// the enum is neither `Clone` nor serializable as a whole.
#[derive(Debug)]
pub enum ExternalPaymentInfo {
    /// Wave-batch: individual (quote_hash, rewards_address, amount) tuples.
    WaveBatch {
        /// Chunks ready for payment (needed for finalize).
        prepared_chunks: Vec<PreparedChunk>,
        /// Payment intent for external signing.
        payment_intent: PaymentIntent,
    },
    /// Merkle: single on-chain call with depth, pool commitments, timestamp.
    Merkle {
        /// The prepared merkle batch (public fields sent to frontend, private fields stay in Rust).
        prepared_batch: PreparedMerkleBatch,
        /// Raw chunk contents that still need upload after the preflight check.
        chunk_contents: Vec<Bytes>,
        /// Chunk addresses that still need upload after the preflight check.
        // NOTE(review): presumably index-aligned with `chunk_contents` — confirm
        // against the code that builds this variant.
        chunk_addresses: Vec<[u8; 32]>,
    },
}
992
/// Prepared upload ready for external payment.
///
/// Contains everything needed to construct the on-chain payment transaction
/// externally (e.g. via WalletConnect in a desktop app) and then finalize
/// the upload without a Rust-side wallet.
///
/// Note: This struct stays in Rust memory — only the public fields of
/// `payment_info` are sent to the frontend. `PreparedChunk` contains
/// non-serializable network types, so the full struct cannot derive `Serialize`.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers.
#[derive(Debug)]
#[non_exhaustive]
pub struct PreparedUpload {
    /// The data map for later retrieval.
    pub data_map: DataMap,
    /// Payment information for chunks that still need payment after the
    /// already-stored preflight. This may be wave-batch even when the original
    /// chunk count was merkle-eligible if the remaining count is below the
    /// merkle threshold.
    pub payment_info: ExternalPaymentInfo,
    /// Chunk address of the serialized `DataMap` when this upload was
    /// prepared with [`Visibility::Public`]. `Some` means the address is
    /// retrievable on the network after finalization — either because this
    /// upload paid to store the chunk in `payment_info`, or because the
    /// chunk was already on the network (deterministic self-encryption).
    /// Carried through to [`FileUploadResult::data_map_address`].
    /// `None` for [`Visibility::Private`] uploads.
    pub data_map_address: Option<[u8; 32]>,
    /// Chunk addresses already present on the network when this upload was
    /// prepared. These do not require payment or PUT during finalization.
    pub already_stored_addresses: Vec<[u8; 32]>,
    /// Total chunk count for the upload, including already-stored chunks.
    /// For public uploads the bundled `DataMap` chunk is presumably counted
    /// here as well — TODO confirm against the preparation code.
    pub total_chunks: usize,
}
1028
/// Return type for [`spawn_file_encryption`]: chunk receiver, `DataMap` oneshot, join handle.
///
/// In tuple order:
/// 1. receiver yielding the content of each encrypted chunk as it is produced;
/// 2. oneshot that delivers the `DataMap` once every chunk has been sent;
/// 3. handle of the blocking encryption task, resolving to its `Result<()>`
///    (read and encryption failures surface here, not on the channels).
type EncryptionChannels = (
    tokio::sync::mpsc::Receiver<Bytes>,
    tokio::sync::oneshot::Receiver<DataMap>,
    tokio::task::JoinHandle<Result<()>>,
);
1035
1036/// Spawn a blocking task that streams file encryption through a channel.
1037fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
1038    let metadata = std::fs::metadata(&path)?;
1039    let data_size = usize::try_from(metadata.len())
1040        .map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;
1041
1042    let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
1043    let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();
1044
1045    let handle = tokio::task::spawn_blocking(move || {
1046        let file = std::fs::File::open(&path)?;
1047        let mut reader = std::io::BufReader::new(file);
1048
1049        let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
1050        let read_error_clone = Arc::clone(&read_error);
1051
1052        let data_iter = std::iter::from_fn(move || {
1053            let mut buffer = vec![0u8; 8192];
1054            match std::io::Read::read(&mut reader, &mut buffer) {
1055                Ok(0) => None,
1056                Ok(n) => {
1057                    buffer.truncate(n);
1058                    Some(Bytes::from(buffer))
1059                }
1060                Err(e) => {
1061                    let mut guard = read_error_clone
1062                        .lock()
1063                        .unwrap_or_else(|poisoned| poisoned.into_inner());
1064                    *guard = Some(e);
1065                    None
1066                }
1067            }
1068        });
1069
1070        let mut stream = stream_encrypt(data_size, data_iter)
1071            .map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;
1072
1073        for chunk_result in stream.chunks() {
1074            // Check for captured read errors immediately after each chunk.
1075            // stream_encrypt sees None (EOF) when a read fails, so it stops
1076            // producing chunks. We must detect this before sending the
1077            // partial results to avoid uploading a truncated DataMap.
1078            {
1079                let guard = read_error
1080                    .lock()
1081                    .unwrap_or_else(|poisoned| poisoned.into_inner());
1082                if let Some(ref e) = *guard {
1083                    return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
1084                }
1085            }
1086
1087            let (_hash, content) = chunk_result
1088                .map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
1089            if chunk_tx.blocking_send(content).is_err() {
1090                return Err(Error::Encryption("upload receiver dropped".to_string()));
1091            }
1092        }
1093
1094        // Final check: read error after last chunk (stream saw EOF).
1095        {
1096            let guard = read_error
1097                .lock()
1098                .unwrap_or_else(|poisoned| poisoned.into_inner());
1099            if let Some(ref e) = *guard {
1100                return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
1101            }
1102        }
1103
1104        let datamap = stream
1105            .into_datamap()
1106            .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
1107        if datamap_tx.send(datamap).is_err() {
1108            warn!("DataMap receiver dropped — upload may have been cancelled");
1109        }
1110        Ok(())
1111    });
1112
1113    Ok((chunk_rx, datamap_rx, handle))
1114}
1115
/// RAII guard for the staging temp file used during a disk download.
///
/// Removes the file on drop — including a panic unwind out of the
/// `block_in_place` decrypt loop — unless [`commit`](Self::commit) has
/// promoted it to its final path. Centralizes the cleanup the explicit error
/// arms used to repeat.
///
/// The guard only tracks the path; it does not create or open the file.
struct TempDownload {
    /// `Some` while the staging file may need cleanup; `None` once committed.
    /// `Drop` takes the value out, so cleanup is attempted at most once.
    path: Option<PathBuf>,
}
1126
1127impl TempDownload {
1128    fn new(path: PathBuf) -> Self {
1129        Self { path: Some(path) }
1130    }
1131
1132    /// Path of the staging file (valid until `commit`).
1133    fn path(&self) -> &Path {
1134        self.path
1135            .as_deref()
1136            .expect("TempDownload::path called after commit")
1137    }
1138
1139    /// Rename the staged file to `dest`. On success the guard is defused so
1140    /// `Drop` is a no-op; on failure the guard stays armed and `Drop` removes
1141    /// the orphaned temp file.
1142    fn commit(mut self, dest: &Path) -> std::io::Result<()> {
1143        std::fs::rename(self.path(), dest)?; // err → guard armed → Drop cleans up
1144        self.path = None; // success → nothing left to clean
1145        Ok(())
1146    }
1147}
1148
1149impl Drop for TempDownload {
1150    fn drop(&mut self) {
1151        if let Some(path) = self.path.take() {
1152            if let Err(e) = std::fs::remove_file(&path) {
1153                // Absent file is fine (never created / already gone).
1154                if e.kind() != std::io::ErrorKind::NotFound {
1155                    warn!(
1156                        "Failed to remove temp download file {}: {e}",
1157                        path.display()
1158                    );
1159                }
1160            }
1161        }
1162    }
1163}
1164
1165impl Client {
1166    /// Upload a file to the network using streaming self-encryption.
1167    ///
1168    /// Automatically selects merkle batch payment for files that produce
1169    /// 64+ chunks (saves gas). Encrypted chunks are spilled to a temp
1170    /// directory so peak memory stays at ~256 MB regardless of file size.
1171    ///
1172    /// # Errors
1173    ///
1174    /// Returns an error if the file cannot be read, encryption fails,
1175    /// or any chunk cannot be stored.
1176    pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
1177        self.file_upload_with_mode(path, PaymentMode::Auto).await
1178    }
1179
1180    /// Estimate the cost of uploading a file without actually uploading.
1181    ///
1182    /// Encrypts the file to determine chunk count and sizes, then requests
1183    /// a single quote from the network for a representative chunk. The
1184    /// per-chunk price is extrapolated to the total chunk count.
1185    ///
1186    /// The estimate is fast (~2-5s) and does not require a wallet. Spilled
1187    /// chunks are cleaned up automatically when the function returns.
1188    ///
1189    /// Gas cost is an advisory heuristic, not a live gas-oracle query. It is
1190    /// derived from realistic per-transaction budgets (`GAS_PER_WAVE_TX`,
1191    /// `GAS_PER_MERKLE_TX`) priced at `ARBITRUM_GAS_PRICE_WEI`. Real gas
1192    /// varies with network conditions.
1193    ///
1194    /// Sampled chunk addresses are spread across the whole file (not the first
1195    /// N) so a shared leading prefix doesn't bias the sample. When a sample
1196    /// returns a live quote the per-chunk price is extrapolated and the result
1197    /// is tagged [`CostEstimateConfidence::PricedSample`].
1198    ///
1199    /// When every sampled chunk is already stored the result is still `Ok`
1200    /// with `storage_cost_atto: "0"`, tagged either
1201    /// [`CostEstimateConfidence::VerifiedAllAlreadyStored`] when the whole file
1202    /// was sampled (exactly free) or
1203    /// [`CostEstimateConfidence::AllSamplesAlreadyStoredIncomplete`] when the
1204    /// tail was unsampled (a best-effort guess that payment reconciles).
1205    ///
1206    /// # Errors
1207    ///
1208    /// Returns an error if the file cannot be read, encryption fails, or the
1209    /// network cannot provide a quote.
1210    pub async fn estimate_upload_cost(
1211        &self,
1212        path: &Path,
1213        mode: PaymentMode,
1214        progress: Option<mpsc::Sender<UploadEvent>>,
1215    ) -> Result<UploadCostEstimate> {
1216        let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
1217
1218        if file_size < 3 {
1219            return Err(Error::InvalidData(
1220                "File too small: self-encryption requires at least 3 bytes".into(),
1221            ));
1222        }
1223
1224        check_disk_space_for_spill(file_size)?;
1225
1226        info!(
1227            "Estimating upload cost for {} ({file_size} bytes)",
1228            path.display()
1229        );
1230
1231        let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1232        let chunk_count = spill.len();
1233
1234        if let Some(ref tx) = progress {
1235            let _ = tx
1236                .send(UploadEvent::Encrypted {
1237                    total_chunks: chunk_count,
1238                })
1239                .await;
1240        }
1241
1242        info!("Encrypted into {chunk_count} chunks, requesting quote");
1243        let uses_merkle = should_use_merkle(chunk_count, mode);
1244
1245        // Sample chunk addresses spread evenly across the file (see
1246        // `distributed_sample_indices`) rather than the first N. A single
1247        // AlreadyStored result says nothing about the rest of the file, and a
1248        // positional sample lands on a shared leading prefix in the worst case,
1249        // so we spread the probe and only treat the whole file as "fully
1250        // stored" when every sample comes back stored.
1251        let sample_indices = distributed_sample_indices(spill.addresses.len(), ESTIMATE_SAMPLE_CAP);
1252        let mut sampled = 0usize;
1253        let mut all_already_stored = true;
1254        let mut quotes_opt: Option<Vec<QuoteEntry>> = None;
1255
1256        for &idx in &sample_indices {
1257            let addr = &spill.addresses[idx];
1258            sampled += 1;
1259            let chunk_bytes = spill.read_chunk(addr)?;
1260            let data_size = u64::try_from(chunk_bytes.len())
1261                .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
1262            let result = if uses_merkle {
1263                self.get_store_quotes_with_fault_tolerance(addr, data_size, DATA_TYPE_CHUNK)
1264                    .await
1265            } else {
1266                self.get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
1267                    .await
1268            };
1269            match result {
1270                Ok(q) => {
1271                    quotes_opt = Some(q);
1272                    all_already_stored = false;
1273                    break;
1274                }
1275                Err(Error::AlreadyStored) => {
1276                    debug!(
1277                        "Sample chunk {} already stored; trying next address ({sampled}/{})",
1278                        hex::encode(addr),
1279                        sample_indices.len()
1280                    );
1281                    continue;
1282                }
1283                Err(e) => return Err(e),
1284            }
1285        }
1286
1287        let quotes = match quotes_opt {
1288            Some(q) => q,
1289            None if all_already_stored && sampled == chunk_count => {
1290                // Every address in the file was sampled and every one is
1291                // already on the network — a zero-cost estimate is exact here.
1292                info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
1293                return Ok(UploadCostEstimate {
1294                    file_size,
1295                    chunk_count,
1296                    storage_cost_atto: "0".into(),
1297                    estimated_gas_cost_wei: "0".into(),
1298                    payment_mode: if uses_merkle {
1299                        PaymentMode::Merkle
1300                    } else {
1301                        PaymentMode::Single
1302                    },
1303                    confidence: CostEstimateConfidence::VerifiedAllAlreadyStored,
1304                });
1305            }
1306            None => {
1307                // Every sampled chunk was already stored but the tail was not
1308                // sampled, so there is no live price to extrapolate. The
1309                // estimate is display-only and payment reconciles the true
1310                // cost, so return an optimistic zero flagged as incomplete
1311                // rather than erroring — callers still get a value to show.
1312                info!(
1313                    "All {sampled}/{chunk_count} sampled chunks already stored; \
1314                     returning incomplete zero-cost estimate"
1315                );
1316                return Ok(UploadCostEstimate {
1317                    file_size,
1318                    chunk_count,
1319                    storage_cost_atto: "0".into(),
1320                    estimated_gas_cost_wei: "0".into(),
1321                    payment_mode: if uses_merkle {
1322                        PaymentMode::Merkle
1323                    } else {
1324                        PaymentMode::Single
1325                    },
1326                    confidence: CostEstimateConfidence::AllSamplesAlreadyStoredIncomplete,
1327                });
1328            }
1329        };
1330
1331        // Use the median price × 3 (matches SingleNodePayment::from_quotes
1332        // which pays 3x the median to incentivize reliable storage).
1333        let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
1334        prices.sort();
1335        let median_price = prices
1336            .get(prices.len() / 2)
1337            .copied()
1338            .unwrap_or(Amount::ZERO);
1339        let per_chunk_cost = median_price * Amount::from(3u64);
1340
1341        let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
1342        let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);
1343
1344        // Estimate gas cost from realistic per-transaction budgets rather
1345        // than a flat per-chunk or per-wave number.
1346        //
1347        // - Single mode: `batch_pay` packs up to UPLOAD_WAVE_SIZE chunks'
1348        //   close-group quotes into one `pay_for_quotes` call on Arbitrum.
1349        //   The dominant cost is one SSTORE per entry plus base tx overhead,
1350        //   so we use GAS_PER_WAVE_TX (≈1.5M) as a conservative upper bound
1351        //   on a full wave and multiply by the number of waves. The previous
1352        //   per-wave figure of 150k was closer to a single-entry transfer
1353        //   and understated cost by 5–10x for full waves.
1354        // - Merkle mode: one tx per sub-batch that verifies a merkle tree
1355        //   and posts a pool commitment (GAS_PER_MERKLE_TX ≈ 500k each).
1356        //
1357        // Gas is priced at ARBITRUM_GAS_PRICE_WEI (~0.1 gwei, a typical
1358        // Arbitrum baseline). Treat the result as advisory, not a commitment.
1359        let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
1360        let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
1361        let estimated_gas: u128 = if uses_merkle {
1362            merkle_batches
1363                .saturating_mul(GAS_PER_MERKLE_TX)
1364                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
1365        } else {
1366            waves
1367                .saturating_mul(GAS_PER_WAVE_TX)
1368                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
1369        };
1370
1371        info!(
1372            "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
1373        );
1374
1375        Ok(UploadCostEstimate {
1376            file_size,
1377            chunk_count,
1378            storage_cost_atto: total_storage.to_string(),
1379            estimated_gas_cost_wei: estimated_gas.to_string(),
1380            payment_mode: if uses_merkle {
1381                PaymentMode::Merkle
1382            } else {
1383                PaymentMode::Single
1384            },
1385            confidence: CostEstimateConfidence::PricedSample,
1386        })
1387    }
1388
1389    /// Phase 1 of external-signer upload: encrypt file and prepare chunks.
1390    ///
1391    /// Equivalent to [`Client::file_prepare_upload_with_visibility`] with
1392    /// [`Visibility::Private`] — see that method for details.
1393    pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
1394        self.file_prepare_upload_with_progress(path, Visibility::Private, None)
1395            .await
1396    }
1397
1398    /// Phase 1 of external-signer upload with explicit [`Visibility`] control.
1399    ///
1400    /// Equivalent to [`Client::file_prepare_upload_with_progress`] with
1401    /// `progress: None` — see that method for details.
1402    pub async fn file_prepare_upload_with_visibility(
1403        &self,
1404        path: &Path,
1405        visibility: Visibility,
1406    ) -> Result<PreparedUpload> {
1407        self.file_prepare_upload_with_progress(path, visibility, None)
1408            .await
1409    }
1410
1411    /// Phase 1 of external-signer upload with progress events.
1412    ///
1413    /// Requires an EVM network (for contract price queries) but NOT a wallet.
1414    /// Returns a [`PreparedUpload`] containing the data map, prepared chunks,
1415    /// and a [`PaymentIntent`] that the external signer uses to construct
1416    /// and submit the on-chain payment transaction.
1417    ///
1418    /// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
1419    /// is bundled into the payment batch as an additional chunk and its
1420    /// address is recorded on the returned [`PreparedUpload`]. After
1421    /// [`Client::finalize_upload`] (or `_merkle`) succeeds, that address is
1422    /// surfaced via [`FileUploadResult::data_map_address`] so the uploader
1423    /// can share a single address from which anyone can retrieve the file.
1424    ///
1425    /// When `progress` is `Some`, [`UploadEvent`]s are emitted on the channel
1426    /// during encryption ([`UploadEvent::Encrypting`] / [`UploadEvent::Encrypted`])
1427    /// and per-chunk quoting ([`UploadEvent::ChunkQuoted`]). Storage events are
1428    /// emitted later by [`Client::finalize_upload_with_progress`] /
1429    /// [`Client::finalize_upload_merkle_with_progress`].
1430    ///
1431    /// **Memory note:** Encryption uses disk spilling for bounded memory, but
1432    /// the returned [`PreparedUpload`] holds all chunk content in memory (each
1433    /// [`PreparedChunk`] contains a `Bytes` with the full chunk data). This is
1434    /// inherent to the two-phase external-signer protocol — the chunks must
1435    /// stay in memory until [`Client::finalize_upload`] stores them. For very
1436    /// large files, prefer [`Client::file_upload`] which streams directly.
1437    ///
1438    /// # Errors
1439    ///
1440    /// Returns an error if there is insufficient disk space, the file cannot
1441    /// be read, encryption fails, or quote collection fails.
1442    pub async fn file_prepare_upload_with_progress(
1443        &self,
1444        path: &Path,
1445        visibility: Visibility,
1446        progress: Option<mpsc::Sender<UploadEvent>>,
1447    ) -> Result<PreparedUpload> {
1448        debug!(
1449            "Preparing file upload for external signing (visibility={visibility:?}): {}",
1450            path.display()
1451        );
1452
1453        let file_size = std::fs::metadata(path)?.len();
1454        check_disk_space_for_spill(file_size)?;
1455
1456        let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1457
1458        info!(
1459            "Encrypted {} into {} chunks for external signing (spilled to disk)",
1460            path.display(),
1461            spill.len()
1462        );
1463
1464        // Read each chunk from disk and collect quotes concurrently.
1465        // Note: all PreparedChunks accumulate in memory because the external-signer
1466        // protocol requires them for finalize_upload. NOT memory-bounded for large files.
1467        let mut chunk_data: Vec<Bytes> = spill
1468            .addresses
1469            .iter()
1470            .map(|addr| spill.read_chunk(addr))
1471            .collect::<std::result::Result<Vec<_>, _>>()?;
1472
1473        // For public uploads, bundle the serialized DataMap as an extra chunk
1474        // in the same payment batch. This lets the external signer pay for
1475        // the data chunks and the DataMap chunk in one flow, and lets the
1476        // finalize step return the DataMap's chunk address as the shareable
1477        // retrieval address.
1478        let data_map_address = match visibility {
1479            Visibility::Private => None,
1480            Visibility::Public => {
1481                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
1482                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
1483                })?;
1484                let bytes = Bytes::from(serialized);
1485                let address = compute_address(&bytes);
1486                info!(
1487                    "Public upload: bundling DataMap chunk ({} bytes) at address {}",
1488                    bytes.len(),
1489                    hex::encode(address)
1490                );
1491                chunk_data.push(bytes);
1492                Some(address)
1493            }
1494        };
1495
1496        let chunk_count = chunk_data.len();
1497
1498        if let Some(ref tx) = progress {
1499            let _ = tx
1500                .send(UploadEvent::Encrypted {
1501                    total_chunks: chunk_count,
1502                })
1503                .await;
1504        }
1505
1506        let (payment_info, already_stored_addresses) = if should_use_merkle(
1507            chunk_count,
1508            PaymentMode::Auto,
1509        ) {
1510            // Merkle path: build tree, collect candidate pools, return for external payment.
1511            info!("Using merkle batch preparation for {chunk_count} file chunks");
1512
1513            let chunk_entries: Vec<([u8; 32], u64)> = chunk_data
1514                .iter()
1515                .map(|chunk| {
1516                    let size = u64::try_from(chunk.len())
1517                        .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
1518                    Ok((compute_address(chunk), size))
1519                })
1520                .collect::<Result<Vec<_>>>()?;
1521
1522            let merkle_plan = self
1523                .plan_merkle_upload(chunk_entries, DATA_TYPE_CHUNK, progress.as_ref())
1524                .await?;
1525
1526            if merkle_plan.to_upload.is_empty() {
1527                info!("All {chunk_count} file chunks already stored; no external payment needed");
1528                (
1529                    ExternalPaymentInfo::WaveBatch {
1530                        prepared_chunks: Vec::new(),
1531                        payment_intent: PaymentIntent::from_prepared_chunks(&[]),
1532                    },
1533                    merkle_plan.already_stored,
1534                )
1535            } else {
1536                let chunk_data =
1537                    chunk_contents_for_upload_addresses(chunk_data, &merkle_plan.to_upload)?;
1538
1539                if !should_use_merkle(merkle_plan.to_upload.len(), PaymentMode::Auto) {
1540                    info!(
1541                        "{} file chunks need upload after merkle preflight; preparing wave-batch payment",
1542                        merkle_plan.to_upload.len()
1543                    );
1544                    let (payment_info, mut wave_already_stored) = self
1545                        .prepare_wave_batch_external_chunks(
1546                            chunk_data,
1547                            progress.as_ref(),
1548                            chunk_count,
1549                        )
1550                        .await?;
1551                    let mut already_stored = merkle_plan.already_stored;
1552                    already_stored.append(&mut wave_already_stored);
1553                    (payment_info, already_stored)
1554                } else {
1555                    match self
1556                        .prepare_merkle_batch_external(
1557                            &merkle_plan.to_upload,
1558                            DATA_TYPE_CHUNK,
1559                            merkle_plan.to_upload_avg_size(),
1560                        )
1561                        .await
1562                    {
1563                        Ok(prepared_batch) => {
1564                            info!(
1565                                "File prepared for external merkle signing: {} chunks, depth={} ({})",
1566                                merkle_plan.to_upload.len(),
1567                                prepared_batch.depth,
1568                                path.display()
1569                            );
1570
1571                            (
1572                                ExternalPaymentInfo::Merkle {
1573                                    prepared_batch,
1574                                    chunk_contents: chunk_data,
1575                                    chunk_addresses: merkle_plan.to_upload,
1576                                },
1577                                merkle_plan.already_stored,
1578                            )
1579                        }
1580                        Err(Error::InsufficientPeers(ref msg)) => {
1581                            info!(
1582                                "External merkle preparation needs more peers ({msg}); preparing wave-batch payment"
1583                            );
1584                            let (payment_info, mut wave_already_stored) = self
1585                                .prepare_wave_batch_external_chunks(
1586                                    chunk_data,
1587                                    progress.as_ref(),
1588                                    chunk_count,
1589                                )
1590                                .await?;
1591                            let mut already_stored = merkle_plan.already_stored;
1592                            already_stored.append(&mut wave_already_stored);
1593                            (payment_info, already_stored)
1594                        }
1595                        Err(e) => return Err(e),
1596                    }
1597                }
1598            }
1599        } else {
1600            self.prepare_wave_batch_external_chunks(chunk_data, progress.as_ref(), chunk_count)
1601                .await?
1602        };
1603
1604        // Surface the "DataMap chunk was already on the network" case
1605        // so debugging "why is data_map_address set but no storage cost
1606        // appears for it?" doesn't require reading the source. See the
1607        // `data_map_address` doc comment for why this is still a valid
1608        // `Some(addr)` outcome.
1609        if let Some(addr) = data_map_address {
1610            let data_map_needs_payment = match &payment_info {
1611                ExternalPaymentInfo::WaveBatch {
1612                    prepared_chunks, ..
1613                } => prepared_chunks.iter().any(|c| c.address == addr),
1614                ExternalPaymentInfo::Merkle {
1615                    chunk_addresses, ..
1616                } => chunk_addresses.contains(&addr),
1617            };
1618            if !data_map_needs_payment {
1619                info!(
1620                    "Public upload: DataMap chunk {} was already stored \
1621                     on the network — address is retrievable without a \
1622                     new payment",
1623                    hex::encode(addr)
1624                );
1625            }
1626        }
1627
1628        Ok(PreparedUpload {
1629            data_map,
1630            payment_info,
1631            data_map_address,
1632            already_stored_addresses,
1633            total_chunks: chunk_count,
1634        })
1635    }
1636
1637    async fn prepare_wave_batch_external_chunks(
1638        &self,
1639        chunk_data: Vec<Bytes>,
1640        progress: Option<&mpsc::Sender<UploadEvent>>,
1641        progress_total: usize,
1642    ) -> Result<(ExternalPaymentInfo, Vec<[u8; 32]>)> {
1643        let chunk_count = chunk_data.len();
1644        let chunks_with_addr: Vec<(Bytes, [u8; 32])> = chunk_data
1645            .into_iter()
1646            .map(|content| {
1647                let address = compute_address(&content);
1648                (content, address)
1649            })
1650            .collect();
1651
1652        // Wave-batch path: collect quotes per chunk concurrently, emitting
1653        // a `ChunkQuoted` event after each completion so callers can drive
1654        // a progress bar through the slow quote phase.
1655        let quote_limiter = self.controller().quote.clone();
1656        let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
1657        let mut quote_stream = stream::iter(chunks_with_addr)
1658            .map(|(content, address)| {
1659                let limiter = quote_limiter.clone();
1660                async move {
1661                    let result = observe_op(
1662                        &limiter,
1663                        || async move { self.prepare_chunk_payment(content).await },
1664                        classify_error,
1665                    )
1666                    .await;
1667                    (address, result)
1668                }
1669            })
1670            .buffer_unordered(quote_concurrency);
1671
1672        let mut prepared_chunks = Vec::with_capacity(chunk_count);
1673        let mut already_stored = Vec::new();
1674        let mut quoted = 0usize;
1675        while let Some((address, result)) = quote_stream.next().await {
1676            match result? {
1677                Some(prepared) => prepared_chunks.push(prepared),
1678                None => already_stored.push(address),
1679            }
1680            quoted += 1;
1681            if let Some(tx) = progress {
1682                let _ = tx.try_send(UploadEvent::ChunkQuoted {
1683                    quoted,
1684                    total: progress_total,
1685                });
1686            }
1687        }
1688
1689        let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
1690        info!(
1691            "Prepared external wave-batch payment: {} chunks, {} already stored, total {} atto",
1692            prepared_chunks.len(),
1693            already_stored.len(),
1694            payment_intent.total_amount,
1695        );
1696
1697        Ok((
1698            ExternalPaymentInfo::WaveBatch {
1699                prepared_chunks,
1700                payment_intent,
1701            },
1702            already_stored,
1703        ))
1704    }
1705
1706    /// Phase 2 of external-signer upload (wave-batch): finalize with externally-signed tx hashes.
1707    ///
1708    /// Takes a [`PreparedUpload`] that used wave-batch payment and a map
1709    /// of `quote_hash -> tx_hash` provided by the external signer after on-chain
1710    /// payment. Builds payment proofs and stores chunks on the network.
1711    ///
1712    /// # Errors
1713    ///
1714    /// Returns an error if the prepared upload used merkle payment (use
1715    /// [`Client::finalize_upload_merkle`] instead), proof construction fails,
1716    /// or any chunk cannot be stored.
1717    pub async fn finalize_upload(
1718        &self,
1719        prepared: PreparedUpload,
1720        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1721    ) -> Result<FileUploadResult> {
1722        self.finalize_upload_with_progress(prepared, tx_hash_map, None)
1723            .await
1724    }
1725
1726    /// Phase 2 of external-signer upload (wave-batch) with progress events.
1727    ///
1728    /// Same as [`Client::finalize_upload`] but emits [`UploadEvent::ChunkStored`]
1729    /// on the provided channel as each chunk is successfully stored.
1730    ///
1731    /// # Errors
1732    ///
1733    /// Same as [`Client::finalize_upload`].
1734    pub async fn finalize_upload_with_progress(
1735        &self,
1736        prepared: PreparedUpload,
1737        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1738        progress: Option<mpsc::Sender<UploadEvent>>,
1739    ) -> Result<FileUploadResult> {
1740        let data_map_address = prepared.data_map_address;
1741        let already_stored_addresses = prepared.already_stored_addresses;
1742        let already_stored_count = already_stored_addresses.len();
1743        let total_chunks = prepared.total_chunks;
1744        match prepared.payment_info {
1745            ExternalPaymentInfo::WaveBatch {
1746                prepared_chunks,
1747                payment_intent,
1748            } => {
1749                let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
1750                let wave_result = self
1751                    .store_paid_chunks_with_events(
1752                        paid_chunks,
1753                        progress.as_ref(),
1754                        already_stored_count,
1755                        total_chunks,
1756                    )
1757                    .await;
1758                if !wave_result.failed.is_empty() {
1759                    let failed_count = wave_result.failed.len();
1760                    let stored_count = already_stored_count + wave_result.stored.len();
1761                    let mut stored = already_stored_addresses;
1762                    stored.extend(wave_result.stored);
1763                    return Err(Error::PartialUpload {
1764                        stored,
1765                        stored_count,
1766                        failed: wave_result.failed,
1767                        failed_count,
1768                        total_chunks,
1769                        // Report the storage spend known from the payment intent
1770                        // the external signer was handed. Gas is paid by the
1771                        // signer out-of-band, so it stays unknown (0).
1772                        spend: Box::new(PartialUploadSpend {
1773                            storage_cost_atto: payment_intent.total_amount.to_string(),
1774                            gas_cost_wei: 0,
1775                        }),
1776                        reason: "finalize_upload: chunk storage failed after retries".into(),
1777                    });
1778                }
1779                let chunks_stored = already_stored_count + wave_result.stored.len();
1780
1781                info!("External-signer upload finalized: {chunks_stored} chunks stored");
1782
1783                let mut stats = WaveAggregateStats::default();
1784                stats.absorb(&wave_result);
1785
1786                Ok(FileUploadResult {
1787                    data_map: prepared.data_map,
1788                    chunks_stored,
1789                    chunks_failed: 0,
1790                    total_chunks,
1791                    payment_mode_used: PaymentMode::Single,
1792                    // Storage spend is known from the payment intent; gas is
1793                    // paid by the external signer out-of-band (unknown here).
1794                    storage_cost_atto: payment_intent.total_amount.to_string(),
1795                    gas_cost_wei: 0,
1796                    data_map_address,
1797                    chunk_attempts_total: stats.chunk_attempts_total,
1798                    store_durations_ms: stats.store_durations_ms,
1799                    retries_histogram: stats.retries_histogram,
1800                })
1801            }
1802            ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
1803                "Cannot finalize merkle upload with wave-batch tx hashes. \
1804                 Use finalize_upload_merkle() instead."
1805                    .to_string(),
1806            )),
1807        }
1808    }
1809
1810    /// Phase 2 of external-signer upload (merkle): finalize with winner pool hash.
1811    ///
1812    /// Takes a [`PreparedUpload`] that used merkle payment and the `winner_pool_hash`
1813    /// returned by the on-chain merkle payment transaction. Generates proofs and
1814    /// stores chunks on the network.
1815    ///
1816    /// # Errors
1817    ///
1818    /// Returns an error if the prepared upload used wave-batch payment (use
1819    /// [`Client::finalize_upload`] instead), proof generation fails,
1820    /// or any chunk cannot be stored.
1821    pub async fn finalize_upload_merkle(
1822        &self,
1823        prepared: PreparedUpload,
1824        winner_pool_hash: [u8; 32],
1825    ) -> Result<FileUploadResult> {
1826        self.finalize_upload_merkle_with_progress(prepared, winner_pool_hash, None)
1827            .await
1828    }
1829
1830    /// Phase 2 of external-signer upload (merkle) with progress events.
1831    ///
1832    /// Same as [`Client::finalize_upload_merkle`] but emits [`UploadEvent::ChunkStored`]
1833    /// on the provided channel as each chunk is successfully stored.
1834    ///
1835    /// # Errors
1836    ///
1837    /// Same as [`Client::finalize_upload_merkle`].
1838    pub async fn finalize_upload_merkle_with_progress(
1839        &self,
1840        prepared: PreparedUpload,
1841        winner_pool_hash: [u8; 32],
1842        progress: Option<mpsc::Sender<UploadEvent>>,
1843    ) -> Result<FileUploadResult> {
1844        let data_map_address = prepared.data_map_address;
1845        let already_stored_count = prepared.already_stored_addresses.len();
1846        let total_chunks = prepared.total_chunks;
1847        match prepared.payment_info {
1848            ExternalPaymentInfo::Merkle {
1849                prepared_batch,
1850                chunk_contents,
1851                chunk_addresses,
1852            } => {
1853                let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
1854                let outcome = self
1855                    .merkle_upload_chunks(
1856                        chunk_contents,
1857                        chunk_addresses,
1858                        &batch_result,
1859                        progress.as_ref(),
1860                        already_stored_count,
1861                        total_chunks,
1862                    )
1863                    .await?;
1864
1865                info!(
1866                    "External-signer merkle upload finalized: {} chunks stored, {} failed",
1867                    outcome.stored, outcome.failed
1868                );
1869
1870                Ok(FileUploadResult {
1871                    data_map: prepared.data_map,
1872                    chunks_stored: outcome.stored,
1873                    chunks_failed: outcome.failed,
1874                    total_chunks,
1875                    payment_mode_used: PaymentMode::Merkle,
1876                    storage_cost_atto: "0".into(),
1877                    gas_cost_wei: 0,
1878                    data_map_address,
1879                    chunk_attempts_total: outcome.stats.chunk_attempts_total,
1880                    store_durations_ms: outcome.stats.store_durations_ms,
1881                    retries_histogram: outcome.stats.retries_histogram,
1882                })
1883            }
1884            ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
1885                "Cannot finalize wave-batch upload with merkle winner hash. \
1886                 Use finalize_upload() instead."
1887                    .to_string(),
1888            )),
1889        }
1890    }
1891
1892    /// Upload a file with a specific payment mode.
1893    ///
1894    /// Before encryption, checks that the temp directory has enough free
1895    /// disk space for the spilled chunks (~1.1× source file size).
1896    ///
1897    /// Encrypted chunks are spilled to a temp directory during encryption
1898    /// so that only their 32-byte addresses stay in memory. At upload time,
1899    /// chunks are read back one wave at a time (~64 × 4 MB ≈ 256 MB peak).
1900    ///
1901    /// # Errors
1902    ///
1903    /// Returns an error if there is insufficient disk space, the file cannot
1904    /// be read, encryption fails, or any chunk cannot be stored.
1905    #[allow(clippy::too_many_lines)]
1906    pub async fn file_upload_with_mode(
1907        &self,
1908        path: &Path,
1909        mode: PaymentMode,
1910    ) -> Result<FileUploadResult> {
1911        self.file_upload_with_progress(path, mode, None).await
1912    }
1913
1914    /// Upload a file publicly, storing the serialized [`DataMap`] as part of
1915    /// the same upload payment batch.
1916    ///
1917    /// The returned [`FileUploadResult::data_map_address`] can be shared for
1918    /// public downloads via [`Client::data_map_fetch`].
1919    #[allow(clippy::too_many_lines)]
1920    pub async fn file_upload_public_with_mode(
1921        &self,
1922        path: &Path,
1923        mode: PaymentMode,
1924    ) -> Result<FileUploadResult> {
1925        self.file_upload_with_visibility_and_progress(path, mode, Visibility::Public, None)
1926            .await
1927    }
1928
1929    /// Upload a file with progress events sent to the given channel.
1930    ///
1931    /// Same as [`Client::file_upload_with_mode`] but sends [`UploadEvent`]s to the
1932    /// provided channel for UI progress feedback.
1933    #[allow(clippy::too_many_lines)]
1934    pub async fn file_upload_with_progress(
1935        &self,
1936        path: &Path,
1937        mode: PaymentMode,
1938        progress: Option<mpsc::Sender<UploadEvent>>,
1939    ) -> Result<FileUploadResult> {
1940        self.file_upload_with_visibility_and_progress(path, mode, Visibility::Private, progress)
1941            .await
1942    }
1943
1944    /// Public file upload with progress events.
1945    ///
1946    /// Same as [`Client::file_upload_public_with_mode`] but sends
1947    /// [`UploadEvent`]s to the provided channel for UI progress feedback.
1948    #[allow(clippy::too_many_lines)]
1949    pub async fn file_upload_public_with_progress(
1950        &self,
1951        path: &Path,
1952        mode: PaymentMode,
1953        progress: Option<mpsc::Sender<UploadEvent>>,
1954    ) -> Result<FileUploadResult> {
1955        self.file_upload_with_visibility_and_progress(path, mode, Visibility::Public, progress)
1956            .await
1957    }
1958
1959    #[allow(clippy::too_many_lines)]
1960    async fn file_upload_with_visibility_and_progress(
1961        &self,
1962        path: &Path,
1963        mode: PaymentMode,
1964        visibility: Visibility,
1965        progress: Option<mpsc::Sender<UploadEvent>>,
1966    ) -> Result<FileUploadResult> {
1967        debug!(
1968            "Streaming file upload with mode {mode:?}, visibility {visibility:?}: {}",
1969            path.display()
1970        );
1971
1972        // Pre-flight: verify enough temp disk space for the chunk spill.
1973        let file_size = std::fs::metadata(path)?.len();
1974        check_disk_space_for_spill(file_size)?;
1975
1976        // Phase 1: Encrypt file and spill chunks to temp directory.
1977        // Only 32-byte addresses stay in memory — chunk data lives on disk.
1978        let (mut spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1979
1980        let data_map_address = match visibility {
1981            Visibility::Private => None,
1982            Visibility::Public => {
1983                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
1984                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
1985                })?;
1986                let address = compute_address(&serialized);
1987                info!(
1988                    "Public upload: adding DataMap chunk ({} bytes) at address {} to payment batch",
1989                    serialized.len(),
1990                    hex::encode(address)
1991                );
1992                spill.push(&serialized)?;
1993                Some(address)
1994            }
1995        };
1996
1997        let chunk_count = spill.len();
1998        info!(
1999            "Encrypted {} into {chunk_count} chunks (spilled to disk)",
2000            path.display()
2001        );
2002        if let Some(ref tx) = progress {
2003            let _ = tx
2004                .send(UploadEvent::Encrypted {
2005                    total_chunks: chunk_count,
2006                })
2007                .await;
2008        }
2009
2010        // Phase 2: Decide payment mode and upload in waves from disk.
2011        //
2012        // For the merkle path, attempt to resume from a cached
2013        // receipt before paying again. The cache is keyed by the
2014        // CANONICAL source path so `./foo`, `/abs/foo`, and any
2015        // symlink alias all resolve to the same cache entry — a
2016        // crash-and-retry from a different cwd or via a different
2017        // alias still hits the receipt. Canonicalize may fail (the
2018        // file could have been moved between phase 1 and here); we
2019        // fall back to the display string in that case, which
2020        // preserves pre-fix behaviour rather than dropping cache
2021        // resume entirely.
2022        let file_path_key = std::fs::canonicalize(path)
2023            .map(|p| p.display().to_string())
2024            .unwrap_or_else(|_| path.display().to_string());
2025        let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self
2026            .should_use_merkle(chunk_count, mode)
2027        {
2028            info!("Using merkle batch payment for {chunk_count} file chunks");
2029
2030            let cached_merkle =
2031                crate::data::client::cached_merkle::try_load_for_file(&file_path_key)
2032                    .map(|(_cache_path, cached)| cached);
2033
2034            let merkle_plan = match self
2035                .plan_merkle_upload(spill.chunk_entries()?, DATA_TYPE_CHUNK, progress.as_ref())
2036                .await
2037            {
2038                Ok(plan) => plan,
2039                Err(e) => {
2040                    if let Some(cached) = cached_merkle
2041                        .as_ref()
2042                        .filter(|cached| cached_merkle_covers_addresses(cached, &spill.addresses))
2043                    {
2044                        info!(
2045                            "Merkle preflight failed ({e}); \
2046                             resuming with cached merkle proofs"
2047                        );
2048                        let (stored, sc, gc, stats) = self
2049                            .upload_merkle_from_spill(
2050                                &spill,
2051                                &spill.addresses,
2052                                cached,
2053                                &[],
2054                                progress.as_ref(),
2055                            )
2056                            .await?;
2057                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
2058                        return Ok(FileUploadResult {
2059                            data_map,
2060                            chunks_stored: stored,
2061                            chunks_failed: 0,
2062                            total_chunks: chunk_count,
2063                            payment_mode_used: PaymentMode::Merkle,
2064                            storage_cost_atto: sc,
2065                            gas_cost_wei: gc,
2066                            data_map_address,
2067                            chunk_attempts_total: stats.chunk_attempts_total,
2068                            store_durations_ms: stats.store_durations_ms,
2069                            retries_histogram: stats.retries_histogram,
2070                        });
2071                    }
2072                    match &e {
2073                        Error::InsufficientPeers(msg) if mode == PaymentMode::Auto => {
2074                            info!(
2075                                "Merkle preflight needs more peers ({msg}), \
2076                                 falling back to wave-batch"
2077                            );
2078                            let (stored, sc, gc, fb_stats) = self
2079                                .upload_waves_single(
2080                                    &spill,
2081                                    progress.as_ref(),
2082                                    Some(&file_path_key),
2083                                )
2084                                .await?;
2085                            crate::data::client::cached_single::try_delete_for_file(&file_path_key);
2086                            return Ok(FileUploadResult {
2087                                data_map,
2088                                chunks_stored: stored,
2089                                chunks_failed: 0,
2090                                total_chunks: chunk_count,
2091                                payment_mode_used: PaymentMode::Single,
2092                                storage_cost_atto: sc,
2093                                gas_cost_wei: gc,
2094                                data_map_address,
2095                                chunk_attempts_total: fb_stats.chunk_attempts_total,
2096                                store_durations_ms: fb_stats.store_durations_ms,
2097                                retries_histogram: fb_stats.retries_histogram,
2098                            });
2099                        }
2100                        _ => return Err(e),
2101                    }
2102                }
2103            };
2104
2105            if merkle_plan.to_upload.is_empty() {
2106                info!("All {chunk_count} merkle chunks already stored; skipping payment");
2107                crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
2108                crate::data::client::cached_single::try_delete_for_file(&file_path_key);
2109                (
2110                    chunk_count,
2111                    PaymentMode::Merkle,
2112                    "0".to_string(),
2113                    0,
2114                    WaveAggregateStats::default(),
2115                )
2116            } else if !self.should_use_merkle(merkle_plan.to_upload.len(), mode) {
2117                let remaining_chunks = merkle_plan.to_upload.len();
2118                if let Some(cached) = cached_merkle
2119                    .as_ref()
2120                    .filter(|cached| cached_merkle_covers_addresses(cached, &merkle_plan.to_upload))
2121                {
2122                    info!(
2123                        "{remaining_chunks} chunks remain below merkle threshold; \
2124                         reusing cached merkle proofs"
2125                    );
2126                    let (stored, sc, gc, stats) = self
2127                        .upload_merkle_from_spill(
2128                            &spill,
2129                            &merkle_plan.to_upload,
2130                            cached,
2131                            &merkle_plan.already_stored,
2132                            progress.as_ref(),
2133                        )
2134                        .await?;
2135                    crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
2136                    (stored, PaymentMode::Merkle, sc, gc, stats)
2137                } else {
2138                    if cached_merkle.is_some() {
2139                        info!(
2140                            "{remaining_chunks} chunks remain below merkle threshold, \
2141                             and the cached merkle receipt does not cover them. \
2142                             Discarding cache and using single-node payment."
2143                        );
2144                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
2145                    } else {
2146                        info!(
2147                            "{remaining_chunks} chunks need upload after merkle preflight; \
2148                             using single-node payment"
2149                        );
2150                    }
2151                    let (stored, sc, gc, stats) = self
2152                        .upload_spill_addresses_single(
2153                            &spill,
2154                            &merkle_plan.to_upload,
2155                            progress.as_ref(),
2156                            &merkle_plan.already_stored,
2157                            chunk_count,
2158                            Some(&file_path_key),
2159                        )
2160                        .await?;
2161                    crate::data::client::cached_single::try_delete_for_file(&file_path_key);
2162                    (stored, PaymentMode::Single, sc, gc, stats)
2163                }
2164            } else {
2165                let batch_result = if let Some(cached) = cached_merkle.as_ref() {
2166                    // Validate the cache against the chunks that still need
2167                    // storage. Extra proofs are harmless: a previous attempt
2168                    // may have paid for chunks that are now already stored.
2169                    if cached_merkle_covers_addresses(cached, &merkle_plan.to_upload) {
2170                        info!(
2171                            "Skipping merkle payment phase; resuming with \
2172                             cached proofs for {} remaining chunks",
2173                            merkle_plan.to_upload.len()
2174                        );
2175                        Ok(cached.clone())
2176                    } else {
2177                        info!(
2178                            "Cached merkle receipt does not cover the current \
2179                             remaining chunks (cached={}, remaining={}). \
2180                             Discarding cache and paying fresh.",
2181                            cached.proofs.len(),
2182                            merkle_plan.to_upload.len()
2183                        );
2184                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
2185                        self.pay_for_merkle_batch(
2186                            &merkle_plan.to_upload,
2187                            DATA_TYPE_CHUNK,
2188                            merkle_plan.to_upload_avg_size(),
2189                        )
2190                        .await
2191                        .inspect(|result| {
2192                            crate::data::client::cached_merkle::try_save(&file_path_key, result);
2193                        })
2194                    }
2195                } else {
2196                    self.pay_for_merkle_batch(
2197                        &merkle_plan.to_upload,
2198                        DATA_TYPE_CHUNK,
2199                        merkle_plan.to_upload_avg_size(),
2200                    )
2201                    .await
2202                    .inspect(|result| {
2203                        // Save BEFORE the store phase so a crash
2204                        // mid-upload leaves a resumable receipt.
2205                        crate::data::client::cached_merkle::try_save(&file_path_key, result);
2206                    })
2207                };
2208
2209                let batch_result = match batch_result {
2210                    Ok(result) => result,
2211                    Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
2212                        info!("Merkle needs more peers ({msg}), falling back to wave-batch");
2213                        let (stored, sc, gc, fb_stats) = self
2214                            .upload_spill_addresses_single(
2215                                &spill,
2216                                &merkle_plan.to_upload,
2217                                progress.as_ref(),
2218                                &merkle_plan.already_stored,
2219                                chunk_count,
2220                                Some(&file_path_key),
2221                            )
2222                            .await?;
2223                        crate::data::client::cached_single::try_delete_for_file(&file_path_key);
2224                        return Ok(FileUploadResult {
2225                            data_map,
2226                            chunks_stored: stored,
2227                            chunks_failed: 0,
2228                            total_chunks: chunk_count,
2229                            payment_mode_used: PaymentMode::Single,
2230                            storage_cost_atto: sc,
2231                            gas_cost_wei: gc,
2232                            data_map_address,
2233                            chunk_attempts_total: fb_stats.chunk_attempts_total,
2234                            store_durations_ms: fb_stats.store_durations_ms,
2235                            retries_histogram: fb_stats.retries_histogram,
2236                        });
2237                    }
2238                    Err(e) => return Err(e),
2239                };
2240
2241                let (stored, sc, gc, stats) = self
2242                    .upload_merkle_from_spill(
2243                        &spill,
2244                        &merkle_plan.to_upload,
2245                        &batch_result,
2246                        &merkle_plan.already_stored,
2247                        progress.as_ref(),
2248                    )
2249                    .await?;
2250                // Upload succeeded end-to-end; the cached receipt is
2251                // no longer needed.
2252                crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
2253                (stored, PaymentMode::Merkle, sc, gc, stats)
2254            }
2255        } else {
2256            let (stored, sc, gc, stats) = self
2257                .upload_waves_single(&spill, progress.as_ref(), Some(&file_path_key))
2258                .await?;
2259            // Full file success: drop any cached single-node receipt.
2260            crate::data::client::cached_single::try_delete_for_file(&file_path_key);
2261            (stored, PaymentMode::Single, sc, gc, stats)
2262        };
2263
2264        info!(
2265            "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
2266            path.display()
2267        );
2268
2269        Ok(FileUploadResult {
2270            data_map,
2271            chunks_stored,
2272            chunks_failed: 0,
2273            total_chunks: chunk_count,
2274            payment_mode_used: actual_mode,
2275            storage_cost_atto,
2276            gas_cost_wei,
2277            data_map_address,
2278            chunk_attempts_total: stats.chunk_attempts_total,
2279            store_durations_ms: stats.store_durations_ms,
2280            retries_histogram: stats.retries_histogram,
2281        })
2282    }
2283
2284    /// Encrypt a file and spill chunks to a temp directory.
2285    ///
2286    /// Logs progress every 100 chunks so users get feedback during
2287    /// multi-GB encryptions.
2288    ///
2289    /// Returns the spill buffer (addresses on disk) and the `DataMap`.
2290    async fn encrypt_file_to_spill(
2291        &self,
2292        path: &Path,
2293        progress: Option<&mpsc::Sender<UploadEvent>>,
2294    ) -> Result<(ChunkSpill, DataMap)> {
2295        let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
2296
2297        let mut spill = ChunkSpill::new()?;
2298        while let Some(content) = chunk_rx.recv().await {
2299            spill.push(&content)?;
2300            let chunks_done = spill.len();
2301            if let Some(tx) = progress {
2302                if chunks_done.is_multiple_of(10) {
2303                    let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
2304                }
2305            }
2306            if chunks_done % 100 == 0 {
2307                let mb = spill.total_bytes() / (1024 * 1024);
2308                info!(
2309                    "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
2310                    path.display()
2311                );
2312            }
2313        }
2314
2315        // Await encryption completion to catch errors before paying.
2316        handle
2317            .await
2318            .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
2319            .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
2320
2321        let data_map = datamap_rx
2322            .await
2323            .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
2324
2325        Ok((spill, data_map))
2326    }
2327
2328    /// Upload chunks from a spill using wave-based per-chunk (single) payments.
2329    ///
2330    /// Reads one wave at a time from disk, prepares quotes, pays, and stores.
2331    /// Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE` (~256 MB).
2332    ///
2333    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
2334    async fn upload_waves_single(
2335        &self,
2336        spill: &ChunkSpill,
2337        progress: Option<&mpsc::Sender<UploadEvent>>,
2338        resume_key: Option<&str>,
2339    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
2340        self.upload_spill_addresses_single(
2341            spill,
2342            &spill.addresses,
2343            progress,
2344            &[],
2345            spill.len(),
2346            resume_key,
2347        )
2348        .await
2349    }
2350
2351    async fn upload_spill_addresses_single(
2352        &self,
2353        spill: &ChunkSpill,
2354        addresses: &[[u8; 32]],
2355        progress: Option<&mpsc::Sender<UploadEvent>>,
2356        already_stored_addresses: &[[u8; 32]],
2357        total_chunks: usize,
2358        resume_key: Option<&str>,
2359    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
2360        let mut total_stored = already_stored_addresses.len();
2361        let mut total_storage = Amount::ZERO;
2362        let mut total_gas: u128 = 0;
2363        let mut agg_stats = WaveAggregateStats::default();
2364        // A wave whose chunks fall short of quorum after retries must not abort
2365        // the file: its failures are accumulated here and surfaced as a single
2366        // `PartialUpload` only after every wave has been attempted, mirroring
2367        // `upload_merkle_from_spill`. Aborting on the first failed wave (the old `?`)
2368        // discarded all later waves' progress — already self-encrypted, spilled,
2369        // and in some cases already paid for — converting high per-chunk success
2370        // into 0% per-file success.
2371        // Seed with the addresses a preflight already confirmed stored (e.g.
2372        // the merkle-fallback path passes `merkle_plan.already_stored`), so a
2373        // returned `PartialUpload.stored` lists every stored chunk and
2374        // `stored_count == stored.len()` holds for programmatic callers.
2375        let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
2376        let mut failed: Vec<([u8; 32], String)> = Vec::new();
2377        let waves: Vec<&[[u8; 32]]> = addresses.chunks(UPLOAD_WAVE_SIZE).collect();
2378        let wave_count = waves.len();
2379
2380        // Unconditional breadcrumb: lets a clean run confirm the continue-on-
2381        // partial single-node path is in effect (the old path aborted the file
2382        // on the first failed wave instead of continuing across all waves).
2383        info!(
2384            "single-node upload: {} chunk(s) in {wave_count} wave(s) (continue-on-partial)",
2385            addresses.len()
2386        );
2387
2388        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
2389            let wave_num = wave_idx + 1;
2390            let wave_data: Vec<Bytes> = wave_addrs
2391                .iter()
2392                .map(|addr| spill.read_chunk(addr))
2393                .collect::<Result<Vec<_>>>()?;
2394
2395            info!(
2396                "Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
2397                wave_data.len()
2398            );
2399            if let Some(tx) = progress {
2400                let _ = tx
2401                    .send(UploadEvent::QuotingChunks {
2402                        wave: wave_num,
2403                        total_waves: wave_count,
2404                        chunks_in_wave: wave_data.len(),
2405                    })
2406                    .await;
2407            }
2408            // Fold this wave's result. A quorum shortfall (`PartialUpload`) is
2409            // recoverable and its parts are returned to be recorded here;
2410            // genuinely fatal errors propagate via `?` and abort the file, as in
2411            // `upload_merkle_from_spill`.
2412            let outcome = fold_single_wave(
2413                self.batch_upload_chunks_with_events(
2414                    wave_data,
2415                    progress,
2416                    total_stored,
2417                    total_chunks,
2418                    resume_key,
2419                )
2420                .await,
2421            )?;
2422
2423            if !outcome.failed.is_empty() {
2424                warn!(
2425                    "Wave {wave_num}/{wave_count}: {} chunk(s) failed to store after retries; \
2426                     continuing with remaining waves",
2427                    outcome.failed.len()
2428                );
2429            }
2430
2431            total_stored += outcome.stored.len();
2432            stored_addresses.extend(outcome.stored);
2433            failed.extend(outcome.failed);
2434            total_storage += outcome.storage_atto;
2435            total_gas = total_gas.saturating_add(outcome.gas_wei);
2436            // Merge per-wave stats (a quorum-short wave contributes none, since
2437            // `PartialUpload` carries no stats).
2438            agg_stats.chunk_attempts_total = agg_stats
2439                .chunk_attempts_total
2440                .saturating_add(outcome.stats.chunk_attempts_total);
2441            agg_stats
2442                .store_durations_ms
2443                .extend(outcome.stats.store_durations_ms);
2444            for (slot, count) in agg_stats
2445                .retries_histogram
2446                .iter_mut()
2447                .zip(outcome.stats.retries_histogram.iter())
2448            {
2449                *slot = slot.saturating_add(*count);
2450            }
2451        }
2452
2453        // Any chunk still failed after every wave was attempted means the file
2454        // is not fully stored — surface it as `PartialUpload` (never silently
2455        // succeed with missing chunks), carrying the real on-chain spend.
2456        if !failed.is_empty() {
2457            let failed_count = failed.len();
2458            warn!(
2459                "single-node upload incomplete: {failed_count}/{total_chunks} chunks failed after retries"
2460            );
2461            return Err(Error::PartialUpload {
2462                stored: stored_addresses,
2463                stored_count: total_stored,
2464                failed,
2465                failed_count,
2466                total_chunks,
2467                spend: Box::new(PartialUploadSpend {
2468                    storage_cost_atto: total_storage.to_string(),
2469                    gas_cost_wei: total_gas,
2470                }),
2471                reason: format!("{failed_count} chunk(s) failed to store after retries"),
2472            });
2473        }
2474
2475        Ok((
2476            total_stored,
2477            total_storage.to_string(),
2478            total_gas,
2479            agg_stats,
2480        ))
2481    }
2482
2483    /// Upload chunks from a spill using pre-computed merkle proofs.
2484    ///
2485    /// Stores the whole file as a **single cap-bounded fan-out** — not in fixed
2486    /// waves. The store concurrency limiter is the only throttle: `store_one`
2487    /// reads each chunk's body from the on-disk spill on demand, so at most
2488    /// `store_cap` (≤ 64) bodies are ever resident, giving the same
2489    /// `~store_cap × MAX_CHUNK_SIZE` peak-memory bound the old 64-chunk waves
2490    /// gave — but with **no wave barrier**, so a slow straggler (e.g. a chunk
2491    /// whose close-group peers are stale relayed addresses that take minutes to
2492    /// revalidate) no longer stalls the rest of the file behind it.
2493    ///
2494    /// A chunk that is transiently short of quorum (`InsufficientPeers` /
2495    /// `CloseGroupShortfall` / `RemotePut`) does **not** abort the file, nor
2496    /// block the pass: the store pass is a **single attempt** (no in-pass
2497    /// backoff), and quorum-short chunks are collected into a deferred set. After
2498    /// the pass, [`merkle_deferred_retry`] retries that set in concurrent rounds
2499    /// ([`DEFERRED_ROUND_DELAYS_SECS`] delays), re-reading each body from the
2500    /// spill and reusing its proof. Non-quorum errors (e.g. a missing proof)
2501    /// stay fatal and abort immediately.
2502    ///
2503    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)` on success.
2504    /// Costs come from the `batch_result` which was populated during payment.
2505    ///
2506    /// # Errors
2507    ///
2508    /// Returns [`Error::PartialUpload`] if any chunk is still short of quorum
2509    /// after the store pass and every deferred round (other chunks remain
2510    /// stored), or the underlying error for a non-quorum failure.
2511    async fn upload_merkle_from_spill(
2512        &self,
2513        spill: &ChunkSpill,
2514        addresses: &[[u8; 32]],
2515        batch_result: &MerkleBatchPaymentResult,
2516        already_stored_addresses: &[[u8; 32]],
2517        progress: Option<&mpsc::Sender<UploadEvent>>,
2518    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
2519        let mut total_stored = already_stored_addresses.len();
2520        let total_chunks = total_stored + addresses.len();
2521        let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
2522        let mut failed: Vec<([u8; 32], String)> = Vec::new();
2523        let mut agg_stats = WaveAggregateStats::default();
2524
2525        // Chunks without a merkle proof were never paid for: a partial
2526        // `pay_for_merkle_multi_batch` result carries proofs only for the
2527        // sub-batches whose on-chain payment succeeded. Such a chunk cannot be
2528        // stored, so record it as failed (surfaced via `PartialUpload` once the
2529        // storable chunks have been attempted) rather than letting its
2530        // "missing proof" error abort the whole file and discard every other
2531        // chunk's progress.
2532        let (to_store, missing_proof) =
2533            partition_addresses_by_proof(addresses, &batch_result.proofs);
2534        if !missing_proof.is_empty() {
2535            warn!(
2536                "{} chunk(s) lack a merkle proof (partial payment); reporting them as failed",
2537                missing_proof.len()
2538            );
2539            for addr in &missing_proof {
2540                failed.push((
2541                    *addr,
2542                    format!("Missing merkle proof for chunk {}", hex::encode(addr)),
2543                ));
2544            }
2545        }
2546
2547        let store_limiter = self.controller().store.clone();
2548
2549        // Store one chunk to its (freshly re-collected) close group, reusing the
2550        // chunk's merkle proof. Reads the body from the on-disk spill on demand,
2551        // so the whole-file store runs as ONE cap-bounded fan-out with no per-wave
2552        // barrier: a slow straggler (e.g. a chunk whose close-group peers are
2553        // stale relayed addresses that take minutes to revalidate) no longer
2554        // holds back the rest of the file. Only the ≤cap in-flight stores hold a
2555        // body, so peak resident memory is `cap × MAX_CHUNK_SIZE`; the cap is
2556        // clamped to `MERKLE_STORE_MAX_IN_FLIGHT` (below) so it stays within the
2557        // ~256 MiB bound the fixed 64-chunk waves gave even if `adaptive.max.store`
2558        // is configured above 64.
2559        // Shared across every deferred round so a converged routing table yields
2560        // a fresh group. Only a quorum shortfall is recoverable; a missing proof
2561        // or a failed spill read stays fatal. Mirrors `merkle_upload_chunks`.
2562        let store_one = |addr: [u8; 32]| {
2563            let limiter = store_limiter.clone();
2564            let proof_bytes = batch_result.proofs.get(&addr).cloned();
2565            async move {
2566                let started = std::time::Instant::now();
2567                let proof = proof_bytes.ok_or_else(|| {
2568                    Error::Payment(format!(
2569                        "Missing merkle proof for chunk {}",
2570                        hex::encode(addr)
2571                    ))
2572                })?;
2573                let content = spill.read_chunk(&addr)?;
2574                let peers = self.put_target_peers(&addr).await?;
2575                observe_op(
2576                    &limiter,
2577                    || async move { self.chunk_put_to_close_group(content, proof, &peers).await },
2578                    classify_error,
2579                )
2580                .await
2581                .map(|_| started)
2582            }
2583        };
2584
2585        info!(
2586            "Storing {} chunks (merkle) as a single cap-bounded pass — {total_stored}/{total_chunks} stored so far",
2587            to_store.len()
2588        );
2589
2590        // Store the WHOLE file in one cap-bounded fan-out (`max_attempts = 1`, no
2591        // backoff): no wave barrier, so a slow straggler (dead-relay peers) can't
2592        // hold back the rest of the file. The store cap re-reads the limiter per
2593        // slot, so it maxes at 64 → ≤64 bodies resident (bodies read from spill on
2594        // demand by `store_one`), the same peak-memory bound the fixed 64-chunk
2595        // waves gave. Quorum-short chunks are collected and deferred to the
2596        // post-pass concurrent retry rather than parking slots behind a backoff.
2597        // `merkle_store_cap` clamps to `MERKLE_STORE_MAX_IN_FLIGHT` so a high
2598        // configured `adaptive.max.store` can't hold more than the wave-era
2599        // ~256 MB of spilled bodies resident (PR #137 review).
2600        let cap = || merkle_store_cap(store_limiter.current());
2601        let outcome = merkle_store_with_retry(
2602            to_store.clone(),
2603            cap,
2604            1,
2605            std::time::Duration::ZERO,
2606            progress,
2607            total_stored,
2608            total_chunks,
2609            &store_one,
2610        )
2611        .await?;
2612
2613        // Record confirmed stores from the explicit set the store helper reports.
2614        // Using that set (rather than inferring "chunks minus failed") keeps
2615        // `stored_addresses` correct even when a fatal abort leaves some chunks
2616        // neither stored nor reported short of quorum.
2617        stored_addresses.extend(&outcome.stored_addresses);
2618        total_stored = outcome.stored;
2619
2620        // Merge store stats (durations, attempts, per-round histogram).
2621        agg_stats.chunk_attempts_total = agg_stats
2622            .chunk_attempts_total
2623            .saturating_add(outcome.stats.chunk_attempts_total);
2624        agg_stats
2625            .store_durations_ms
2626            .extend(outcome.stats.store_durations_ms);
2627        for (slot, count) in agg_stats
2628            .retries_histogram
2629            .iter_mut()
2630            .zip(outcome.stats.retries_histogram.iter())
2631        {
2632            *slot = slot.saturating_add(*count);
2633        }
2634
2635        if let Some(e) = outcome.fatal {
2636            // A non-quorum store error is fatal (missing proofs were filtered out
2637            // above, so this is a genuine network/store failure). Preserve every
2638            // chunk stored so far and report every not-stored chunk as failed, so
2639            // the `PartialUpload` counts are accurate.
2640            warn!("merkle store aborted: {e}");
2641            let mut known_failed = failed;
2642            known_failed.extend(outcome.failed_addresses);
2643            return Err(partial_upload_after_fatal(
2644                addresses,
2645                stored_addresses,
2646                total_stored,
2647                total_chunks,
2648                known_failed,
2649                PartialUploadSpend {
2650                    storage_cost_atto: batch_result.storage_cost_atto.clone(),
2651                    gas_cost_wei: batch_result.gas_cost_wei,
2652                },
2653                format!("merkle chunk store aborted: {e}"),
2654            ));
2655        }
2656
2657        // Non-fatal: quorum-short chunks are deferred (not failed yet) for the
2658        // post-pass concurrent retry. A deferred chunk joins `stored_addresses`
2659        // only if/when a later round stores it.
2660        let deferred: Vec<([u8; 32], String)> = outcome.failed_addresses;
2661
2662        // The store pass never blocked on backoff; now retry the deferred set in
2663        // concurrent rounds. Bodies are re-read from the spill by `store_one`
2664        // (peak RAM unchanged) and proofs re-attached. Chunks still short after
2665        // the final round become `failed`; a non-quorum error aborts as
2666        // `PartialUpload`.
2667        if !deferred.is_empty() {
2668            info!(
2669                "Deferring {} merkle chunk(s) short of quorum for concurrent retry after the store pass",
2670                deferred.len()
2671            );
2672            let dr = merkle_deferred_retry(
2673                deferred,
2674                &DEFERRED_ROUND_DELAYS_SECS,
2675                |n: usize| merkle_store_cap(store_limiter.current()).min(n.max(1)),
2676                progress,
2677                total_stored,
2678                total_chunks,
2679                &store_one,
2680            )
2681            .await?;
2682
2683            stored_addresses.extend(dr.stored_addresses);
2684            total_stored = dr.stored;
2685
2686            // Merge the deferred pass's stats — its histogram is already mapped
2687            // to the right per-round slots — into the file aggregate.
2688            agg_stats.chunk_attempts_total = agg_stats
2689                .chunk_attempts_total
2690                .saturating_add(dr.stats.chunk_attempts_total);
2691            agg_stats
2692                .store_durations_ms
2693                .extend(dr.stats.store_durations_ms);
2694            for (slot, count) in agg_stats
2695                .retries_histogram
2696                .iter_mut()
2697                .zip(dr.stats.retries_histogram.iter())
2698            {
2699                *slot = slot.saturating_add(*count);
2700            }
2701
2702            if let Some(reason) = dr.fatal {
2703                // A non-quorum store error during a deferred round is fatal, the
2704                // same as in the wave path: preserve everything stored so far and
2705                // report every not-stored chunk as failed.
2706                warn!("merkle deferred retry aborted: {reason}");
2707                let mut known_failed = failed;
2708                known_failed.extend(dr.failed_addresses);
2709                return Err(partial_upload_after_fatal(
2710                    addresses,
2711                    stored_addresses,
2712                    total_stored,
2713                    total_chunks,
2714                    known_failed,
2715                    PartialUploadSpend {
2716                        storage_cost_atto: batch_result.storage_cost_atto.clone(),
2717                        gas_cost_wei: batch_result.gas_cost_wei,
2718                    },
2719                    format!("merkle chunk store aborted: {reason}"),
2720                ));
2721            }
2722            failed.extend(dr.failed_addresses);
2723        }
2724
2725        // A file with any permanently-failed chunk is not fully stored — surface
2726        // it as `PartialUpload`, but only after the store pass and every deferred
2727        // retry round are exhausted (never silently succeed with missing chunks).
2728        if !failed.is_empty() {
2729            let failed_count = failed.len();
2730            let total_attempts = 1 + DEFERRED_ROUND_DELAYS_SECS.len();
2731            warn!(
2732                "merkle upload incomplete: {failed_count}/{total_chunks} chunks short of quorum after retries"
2733            );
2734            return Err(Error::PartialUpload {
2735                stored: stored_addresses,
2736                stored_count: total_stored,
2737                failed,
2738                failed_count,
2739                total_chunks,
2740                spend: Box::new(PartialUploadSpend {
2741                    storage_cost_atto: batch_result.storage_cost_atto.clone(),
2742                    gas_cost_wei: batch_result.gas_cost_wei,
2743                }),
2744                reason: format!(
2745                    "{failed_count} chunk(s) short of quorum after {total_attempts} attempts"
2746                ),
2747            });
2748        }
2749
2750        Ok((
2751            total_stored,
2752            batch_result.storage_cost_atto.clone(),
2753            batch_result.gas_cost_wei,
2754            agg_stats,
2755        ))
2756    }
2757
2758    /// Download and decrypt a file from the network, writing it to disk.
2759    ///
2760    /// Uses `streaming_decrypt` so that only one batch of chunks lives in
2761    /// memory at a time, avoiding OOM on large files. Chunks are fetched
2762    /// concurrently within each batch, then decrypted data is written to
2763    /// disk incrementally.
2764    ///
2765    /// Returns the number of bytes written.
2766    ///
2767    /// # Panics
2768    ///
2769    /// Requires a multi-threaded Tokio runtime (`flavor = "multi_thread"`).
2770    /// Will panic if called from a `current_thread` runtime because
2771    /// `streaming_decrypt` takes a synchronous callback that must bridge
2772    /// back to async via `block_in_place`.
2773    ///
2774    /// # Errors
2775    ///
2776    /// Returns an error if any chunk cannot be retrieved, decryption fails,
2777    /// or the file cannot be written.
2778    pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
2779        self.file_download_with_progress(data_map, output, None)
2780            .await
2781    }
2782
2783    /// Download and decrypt a file, trying the requested number of
2784    /// closest peers for every chunk fetch.
2785    ///
2786    /// Returns the number of bytes written.
2787    ///
2788    /// # Errors
2789    ///
2790    /// Returns an error if any chunk cannot be retrieved, decryption fails,
2791    /// or the file cannot be written.
2792    pub async fn file_download_from_closest_peers(
2793        &self,
2794        data_map: &DataMap,
2795        output: &Path,
2796        peer_count: NonZeroUsize,
2797    ) -> Result<u64> {
2798        self.file_download_with_progress_using_peer_count(data_map, output, None, peer_count.get())
2799            .await
2800    }
2801
2802    /// Download and decrypt a file with progress events, trying the
2803    /// requested number of closest peers for every chunk fetch.
2804    ///
2805    /// Same as [`Client::file_download_from_closest_peers`] but sends
2806    /// [`DownloadEvent`]s for UI feedback.
2807    ///
2808    /// # Errors
2809    ///
2810    /// Returns an error if any chunk cannot be retrieved, decryption fails,
2811    /// or the file cannot be written.
2812    pub async fn file_download_with_progress_from_closest_peers(
2813        &self,
2814        data_map: &DataMap,
2815        output: &Path,
2816        progress: Option<mpsc::Sender<DownloadEvent>>,
2817        peer_count: NonZeroUsize,
2818    ) -> Result<u64> {
2819        self.file_download_with_progress_using_peer_count(
2820            data_map,
2821            output,
2822            progress,
2823            peer_count.get(),
2824        )
2825        .await
2826    }
2827
2828    /// Download and decrypt a file with peer-health diagnostics.
2829    ///
2830    /// Each file chunk is fetched by querying every selected closest peer,
2831    /// not by returning after the first successful peer. The returned report
2832    /// records which peers had each chunk and which did not. DataMap
2833    /// resolution still uses the normal early-return fetch path; diagnostics
2834    /// are for file chunks only.
2835    ///
2836    /// # Errors
2837    ///
2838    /// Returns an error if any chunk cannot be retrieved, decryption fails,
2839    /// or the file cannot be written.
2840    pub async fn file_download_with_peer_report_from_closest_peers(
2841        &self,
2842        data_map: &DataMap,
2843        output: &Path,
2844        progress: Option<mpsc::Sender<DownloadEvent>>,
2845        peer_count: NonZeroUsize,
2846    ) -> Result<FileDownloadWithPeerReport> {
2847        let chunk_reports = Arc::new(Mutex::new(Vec::new()));
2848        let bytes_written = self
2849            .file_download_with_progress_using_peer_count_and_reports(
2850                data_map,
2851                output,
2852                progress,
2853                peer_count.get(),
2854                Some(chunk_reports.clone()),
2855            )
2856            .await?;
2857
2858        let chunk_reports = chunk_reports
2859            .lock()
2860            .map_err(|_| Error::Storage("file chunk peer report lock poisoned".to_string()))?
2861            .clone();
2862        let chunk_reports = file_chunk_reports_from_recorded_sweeps(chunk_reports);
2863
2864        Ok(FileDownloadWithPeerReport {
2865            bytes_written,
2866            chunk_reports,
2867        })
2868    }
2869
2870    async fn download_fetch_file_chunk(
2871        &self,
2872        idx: usize,
2873        hash: XorName,
2874        context: FileDownloadFetchContext,
2875        is_deferred_retry: bool,
2876        attempt: usize,
2877    ) -> std::result::Result<DownloadBatchEntry, self_encryption::Error> {
2878        let addr = hash.0;
2879        let addr_hex = hex::encode(addr);
2880
2881        let chunk_content = if let Some(peer_reports) = context.peer_reports {
2882            match self
2883                .chunk_get_from_closest_peer_group(&addr, context.peer_count)
2884                .await
2885            {
2886                Ok(results) => {
2887                    let (content, sweep) = file_chunk_sweep_report_from_peer_results(
2888                        attempt,
2889                        is_deferred_retry,
2890                        &results,
2891                    );
2892                    peer_reports
2893                        .lock()
2894                        .map_err(|_| {
2895                            self_encryption::Error::Generic(
2896                                "file chunk peer report lock poisoned".to_string(),
2897                            )
2898                        })?
2899                        .push(RecordedFileChunkPeerSweep {
2900                            index: idx + 1,
2901                            address: addr,
2902                            sweep,
2903                        });
2904                    content
2905                }
2906                Err(e) => {
2907                    if is_deferred_retry {
2908                        info!(
2909                            "Deferred all-peer retry for {addr_hex} hit transient error: {e}; re-deferring"
2910                        );
2911                    } else {
2912                        info!("First-pass all-peer fetch error for {addr_hex}: {e}; deferring");
2913                    }
2914                    peer_reports
2915                        .lock()
2916                        .map_err(|_| {
2917                            self_encryption::Error::Generic(
2918                                "file chunk peer report lock poisoned".to_string(),
2919                            )
2920                        })?
2921                        .push(RecordedFileChunkPeerSweep {
2922                            index: idx + 1,
2923                            address: addr,
2924                            sweep: file_chunk_sweep_report_from_error(
2925                                attempt,
2926                                is_deferred_retry,
2927                                &e,
2928                            ),
2929                        });
2930                    None
2931                }
2932            }
2933        } else {
2934            match self
2935                .chunk_get_observed_from_closest_peers(&addr, context.peer_count)
2936                .await
2937            {
2938                Ok(Some(chunk)) => Some(chunk.content),
2939                Ok(None) => None,
2940                Err(e) => {
2941                    if is_deferred_retry {
2942                        info!(
2943                            "Deferred retry for {addr_hex} hit transient error: {e}; re-deferring"
2944                        );
2945                    } else {
2946                        info!("First-pass fetch error for {addr_hex}: {e}; deferring");
2947                    }
2948                    None
2949                }
2950            }
2951        };
2952
2953        let Some(content) = chunk_content else {
2954            return Ok((idx, Err(hash)));
2955        };
2956
2957        let fetched = context
2958            .fetched_ref
2959            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
2960            + 1;
2961        if is_deferred_retry {
2962            info!(
2963                "Downloaded {fetched}/{} (deferred retry)",
2964                context.total_chunks
2965            );
2966        } else {
2967            let total_chunks = context.total_chunks;
2968            info!("Downloaded {fetched}/{total_chunks}");
2969        }
2970        if let Some(ref tx) = context.progress_ref {
2971            let _ = tx.try_send(DownloadEvent::ChunksFetched {
2972                fetched,
2973                total: context.total_chunks,
2974            });
2975        }
2976
2977        Ok((idx, Ok(content)))
2978    }
2979
    /// Shared download core: resolve the DataMap, then fetch + streaming-decrypt
    /// the file one batch at a time, handing each decrypted plaintext segment
    /// (in order) to `on_chunk`. Constant memory — only one decrypt batch is
    /// resident at a time. Returns the total plaintext bytes produced.
    ///
    /// `on_chunk` is async so a sink can apply backpressure (e.g. a bounded
    /// channel). Driving the decrypt iterator runs the batched chunk fetch via
    /// `block_in_place`, so this requires a multi-threaded Tokio runtime.
    ///
    /// Every chunk fetch tries `peer_count` closest peers. When `peer_reports`
    /// is `Some`, it is handed to every file-chunk fetch through
    /// `FileDownloadFetchContext`; the DataMap resolution phase never uses it.
    ///
    /// Progress reporting (via `progress`; events are offered with `try_send`
    /// and send failures are ignored):
    /// 1. If `data_map` is a child map, sends `ResolvingDataMap` with
    ///    `data_map.len()` as `total_map_chunks`, then one `MapChunkFetched`
    ///    per DataMap chunk fetched while resolving to the root level.
    /// 2. Once the root DataMap is known, sends `DataMapResolved` with the
    ///    accurate `total_chunks`.
    /// 3. Data chunk fetches report `ChunksFetched` with `fetched`/`total`
    ///    (emitted by `download_fetch_file_chunk`).
    ///
    /// Chunks that are missing or hit a fetch error on the first pass of a
    /// batch are retried in up to three concurrent rounds, with delays of
    /// 0 s, 15 s and 45 s before the respective round.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Encryption`] if DataMap resolution fails, and for any
    /// error surfaced by the streaming decryptor, either when it is created
    /// or while iterating its output. Any error returned by `on_chunk` is
    /// propagated unchanged.
    async fn download_decrypted_chunks<F, Fut>(
        &self,
        data_map: &DataMap,
        progress: Option<mpsc::Sender<DownloadEvent>>,
        peer_count: usize,
        peer_reports: Option<Arc<Mutex<Vec<RecordedFileChunkPeerSweep>>>>,
        mut on_chunk: F,
    ) -> Result<u64>
    where
        F: FnMut(Bytes) -> Fut,
        Fut: std::future::Future<Output = Result<()>>,
    {
        // Captured once; the synchronous fetch callbacks below use it to
        // `block_on` the async chunk fetches.
        let handle = Handle::current();

        // Phase 1: Resolve hierarchical DataMap to root level.
        // This fetches child DataMap chunks (typically 3) to discover the real chunk count.
        let root_map = if data_map.is_child() {
            let dm_chunks = data_map.len();
            if let Some(ref tx) = progress {
                let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
                    total_map_chunks: dm_chunks,
                });
            }

            let resolve_progress = progress.clone();
            let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));

            let resolved = tokio::task::block_in_place(|| {
                let counter_ref = resolve_counter.clone();
                let progress_ref = resolve_progress.clone();
                let fetch_limiter = self.controller().fetch.clone();
                let fetch = |batch: &[(usize, XorName)]| {
                    let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
                    let counter = counter_ref.clone();
                    let prog = progress_ref.clone();
                    let limiter = fetch_limiter.clone();
                    handle.block_on(async {
                        // Use rebucketed_unordered so the in-flight cap
                        // is re-read from the limiter as each slot frees.
                        // `buffer_unordered` snapshots the cap once at
                        // pipeline build, which means observe_op
                        // signals from inside chunk_get cannot reduce
                        // concurrency on the current batch — exactly
                        // the case where load-shedding is needed.
                        let mut results = rebucketed_unordered(
                            &limiter,
                            batch_owned,
                            |(idx, hash): (usize, XorName)| {
                                let counter = counter.clone();
                                let prog = prog.clone();
                                async move {
                                    let addr = hash.0;
                                    // chunk_get_observed feeds the
                                    // adaptive fetch limiter once per
                                    // call via chunk_get_outcome
                                    // (Ok(None) -> Timeout is the
                                    // load-shedding signal for
                                    // sustained close-group exhaustion).
                                    // Unlike file chunks, a missing or
                                    // failed DataMap chunk is a hard error:
                                    // there is no deferred retry here.
                                    let chunk = self
                                        .chunk_get_observed_from_closest_peers(&addr, peer_count)
                                        .await
                                        .map_err(|e| {
                                            self_encryption::Error::Generic(format!(
                                                "DataMap resolution failed: {e}"
                                            ))
                                        })?
                                        .ok_or_else(|| {
                                            self_encryption::Error::Generic(format!(
                                                "DataMap chunk not found: {}",
                                                hex::encode(addr)
                                            ))
                                        })?;
                                    let fetched = counter
                                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
                                        + 1;
                                    if let Some(ref tx) = prog {
                                        let _ =
                                            tx.try_send(DownloadEvent::MapChunkFetched { fetched });
                                    }
                                    Ok::<_, self_encryption::Error>((idx, chunk.content))
                                }
                            },
                        )
                        .await?;
                        // CRITICAL: self_encryption::get_root_data_map_parallel
                        // pairs the returned Vec POSITIONALLY with the input
                        // hashes via .zip() and discards our idx field.
                        // rebucketed_unordered preserves first-completion
                        // order, so sort by idx to restore input order
                        // before returning.
                        results.sort_by_key(|(idx, _)| *idx);
                        Ok(results)
                    })
                };
                get_root_data_map_parallel(data_map.clone(), &fetch)
            })
            .map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;

            info!(
                "Resolved hierarchical DataMap: {} data chunks",
                resolved.len()
            );
            resolved
        } else {
            // Already a root-level map: nothing to resolve.
            data_map.clone()
        };

        // Phase 2: Now we know the real chunk count.
        let total_chunks = root_map.len();
        if let Some(ref tx) = progress {
            let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
        }

        // Phase 3: Fetch and decrypt data chunks with accurate progress.
        let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let fetched_for_closure = fetched_counter.clone();
        let progress_for_closure = progress.clone();
        let peer_reports_for_closure = peer_reports.clone();

        // Pick the decrypt batch size from the chunk count, the fetch
        // limiter's current cap, the configured batch size and the usable
        // memory reading; the selection itself lives in
        // `adaptive_stream_decrypt_batch_size`.
        let fetch_limiter_outer = self.controller().fetch.clone();
        let usable_memory = usable_memory_bytes();
        let configured_batch_floor = stream_decrypt_batch_size();
        let fetch_cap = fetch_limiter_outer.current();
        let decrypt_batch_size = adaptive_stream_decrypt_batch_size(
            total_chunks,
            fetch_cap,
            configured_batch_floor,
            usable_memory,
        );
        info!(
            total_chunks,
            fetch_cap,
            configured_batch_floor,
            ?usable_memory,
            decrypt_batch_size,
            "Selected adaptive stream decrypt batch size"
        );

        let stream = streaming_decrypt_with_batch_size(
            &root_map,
            |batch: &[(usize, XorName)]| {
                let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
                let fetch_context = FileDownloadFetchContext {
                    total_chunks,
                    peer_count,
                    fetched_ref: fetched_for_closure.clone(),
                    progress_ref: progress_for_closure.clone(),
                    peer_reports: peer_reports_for_closure.clone(),
                };
                let fetch_limiter = fetch_limiter_outer.clone();

                tokio::task::block_in_place(|| {
                    handle.block_on(async {
                        // First pass: try every chunk in the batch. Normal mode
                        // uses chunk_get_observed (early-return after a found
                        // peer); diagnostic mode asks every selected closest
                        // peer and records that sweep before returning bytes.
                        // Any missing chunk or transient fetch error is encoded
                        // as Err(hash), so one noisy chunk does not abort the
                        // whole batch before the deferred retry rounds run.
                        let first_fetch_context = fetch_context.clone();
                        let raw: Vec<DownloadBatchEntry> = rebucketed_unordered(
                            &fetch_limiter,
                            batch_owned,
                            |(idx, hash): (usize, XorName)| {
                                let fetch_context = first_fetch_context.clone();
                                async move {
                                    self.download_fetch_file_chunk(
                                        idx,
                                        hash,
                                        fetch_context,
                                        false,
                                        FIRST_DIAGNOSTIC_FETCH_ATTEMPT,
                                    )
                                    .await
                                }
                            },
                        )
                        .await?;

                        // Partition: things we already have vs the
                        // deferred set we need to retry.
                        let mut results: Vec<(usize, bytes::Bytes)> = Vec::new();
                        let mut deferred: Vec<(usize, XorName)> = Vec::new();
                        for (idx, inner) in raw {
                            match inner {
                                Ok(bytes) => results.push((idx, bytes)),
                                Err(hash) => deferred.push((idx, hash)),
                            }
                        }

                        // Deferred retry pass: retry the deferred chunks
                        // in CONCURRENT rounds (reusing the fetch
                        // limiter's cap), not serially. The first round
                        // fires immediately — most deferrals on a
                        // healthy-but-lossy link are peer-side noise
                        // that clears in well under a second, and
                        // serializing them behind mandatory multi-second
                        // sleeps was the single biggest throughput sink
                        // on such links (a batch deferring ~20 chunks
                        // burned minutes of near-zero throughput even
                        // though every chunk succeeded on its first
                        // retry). Only chunks that survive a round get a
                        // longer back-off before the next, so genuine
                        // saturation still gets time to settle.
                        if !deferred.is_empty() {
                            // Round delays in seconds. Round 0 is
                            // immediate; later rounds back off to ride
                            // out sustained saturation.
                            // NOTE(review): this local const shadows the
                            // `DEFERRED_ROUND_DELAYS_SECS` imported from the
                            // `merkle` module at the top of the file — confirm
                            // the two are meant to stay independent.
                            const DEFERRED_ROUND_DELAYS_SECS: [u64; 3] = [0, 15, 45];
                            info!(
                                "Deferring {} chunk(s) for concurrent retry after batch settles",
                                deferred.len()
                            );
                            let mut remaining = deferred;
                            for (round, &delay_secs) in
                                DEFERRED_ROUND_DELAYS_SECS.iter().enumerate()
                            {
                                if remaining.is_empty() {
                                    break;
                                }
                                if delay_secs > 0 {
                                    tokio::time::sleep(std::time::Duration::from_secs(delay_secs))
                                        .await;
                                }
                                info!(
                                    "Deferred retry round {}/{}: {} chunk(s)",
                                    round + 1,
                                    DEFERRED_ROUND_DELAYS_SECS.len(),
                                    remaining.len(),
                                );
                                // Drain `remaining`; chunks that fail again
                                // are pushed back for the next round.
                                let round_input = std::mem::take(&mut remaining);
                                let retry_fetch_context = fetch_context.clone();
                                let round_results: Vec<DownloadBatchEntry> = rebucketed_unordered(
                                    &fetch_limiter,
                                    round_input,
                                    |(idx, hash): (usize, XorName)| {
                                        let fetch_context = retry_fetch_context.clone();
                                        async move {
                                            self.download_fetch_file_chunk(
                                                idx,
                                                hash,
                                                fetch_context,
                                                true,
                                                round + DEFERRED_RETRY_ATTEMPT_OFFSET,
                                            )
                                            .await
                                        }
                                    },
                                )
                                .await?;
                                for (idx, inner) in round_results {
                                    match inner {
                                        Ok(bytes) => results.push((idx, bytes)),
                                        Err(hash) => remaining.push((idx, hash)),
                                    }
                                }
                            }
                            // Anything still missing after the last round
                            // fails the batch; only the first such hash is
                            // named in the error.
                            if let Some((_, hash)) = remaining.first() {
                                return Err(self_encryption::Error::Generic(format!(
                                    "Chunk not found after {} deferred retry rounds: {}",
                                    DEFERRED_ROUND_DELAYS_SECS.len(),
                                    hex::encode(hash.0),
                                )));
                            }
                        }

                        // streaming_decrypt itself sort_by_keys before
                        // zipping, but the same closure is also passed
                        // through get_root_data_map_parallel internally
                        // (see self_encryption::stream_decrypt.rs::new), and
                        // THAT path zips positionally without sorting. Sort
                        // here so both consumers see input order.
                        results.sort_by_key(|(idx, _)| *idx);
                        Ok(results)
                    })
                })
            },
            decrypt_batch_size,
        )
        .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;

        // Drive the iterator (each `next()` runs the batched fetch via
        // block_in_place) and hand each decrypted segment to the sink in
        // order. Awaiting the sink between items yields back to the runtime so
        // a bounded sink can apply backpressure.
        let mut bytes_total = 0u64;
        for chunk_result in stream {
            let chunk: Bytes =
                chunk_result.map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
            bytes_total += chunk.len() as u64;
            on_chunk(chunk).await?;
        }
        Ok(bytes_total)
    }
3291
3292    /// Download and decrypt a file to disk, with optional progress events.
3293    ///
3294    /// Same as [`Client::file_download`] but sends [`DownloadEvent`]s for UI
3295    /// feedback. Streams to a temp file (one decrypt batch resident at a time)
3296    /// and renames atomically on success. A `TempDownload` guard removes the
3297    /// staging file on any error path, including a panic.
3298    pub async fn file_download_with_progress(
3299        &self,
3300        data_map: &DataMap,
3301        output: &Path,
3302        progress: Option<mpsc::Sender<DownloadEvent>>,
3303    ) -> Result<u64> {
3304        self.file_download_with_progress_using_peer_count(
3305            data_map,
3306            output,
3307            progress,
3308            self.config().close_group_size,
3309        )
3310        .await
3311    }
3312
3313    /// Download and decrypt a file to disk with progress events, trying
3314    /// `peer_count` closest peers for every chunk fetch.
3315    ///
3316    /// Streams to a temp file (one decrypt batch resident at a time) and
3317    /// renames atomically on success.
3318    async fn file_download_with_progress_using_peer_count(
3319        &self,
3320        data_map: &DataMap,
3321        output: &Path,
3322        progress: Option<mpsc::Sender<DownloadEvent>>,
3323        peer_count: usize,
3324    ) -> Result<u64> {
3325        self.file_download_with_progress_using_peer_count_and_reports(
3326            data_map, output, progress, peer_count, None,
3327        )
3328        .await
3329    }
3330
3331    async fn file_download_with_progress_using_peer_count_and_reports(
3332        &self,
3333        data_map: &DataMap,
3334        output: &Path,
3335        progress: Option<mpsc::Sender<DownloadEvent>>,
3336        peer_count: usize,
3337        peer_reports: Option<Arc<Mutex<Vec<RecordedFileChunkPeerSweep>>>>,
3338    ) -> Result<u64> {
3339        debug!("Downloading file to {}", output.display());
3340
3341        let parent = output.parent().unwrap_or_else(|| Path::new("."));
3342        let unique: u64 = rand::random();
3343        let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));
3344
3345        // Guard removes the staging file on any early return OR a panic unwind
3346        // out of the `block_in_place` decrypt loop; defused only by a
3347        // successful commit(). Centralizes what used to be three duplicated
3348        // cleanup arms.
3349        let tmp = TempDownload::new(tmp_path);
3350        let mut file = std::fs::File::create(tmp.path())?;
3351
3352        let bytes_written = self
3353            .download_decrypted_chunks(data_map, progress, peer_count, peer_reports, |bytes| {
3354                let r = file.write_all(&bytes).map_err(Error::from);
3355                std::future::ready(r)
3356            })
3357            .await?;
3358        file.flush()?;
3359        drop(file); // close the handle before rename (Windows won't rename an open file)
3360
3361        tmp.commit(output)?;
3362        info!(
3363            "File downloaded: {bytes_written} bytes written to {}",
3364            output.display()
3365        );
3366        Ok(bytes_written)
3367    }
3368
3369    /// Download and decrypt a file, streaming the plaintext to `sink` instead
3370    /// of writing to disk.
3371    ///
3372    /// Constant memory (one decrypt batch resident at a time); the caller
3373    /// receives bytes progressively as each batch decrypts, suitable for
3374    /// forwarding to an HTTP chunked body or a gRPC response stream. The
3375    /// bounded `sink` applies backpressure. If the receiver is dropped (e.g.
3376    /// the client disconnected) the download stops early and returns
3377    /// [`Error::Cancelled`].
3378    ///
3379    /// The channel item type is `Result<Bytes, Error>`, so the caller sets up:
3380    ///
3381    /// ```ignore
3382    /// let (tx, rx) = tokio::sync::mpsc::channel::<Result<Bytes, Error>>(8);
3383    /// ```
3384    ///
3385    /// Typically the caller `tokio::spawn`s this and converts the matching
3386    /// `Receiver` into its response stream. Requires a multi-threaded Tokio
3387    /// runtime (the decrypt iterator uses `block_in_place`).
3388    pub async fn file_download_to_sender(
3389        &self,
3390        data_map: &DataMap,
3391        sink: mpsc::Sender<std::result::Result<Bytes, Error>>,
3392        progress: Option<mpsc::Sender<DownloadEvent>>,
3393    ) -> Result<u64> {
3394        let peer_count = self.config().close_group_size;
3395        self.download_decrypted_chunks(data_map, progress, peer_count, None, |bytes| {
3396            let sink = sink.clone();
3397            async move {
3398                sink.send(Ok(bytes))
3399                    .await
3400                    .map_err(|_| Error::Cancelled("download stream receiver dropped".into()))
3401            }
3402        })
3403        .await
3404    }
3405}
3406
3407#[cfg(test)]
3408#[allow(clippy::unwrap_used)]
3409mod tests {
3410    use super::*;
3411
3412    #[test]
3413    fn merkle_store_cap_clamps_to_memory_bound() {
3414        // Below the ceiling: pass the adaptive cap through unchanged.
3415        assert_eq!(merkle_store_cap(8), 8);
3416        assert_eq!(merkle_store_cap(64), 64);
3417        // A configured `adaptive.max.store` above the ceiling must be clamped so
3418        // the whole-file fan-out can't pin more than ~256 MB of bodies (PR #137).
3419        assert_eq!(merkle_store_cap(512), MERKLE_STORE_MAX_IN_FLIGHT);
3420        assert_eq!(merkle_store_cap(usize::MAX), MERKLE_STORE_MAX_IN_FLIGHT);
3421        // Never zero — always make progress.
3422        assert_eq!(merkle_store_cap(0), 1);
3423    }
3424
3425    #[test]
3426    fn distributed_sample_indices_spreads_across_large_file() {
3427        // cap 5 over 100 chunks: first and last included, evenly spread.
3428        assert_eq!(distributed_sample_indices(100, 5), vec![0, 24, 49, 74, 99]);
3429    }
3430
3431    #[test]
3432    fn distributed_sample_indices_covers_whole_small_file() {
3433        // total <= cap returns every index, preserving the exact
3434        // "whole file sampled" detection in estimate_upload_cost.
3435        assert_eq!(distributed_sample_indices(3, 5), vec![0, 1, 2]);
3436        assert_eq!(distributed_sample_indices(5, 5), vec![0, 1, 2, 3, 4]);
3437    }
3438
3439    #[test]
3440    fn distributed_sample_indices_is_in_range_and_increasing() {
3441        assert!(distributed_sample_indices(0, 5).is_empty());
3442        assert_eq!(distributed_sample_indices(1, 5), vec![0]);
3443        for total in 1..200usize {
3444            let idx = distributed_sample_indices(total, 5);
3445            assert_eq!(*idx.first().unwrap(), 0);
3446            assert_eq!(*idx.last().unwrap(), total - 1);
3447            assert!(idx.iter().all(|&i| i < total));
3448            assert!(idx.windows(2).all(|w| w[0] < w[1]));
3449        }
3450    }
3451
3452    #[test]
3453    fn disk_space_check_passes_for_small_file() {
3454        // A 1 KB file should always pass the disk space check
3455        check_disk_space_for_spill(1024).unwrap();
3456    }
3457
3458    #[test]
3459    fn disk_space_check_fails_for_absurd_size() {
3460        // Requesting space for a 1 exabyte file should fail on any real system
3461        let result = check_disk_space_for_spill(u64::MAX / 2);
3462        assert!(result.is_err());
3463        let err = result.unwrap_err();
3464        assert!(
3465            matches!(err, Error::InsufficientDiskSpace(_)),
3466            "expected InsufficientDiskSpace, got: {err}"
3467        );
3468    }
3469
3470    #[test]
3471    fn adaptive_stream_decrypt_batch_size_tracks_fetch_headroom() {
3472        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(u64::MAX));
3473
3474        assert_eq!(batch_size, 64 * DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
3475    }
3476
3477    #[test]
3478    fn adaptive_stream_decrypt_batch_size_caps_to_total_chunks() {
3479        let batch_size = adaptive_stream_decrypt_batch_size(12, 64, 10, Some(u64::MAX));
3480
3481        assert_eq!(batch_size, 12);
3482    }
3483
3484    #[test]
3485    fn adaptive_stream_decrypt_batch_size_honours_configured_floor() {
3486        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 1, 32, None);
3487
3488        assert_eq!(batch_size, 32);
3489    }
3490
3491    #[test]
3492    fn adaptive_stream_decrypt_batch_size_does_not_expand_without_memory_reading() {
3493        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, None);
3494
3495        assert_eq!(batch_size, 10);
3496    }
3497
3498    #[test]
3499    fn adaptive_stream_decrypt_batch_size_caps_to_memory_budget() {
3500        let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
3501            .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
3502            .max(1);
3503        let usable_memory = estimated_bytes_per_chunk
3504            .saturating_mul(16)
3505            .saturating_mul(DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR);
3506        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 256, 10, Some(usable_memory));
3507
3508        assert_eq!(batch_size, 16);
3509    }
3510
3511    #[test]
3512    fn adaptive_stream_decrypt_batch_size_keeps_one_chunk_when_memory_is_tight() {
3513        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(1));
3514
3515        assert_eq!(batch_size, 1);
3516    }
3517
3518    #[test]
3519    fn cached_merkle_covers_only_when_all_addresses_have_proofs() {
3520        let covered = compute_address(&Bytes::from_static(b"covered"));
3521        let extra = compute_address(&Bytes::from_static(b"extra"));
3522        let missing = compute_address(&Bytes::from_static(b"missing"));
3523        let cached = MerkleBatchPaymentResult {
3524            proofs: HashMap::from([(covered, vec![1]), (extra, vec![2])]),
3525            chunk_count: 2,
3526            storage_cost_atto: "0".to_string(),
3527            gas_cost_wei: 0,
3528            merkle_payment_timestamp: 0,
3529        };
3530
3531        assert!(cached_merkle_covers_addresses(&cached, &[covered]));
3532        assert!(cached_merkle_covers_addresses(&cached, &[covered, extra]));
3533        assert!(!cached_merkle_covers_addresses(
3534            &cached,
3535            &[covered, missing]
3536        ));
3537    }
3538
3539    /// A partial merkle payment leaves some addresses without a proof. Those
3540    /// must be split out so `upload_merkle_from_spill` reports them as failed
3541    /// (`PartialUpload`) instead of aborting the whole file — preserving the
3542    /// addresses' original order in each group.
3543    #[test]
3544    fn partition_addresses_by_proof_splits_paid_and_unpaid() {
3545        let paid_a = [1u8; 32];
3546        let unpaid_b = [2u8; 32];
3547        let paid_c = [3u8; 32];
3548        let unpaid_d = [4u8; 32];
3549        let proofs: HashMap<[u8; 32], Vec<u8>> =
3550            HashMap::from([(paid_a, vec![0xaa]), (paid_c, vec![0xcc])]);
3551
3552        let (to_store, missing) =
3553            partition_addresses_by_proof(&[paid_a, unpaid_b, paid_c, unpaid_d], &proofs);
3554
3555        assert_eq!(to_store, vec![paid_a, paid_c]);
3556        assert_eq!(missing, vec![unpaid_b, unpaid_d]);
3557    }
3558
3559    /// A wave that returns `Ok` contributes its stored chunks, parsed cost, and
3560    /// stats; nothing is recorded as failed.
3561    #[test]
3562    fn fold_single_wave_keeps_ok_wave() {
3563        let stored = vec![[1u8; 32], [2u8; 32]];
3564        let stats = WaveAggregateStats {
3565            chunk_attempts_total: 7,
3566            ..Default::default()
3567        };
3568
3569        let outcome = fold_single_wave(Ok((stored.clone(), "100".to_string(), 9, stats))).unwrap();
3570
3571        assert_eq!(outcome.stored, stored);
3572        assert!(outcome.failed.is_empty());
3573        assert_eq!(outcome.storage_atto.to_string(), "100");
3574        assert_eq!(outcome.gas_wei, 9);
3575        assert_eq!(outcome.stats.chunk_attempts_total, 7);
3576    }
3577
3578    /// The core V2-461 semantic: a wave short of quorum (`PartialUpload`) is
3579    /// recoverable — its stored chunks, failed chunks, and on-chain spend are
3580    /// folded so the caller can continue to the next wave rather than aborting
3581    /// the whole file.
3582    #[test]
3583    fn fold_single_wave_folds_partial_upload() {
3584        let stored = vec![[3u8; 32]];
3585        let failed = vec![([4u8; 32], "short of quorum".to_string())];
3586        let err = Error::PartialUpload {
3587            stored: stored.clone(),
3588            stored_count: 1,
3589            failed: failed.clone(),
3590            failed_count: 1,
3591            total_chunks: 2,
3592            spend: Box::new(PartialUploadSpend {
3593                storage_cost_atto: "250".to_string(),
3594                gas_cost_wei: 11,
3595            }),
3596            reason: "wave store failed after retries".to_string(),
3597        };
3598
3599        let outcome = fold_single_wave(Err(err)).unwrap();
3600
3601        assert_eq!(outcome.stored, stored);
3602        assert_eq!(outcome.failed, failed);
3603        assert_eq!(outcome.storage_atto.to_string(), "250");
3604        assert_eq!(outcome.gas_wei, 11);
3605        // `PartialUpload` carries no stats, so the failed wave contributes none.
3606        assert_eq!(outcome.stats.chunk_attempts_total, 0);
3607    }
3608
3609    /// A non-`PartialUpload` error (wallet/payment-infrastructure failure) is
3610    /// fatal and must abort the file, not be folded into the failed set.
3611    #[test]
3612    fn fold_single_wave_propagates_fatal_error() {
3613        let result = fold_single_wave(Err(Error::Payment("wallet unavailable".to_string())));
3614
3615        assert!(
3616            matches!(result, Err(Error::Payment(_))),
3617            "fatal payment error must propagate, got: {result:?}"
3618        );
3619    }
3620
3621    #[test]
3622    fn partition_addresses_by_proof_handles_all_or_nothing() {
3623        let a = [5u8; 32];
3624        let b = [6u8; 32];
3625
3626        // No proofs at all → every address is missing.
3627        let empty: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
3628        let (to_store, missing) = partition_addresses_by_proof(&[a, b], &empty);
3629        assert!(to_store.is_empty());
3630        assert_eq!(missing, vec![a, b]);
3631
3632        // All proofs present → nothing missing.
3633        let full: HashMap<[u8; 32], Vec<u8>> = HashMap::from([(a, vec![1]), (b, vec![2])]);
3634        let (to_store, missing) = partition_addresses_by_proof(&[a, b], &full);
3635        assert_eq!(to_store, vec![a, b]);
3636        assert!(missing.is_empty());
3637    }
3638
3639    #[test]
3640    fn chunk_spill_round_trip() {
3641        let mut spill = ChunkSpill::new().unwrap();
3642        let data1 = vec![0xAA; 1024];
3643        let data2 = vec![0xBB; 2048];
3644
3645        spill.push(&data1).unwrap();
3646        spill.push(&data2).unwrap();
3647
3648        assert_eq!(spill.len(), 2);
3649        assert_eq!(spill.total_bytes(), 1024 + 2048);
3650        let chunk_entries = spill.chunk_entries().unwrap();
3651        let entry_total: u64 = chunk_entries.iter().map(|(_, size)| *size).sum();
3652        assert_eq!(entry_total, 1024 + 2048);
3653
3654        // Read back and verify
3655        let chunk1 = spill.read_chunk(spill.addresses.first().unwrap()).unwrap();
3656        assert_eq!(&chunk1[..], &data1[..]);
3657
3658        let chunk2 = spill.read_chunk(spill.addresses.get(1).unwrap()).unwrap();
3659        assert_eq!(&chunk2[..], &data2[..]);
3660
3661        // Verify waves with 1-chunk wave size
3662        let waves: Vec<_> = spill.addresses.chunks(1).collect();
3663        assert_eq!(waves.len(), 2);
3664    }
3665
3666    #[test]
3667    fn chunk_spill_cleanup_on_drop() {
3668        let dir;
3669        {
3670            let spill = ChunkSpill::new().unwrap();
3671            dir = spill.dir.clone();
3672            assert!(dir.exists());
3673        }
3674        // After drop, the directory should be cleaned up
3675        assert!(!dir.exists(), "spill dir should be removed on drop");
3676    }
3677
3678    #[test]
3679    fn chunk_spill_deduplicates_identical_content() {
3680        let mut spill = ChunkSpill::new().unwrap();
3681        let data = vec![0xCC; 512];
3682
3683        spill.push(&data).unwrap();
3684        spill.push(&data).unwrap(); // same content, should be skipped
3685        spill.push(&data).unwrap(); // again
3686
3687        assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
3688        assert_eq!(
3689            spill.total_bytes(),
3690            512,
3691            "total_bytes should count unique only"
3692        );
3693
3694        // Different content should still be added
3695        let data2 = vec![0xDD; 256];
3696        spill.push(&data2).unwrap();
3697        assert_eq!(spill.len(), 2);
3698        assert_eq!(spill.total_bytes(), 512 + 256);
3699    }
3700}
3701
/// Compile-time assertions that Client file method futures are Send.
///
/// Nothing in this module is ever called: each `async fn` only has to
/// type-check, which it does only if the future it builds satisfies the
/// `Send` bound on `_assert_send`.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Accepts a reference to any `Send` value; using it with a non-`Send`
    /// type is a compile error.
    fn _assert_send<T>(_: &T)
    where
        T: Send,
    {
    }

    // Never called (hence `dead_code`); exists only to be type-checked.
    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_is_send(client: &Client) {
        let source = Path::new("/dev/null");
        let upload = client.file_upload(source);
        _assert_send(&upload);
    }

    // Never called (hence `dead_code`); exists only to be type-checked.
    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_with_mode_is_send(client: &Client) {
        let source = Path::new("/dev/null");
        let upload = client.file_upload_with_mode(source, PaymentMode::Auto);
        _assert_send(&upload);
    }

    // Never called; `todo!()` stands in for a real `DataMap`, which makes the
    // rest of the body unreachable and its bindings unused.
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _file_download_is_send(client: &Client) {
        let data_map: DataMap = todo!();
        let download = client.file_download(&data_map, Path::new("/dev/null"));
        _assert_send(&download);
    }
}