Skip to main content

ant_core/data/client/
file.rs

1//! File operations using streaming self-encryption.
2//!
3//! Upload files directly from disk without loading them entirely into memory.
4//! Uses `stream_encrypt` to process files in 8KB chunks, encrypting and
5//! uploading each piece as it's produced.
6//!
7//! Encrypted chunks are spilled to a temporary directory during encryption
8//! so that peak memory usage is bounded to one wave (~256 MB for 64 × 4 MB
9//! chunks) regardless of file size.
10//!
11//! For in-memory data uploads, see the `data` module.
12
13use crate::data::client::adaptive::{observe_op, rebucketed_unordered};
14use crate::data::client::batch::{
15    finalize_batch_payment, PaymentIntent, PreparedChunk, WaveAggregateStats,
16};
17use crate::data::client::classify_error;
18use crate::data::client::merkle::{
19    chunk_contents_for_upload_addresses, finalize_merkle_batch, merkle_deferred_retry,
20    merkle_store_with_retry, should_use_merkle, MerkleBatchPaymentResult, PaymentMode,
21    PreparedMerkleBatch, DEFERRED_ROUND_DELAYS_SECS,
22};
23use crate::data::client::Client;
24use crate::data::error::{Error, PartialUploadSpend, Result};
25use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
26use ant_protocol::transport::{MultiAddr, PeerId};
27use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
28use bytes::Bytes;
29use fs2::FileExt;
30use futures::stream::{self, StreamExt};
31use self_encryption::{
32    get_root_data_map_parallel, stream_decrypt_batch_size, stream_encrypt,
33    streaming_decrypt_with_batch_size, DataMap,
34};
35use std::collections::{HashMap, HashSet};
36use std::io::Write;
37use std::num::NonZeroUsize;
38use std::path::{Path, PathBuf};
39use std::sync::{Arc, Mutex};
40use tokio::runtime::Handle;
41use tokio::sync::mpsc;
42use tracing::{debug, info, warn};
43use xor_name::XorName;
44
/// Progress events emitted during file upload for UI feedback.
///
/// Covers encryption progress plus per-wave quoting and storing. The type is
/// `Clone`, so a single event can be handed to more than one consumer.
#[derive(Debug, Clone)]
pub enum UploadEvent {
    /// A chunk has been encrypted and spilled to disk.
    ///
    /// `chunks_done` is presumably the running count of chunks encrypted so
    /// far — NOTE(review): confirm against the emitter.
    Encrypting { chunks_done: usize },
    /// File encryption complete.
    ///
    /// `total_chunks` is the chunk count known once encryption has finished.
    Encrypted { total_chunks: usize },
    /// Starting quote collection for a wave.
    ///
    /// NOTE(review): whether `wave` is 0- or 1-based is not visible from this
    /// definition; check the emitter before rendering it.
    QuotingChunks {
        wave: usize,
        total_waves: usize,
        chunks_in_wave: usize,
    },
    /// A chunk has been quoted (peer discovery + price received).
    /// This is the slow phase — each quote involves network round-trips.
    ChunkQuoted { quoted: usize, total: usize },
    /// A chunk has been stored on the network.
    ChunkStored { stored: usize, total: usize },
    /// A wave has completed.
    ///
    /// `stored_so_far` / `total` report overall progress across waves, not
    /// just the wave that finished (as the field names indicate).
    WaveComplete {
        wave: usize,
        total_waves: usize,
        stored_so_far: usize,
        total: usize,
    },
}
71
/// Progress events emitted during file download for UI feedback.
///
/// The real data-chunk count is only known once the hierarchical `DataMap`
/// has been resolved, which is why resolution has its own events ahead of
/// [`DownloadEvent::ChunksFetched`].
#[derive(Debug, Clone)]
pub enum DownloadEvent {
    /// Resolving hierarchical DataMap to discover real chunk count.
    ResolvingDataMap { total_map_chunks: usize },
    /// A DataMap chunk has been fetched during resolution.
    MapChunkFetched { fetched: usize },
    /// DataMap resolved — total data chunk count now known.
    DataMapResolved { total_chunks: usize },
    /// Data chunks are being fetched from the network.
    ///
    /// `fetched` out of `total` data chunks have been retrieved so far.
    ChunksFetched { fetched: usize, total: usize },
}
84
/// One entry in the per-chunk quote list returned by
/// [`Client::get_store_quotes`]: the responding peer, its addresses, the
/// signed quote it returned, and the payment amount it is demanding.
type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);

/// Number of chunks per upload wave (matches batch.rs PAYMENT_WAVE_SIZE).
const UPLOAD_WAVE_SIZE: usize = 64;

/// Stream decrypt batches should be larger than fetch fan-out so
/// the rolling fetch scheduler can keep launching new chunk GETs as earlier
/// ones complete, instead of stopping at each self-encryption batch boundary.
///
/// The fetch concurrency cap is multiplied by this value to get the target
/// batch size.
const DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER: usize = 4;

/// Divisor applied to currently usable RAM to obtain the byte budget for one
/// decrypt batch: a batch may use at most `1 / 4` of usable memory.
const DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR: u64 = 4;

/// A decrypt batch briefly holds encrypted chunk bytes, decrypted chunk bytes,
/// and Vec/Bytes overhead. Use a conservative multiplier rather than assuming
/// payload bytes alone.
///
/// Each chunk in a batch is budgeted at this many times the maximum chunk size.
const DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER: u64 = 3;

/// Maximum number of distinct chunk addresses to sample when probing for a
/// representative quote in [`Client::estimate_upload_cost`].
///
/// Bounded small so we never spend more than a couple of round-trips on the
/// `AlreadyStored` retry path, which only matters when many leading chunks
/// of a file already live on the network.
const ESTIMATE_SAMPLE_CAP: usize = 5;
113
/// Choose chunk indices spread evenly over `[0, total)` for quote sampling.
///
/// A probe that only looks at the leading chunks is easily fooled: files that
/// share a prefix with an earlier upload report those chunks as
/// `AlreadyStored` even though the rest is new. Spacing the samples across the
/// whole file makes it far more likely that at least one sampled chunk yields
/// a real price.
///
/// Behaviour by input:
/// - `total == 0` → empty vector.
/// - `min(total, cap) <= 1` → `[0]`.
/// - `total <= cap` → every index `0..total`, so
///   [`Client::estimate_upload_cost`] can recognise the "whole file sampled"
///   case.
/// - otherwise → `cap` indices from `0` to `total - 1` inclusive.
///
/// The returned indices are strictly increasing.
fn distributed_sample_indices(total: usize, cap: usize) -> Vec<usize> {
    let picks = total.min(cap);
    match (total, picks) {
        (0, _) => Vec::new(),
        (_, 0 | 1) => vec![0],
        _ => {
            let last_index = total - 1;
            let intervals = picks - 1;
            let mut chosen: Vec<usize> = Vec::with_capacity(picks);
            for slot in 0..picks {
                let index = slot * last_index / intervals;
                // Defensive: skip a repeat of the previous index. With
                // `picks <= total` the sequence is already strictly increasing.
                if chosen.last() != Some(&index) {
                    chosen.push(index);
                }
            }
            chosen
        }
    }
}
140
/// Gas used by one `pay_for_quotes` transaction that packs up to
/// `UPLOAD_WAVE_SIZE` (quote_hash, rewards_address, amount) entries.
///
/// `batch_pay` in `batch.rs` flattens every chunk's close-group quotes into a
/// single EVM call, so the dominant cost is the SSTOREs for each entry plus
/// the base tx overhead. On Arbitrum that is roughly
/// `21_000 + 64 × (20_000 + small)` ≈ 1.3M; we round up to 1.5M as a
/// conservative per-wave upper bound.
///
/// Unit: gas (not wei).
const GAS_PER_WAVE_TX: u128 = 1_500_000;

/// Gas used by one merkle batch payment transaction.
///
/// One on-chain tx per merkle sub-batch, but each tx verifies a merkle tree
/// and posts a pool commitment, so budget higher than a plain transfer.
///
/// Unit: gas (not wei).
const GAS_PER_MERKLE_TX: u128 = 500_000;

/// Advisory gas price (wei/gas) used to turn the gas estimate into an ETH
/// figure when no live gas oracle is consulted.
///
/// Arbitrum One typically settles around 0.1 gwei on quiet blocks; we use
/// that as the default so the CLI prints a sensible order-of-magnitude
/// number. Users should treat the reported gas cost as an estimate, not a
/// commitment — real gas is bid at submission time.
///
/// `100_000_000` wei is exactly 0.1 gwei.
const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;

/// Extra headroom percentage for disk space check.
///
/// Encrypted chunks are slightly larger than the source data due to padding
/// and self-encryption overhead. We require file_size + 10% free space in
/// the temp directory to account for this.
///
/// The value is a percentage (10 means 10%), not a divisor.
const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;
172
/// Grace period (in seconds) before a spill dir is eligible for stale cleanup.
///
/// This is a small TOCTOU guard covering the sub-millisecond window inside
/// [`ChunkSpill::new`] between `create_dir` and `try_lock_exclusive`. Once a
/// dir is older than this and its lockfile is releasable, the owning process
/// is gone and the dir is safe to reap — regardless of how old it is.
///
/// The previous policy waited 24 h before reaping any orphan, which meant
/// that any non-graceful exit (SIGKILL, kernel OOM, panic abort) leaked its
/// spill dir until the next day's upload — and on a host being restart-looped
/// by systemd, orphans could fill the disk well within that window.
const SPILL_STALE_GRACE_SECS: u64 = 30;

/// Prefix for spill directory names to distinguish from user files.
const SPILL_DIR_PREFIX: &str = "spill_";

/// Lockfile name inside each spill dir to signal active use.
const SPILL_LOCK_NAME: &str = ".lock";

/// Temporary on-disk buffer for encrypted chunks.
///
/// During file encryption, chunks are written to a spill directory so that
/// only per-chunk metadata (the 32-byte address and the byte size) stays in
/// memory. At upload time chunks are read back one wave at a time, keeping
/// peak RAM at ~`UPLOAD_WAVE_SIZE × 4 MB`.
///
/// The directory is removed when the value is dropped.
struct ChunkSpill {
    /// Directory holding spilled chunk files (named by hex address).
    dir: PathBuf,
    /// Lockfile held for the lifetime of this spill (prevents stale cleanup).
    _lock: std::fs::File,
    /// Deduplicated list of chunk addresses.
    addresses: Vec<[u8; 32]>,
    /// Tracks seen addresses for deduplication.
    seen: HashSet<[u8; 32]>,
    /// Byte size per spilled chunk address.
    sizes: HashMap<[u8; 32], u64>,
    /// Running total of unique chunk byte sizes (for average-size calculation).
    total_bytes: u64,
}
211
212impl ChunkSpill {
213    /// Return the parent directory for all spill dirs: `<data_dir>/spill/`.
214    fn spill_root() -> Result<PathBuf> {
215        use crate::config;
216        let root = config::data_dir()
217            .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
218            .join("spill");
219        Ok(root)
220    }
221
222    /// Create a new spill directory under `<data_dir>/spill/`.
223    ///
224    /// Directory name is `spill_<timestamp>_<random>` so orphans can be
225    /// identified by prefix and cleaned up by age. A lockfile inside the
226    /// dir prevents concurrent cleanup from deleting an active spill.
227    fn new() -> Result<Self> {
228        let root = Self::spill_root()?;
229        std::fs::create_dir_all(&root)?;
230
231        // Clean up stale spill dirs from previous crashed runs.
232        Self::cleanup_stale(&root);
233
234        let now = std::time::SystemTime::now()
235            .duration_since(std::time::UNIX_EPOCH)
236            .unwrap_or_default()
237            .as_secs();
238        let unique: u64 = rand::random();
239        let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
240        std::fs::create_dir(&dir)?;
241
242        // Create and hold a lockfile for the lifetime of this spill.
243        // cleanup_stale() will skip dirs with locked files.
244        let lock_path = dir.join(SPILL_LOCK_NAME);
245        let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
246            Error::Io(std::io::Error::new(
247                e.kind(),
248                format!("failed to create spill lockfile: {e}"),
249            ))
250        })?;
251        lock_file.try_lock_exclusive().map_err(|e| {
252            Error::Io(std::io::Error::new(
253                e.kind(),
254                format!("failed to lock spill lockfile: {e}"),
255            ))
256        })?;
257
258        Ok(Self {
259            dir,
260            _lock: lock_file,
261            addresses: Vec::new(),
262            seen: HashSet::new(),
263            sizes: HashMap::new(),
264            total_bytes: 0,
265        })
266    }
267
268    /// Clean up stale spill directories. Best-effort, errors are logged.
269    ///
270    /// A spill dir is reaped when:
271    /// 1. Its name starts with `SPILL_DIR_PREFIX` (ignores unrelated files)
272    /// 2. It is an actual directory, not a symlink (prevents symlink attacks)
273    /// 3. Its timestamp is older than `SPILL_STALE_GRACE_SECS` (TOCTOU guard)
274    /// 4. Its lockfile is releasable — i.e. no live process holds it
275    ///
276    /// The lockfile is the primary correctness gate: a releasable lock means
277    /// the owning `ChunkSpill` has been dropped or the process is gone, so
278    /// the dir is fair game. The grace period covers only the brief window
279    /// inside [`Self::new`] between `create_dir` and `try_lock_exclusive`.
280    ///
281    /// Safe to call concurrently from multiple processes.
282    fn cleanup_stale(root: &Path) {
283        let now = std::time::SystemTime::now()
284            .duration_since(std::time::UNIX_EPOCH)
285            .unwrap_or_default()
286            .as_secs();
287
288        if now == 0 {
289            // Clock is broken (before Unix epoch). Skip cleanup to avoid
290            // misidentifying dirs as stale.
291            warn!("System clock before Unix epoch, skipping spill cleanup");
292            return;
293        }
294
295        let entries = match std::fs::read_dir(root) {
296            Ok(entries) => entries,
297            Err(_) => return,
298        };
299
300        for entry in entries.flatten() {
301            let name = entry.file_name();
302            let name_str = name.to_string_lossy();
303
304            // Only process dirs with our prefix.
305            let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
306                Some(s) => s,
307                None => continue,
308            };
309
310            // Parse timestamp: "spill_<timestamp>_<random>"
311            let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
312                Some(ts) => ts,
313                None => continue,
314            };
315
316            if now.saturating_sub(timestamp) < SPILL_STALE_GRACE_SECS {
317                continue;
318            }
319
320            // Safety: only delete actual directories, not symlinks.
321            let file_type = match entry.file_type() {
322                Ok(ft) => ft,
323                Err(_) => continue,
324            };
325            if !file_type.is_dir() {
326                continue;
327            }
328
329            let path = entry.path();
330
331            // Check lockfile: if locked, the dir is in active use -- skip it.
332            let lock_path = path.join(SPILL_LOCK_NAME);
333            if let Ok(lock_file) = std::fs::File::open(&lock_path) {
334                use fs2::FileExt;
335                if lock_file.try_lock_exclusive().is_err() {
336                    // Lock held by another process -- dir is active.
337                    debug!("Skipping active spill dir: {}", path.display());
338                    continue;
339                }
340                // We acquired the lock, so no one else holds it.
341                // Drop it before deleting.
342                drop(lock_file);
343            }
344
345            info!("Cleaning up stale spill dir: {}", path.display());
346            if let Err(e) = std::fs::remove_dir_all(&path) {
347                warn!("Failed to clean up stale spill dir {}: {e}", path.display());
348            }
349        }
350    }
351
352    /// Run stale spill cleanup. Call at client startup or periodically.
353    #[allow(dead_code)]
354    pub(crate) fn run_cleanup() {
355        if let Ok(root) = Self::spill_root() {
356            Self::cleanup_stale(&root);
357        }
358    }
359
360    /// Write one encrypted chunk to disk and record its address.
361    ///
362    /// Deduplicates by content address: if the same chunk was already
363    /// spilled, the write and accounting are skipped. This prevents
364    /// double-uploads and inflated quoting metrics.
365    fn push(&mut self, content: &[u8]) -> Result<()> {
366        let address = compute_address(content);
367        if !self.seen.insert(address) {
368            return Ok(());
369        }
370        let path = self.dir.join(hex::encode(address));
371        std::fs::write(&path, content)?;
372        let content_len = content.len() as u64;
373        self.sizes.insert(address, content_len);
374        self.total_bytes += content_len;
375        self.addresses.push(address);
376        Ok(())
377    }
378
379    /// Number of chunks stored.
380    fn len(&self) -> usize {
381        self.addresses.len()
382    }
383
384    /// Total bytes of all spilled chunks.
385    fn total_bytes(&self) -> u64 {
386        self.total_bytes
387    }
388
389    /// Address and byte-size pairs for all spilled chunks.
390    fn chunk_entries(&self) -> Result<Vec<([u8; 32], u64)>> {
391        self.addresses
392            .iter()
393            .map(|address| {
394                self.sizes
395                    .get(address)
396                    .copied()
397                    .map(|size| (*address, size))
398                    .ok_or_else(|| {
399                        Error::Storage(format!(
400                            "missing size for spilled chunk {}",
401                            hex::encode(address)
402                        ))
403                    })
404            })
405            .collect()
406    }
407
408    /// Read a single chunk back from disk by address.
409    fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
410        let path = self.dir.join(hex::encode(address));
411        let data = std::fs::read(&path).map_err(|e| {
412            Error::Io(std::io::Error::new(
413                e.kind(),
414                format!("reading spilled chunk {}: {e}", hex::encode(address)),
415            ))
416        })?;
417        Ok(Bytes::from(data))
418    }
419
420    /// Read a wave of chunks from disk.
421    fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
422        let mut out = Vec::with_capacity(wave_addrs.len());
423        for addr in wave_addrs {
424            let content = self.read_chunk(addr)?;
425            out.push((content, *addr));
426        }
427        Ok(out)
428    }
429
430    /// Clean up the spill directory.
431    fn cleanup(&self) {
432        if let Err(e) = std::fs::remove_dir_all(&self.dir) {
433            warn!(
434                "Failed to clean up chunk spill dir {}: {e}",
435                self.dir.display()
436            );
437        }
438    }
439}
440
/// Removes the spill directory when a [`ChunkSpill`] goes out of scope.
impl Drop for ChunkSpill {
    /// Delegates to [`ChunkSpill::cleanup`], so a failed removal is logged
    /// as a warning rather than propagated.
    ///
    /// Struct fields are dropped only after this method returns, so the
    /// lockfile handle in `_lock` is still open while the directory is removed.
    fn drop(&mut self) {
        self.cleanup();
    }
}
446
447fn cached_merkle_covers_addresses(
448    cached: &MerkleBatchPaymentResult,
449    addresses: &[[u8; 32]],
450) -> bool {
451    addresses
452        .iter()
453        .all(|addr| cached.proofs.contains_key(addr))
454}
455
/// Sort `addresses` into two lists, `(to_store, missing_proof)`, according to
/// whether `proofs` contains a merkle proof for the address.
///
/// This matters for a partial [`MerkleBatchPaymentResult`]: when a later
/// sub-batch of `pay_for_merkle_multi_batch` fails to pay, only the sub-batches
/// that were paid have proofs, and the remaining chunks arrive at the upload
/// path without one. `upload_waves_merkle` reports those chunks as failed
/// through [`Error::PartialUpload`] instead of aborting the whole file.
///
/// Both lists keep the relative order of `addresses`; duplicates in the input
/// are carried through unchanged.
fn partition_addresses_by_proof(
    addresses: &[[u8; 32]],
    proofs: &HashMap<[u8; 32], Vec<u8>>,
) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
    let mut to_store = Vec::new();
    let mut missing_proof = Vec::new();
    for addr in addresses {
        if proofs.contains_key(addr) {
            to_store.push(*addr);
        } else {
            missing_proof.push(*addr);
        }
    }
    (to_store, missing_proof)
}
474
475/// Build a `PartialUpload` after a fatal merkle store error, with accurate
476/// counts.
477///
478/// A fatal abort can leave chunks in three states: confirmed stored (in
479/// `stored_addresses`), known-failed (in `known_failed` — missing proofs, the
480/// quorum shortfalls and the fatal chunk seen so far), and "in flight when the
481/// abort hit" (neither). Rather than trust the helpers to enumerate the last
482/// group, this derives the failed set authoritatively as *every* `addresses`
483/// entry not in `stored_addresses`, preferring a known per-chunk message and
484/// falling back to the fatal `reason`. That guarantees
485/// `stored_count + failed_count` accounts for the whole file — fixing the
486/// under-reporting where a fatal wave could surface `failed_count = 0` and omit
487/// same-pass successes.
488fn partial_upload_after_fatal(
489    addresses: &[[u8; 32]],
490    stored_addresses: Vec<[u8; 32]>,
491    stored_count: usize,
492    total_chunks: usize,
493    known_failed: Vec<([u8; 32], String)>,
494    spend: PartialUploadSpend,
495    reason: String,
496) -> Error {
497    let stored_set: HashSet<[u8; 32]> = stored_addresses.iter().copied().collect();
498    let mut failed_map: HashMap<[u8; 32], String> = HashMap::new();
499    for (addr, msg) in known_failed {
500        if !stored_set.contains(&addr) {
501            failed_map.entry(addr).or_insert(msg);
502        }
503    }
504    for addr in addresses {
505        if !stored_set.contains(addr) {
506            failed_map.entry(*addr).or_insert_with(|| reason.clone());
507        }
508    }
509    let failed: Vec<([u8; 32], String)> = failed_map.into_iter().collect();
510    let failed_count = failed.len();
511    Error::PartialUpload {
512        stored: stored_addresses,
513        stored_count,
514        failed,
515        failed_count,
516        total_chunks,
517        spend: Box::new(spend),
518        reason,
519    }
520}
521
/// One wave's contribution to a single-node upload, distilled from its
/// `batch_upload_chunks_with_events` result.
///
/// Produced by `fold_single_wave`, both for a fully successful wave and for a
/// wave that ended in a recoverable `PartialUpload`.
#[derive(Debug)]
struct SingleWaveOutcome {
    /// Addresses confirmed stored in this wave.
    stored: Vec<[u8; 32]>,
    /// Chunks that failed after retries in this wave.
    /// Empty when the wave succeeded outright.
    failed: Vec<([u8; 32], String)>,
    /// Storage cost paid on-chain for this wave, in atto-tokens.
    storage_atto: Amount,
    /// Gas paid on-chain for this wave, in wei.
    gas_wei: u128,
    /// Per-wave store/retry statistics. Empty for a quorum-short wave, whose
    /// `PartialUpload` carries no stats.
    stats: WaveAggregateStats,
}
538
539/// Fold one wave's batch-upload result for the single-node path.
540///
541/// A `PartialUpload` (chunks short of quorum after retries) is **recoverable**:
542/// its stored/failed chunks and on-chain spend are returned so the caller
543/// records them and continues to the next wave, making the file make maximum
544/// progress exactly like `upload_waves_merkle`. Every other error is **fatal**
545/// (wallet/payment-infrastructure failures, missing proofs, spill reads) and is
546/// returned via `Err` to abort the file. Because `UPLOAD_WAVE_SIZE ==
547/// PAYMENT_WAVE_SIZE`, each batch call is exactly one payment wave, so folding a
548/// `PartialUpload` leaves nothing un-attempted within the wave.
549fn fold_single_wave(
550    result: Result<(Vec<[u8; 32]>, String, u128, WaveAggregateStats)>,
551) -> Result<SingleWaveOutcome> {
552    match result {
553        Ok((stored, storage, gas, stats)) => Ok(SingleWaveOutcome {
554            stored,
555            failed: Vec::new(),
556            storage_atto: storage.parse().unwrap_or(Amount::ZERO),
557            gas_wei: gas,
558            stats,
559        }),
560        Err(Error::PartialUpload {
561            stored,
562            failed,
563            spend,
564            ..
565        }) => Ok(SingleWaveOutcome {
566            stored,
567            failed,
568            storage_atto: spend.storage_cost_atto.parse().unwrap_or(Amount::ZERO),
569            gas_wei: spend.gas_cost_wei,
570            stats: WaveAggregateStats::default(),
571        }),
572        Err(e) => Err(e),
573    }
574}
575
576/// Check that the spill directory has enough free space for the spilled chunks.
577///
578/// `file_size` is the source file's byte count. We require
579/// `file_size + 10%` free space to account for self-encryption overhead.
580fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
581    let spill_root = ChunkSpill::spill_root()?;
582
583    // Ensure the root exists so fs2 can query it.
584    std::fs::create_dir_all(&spill_root)?;
585
586    let available = fs2::available_space(&spill_root).map_err(|e| {
587        Error::Io(std::io::Error::new(
588            e.kind(),
589            format!(
590                "failed to query disk space on {}: {e}",
591                spill_root.display()
592            ),
593        ))
594    })?;
595
596    // Use integer arithmetic to avoid f64 precision loss on large file sizes.
597    let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
598    let required = file_size.saturating_add(headroom);
599
600    if available < required {
601        let avail_mb = available / (1024 * 1024);
602        let req_mb = required / (1024 * 1024);
603        return Err(Error::InsufficientDiskSpace(format!(
604            "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
605            spill_root.display()
606        )));
607    }
608
609    debug!(
610        "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
611        spill_root.display()
612    );
613    Ok(())
614}
615
616fn usable_memory_bytes() -> Option<u64> {
617    let mut system = sysinfo::System::new();
618    system.refresh_memory();
619
620    let available_memory = system.available_memory();
621    let free_memory = system.free_memory();
622    let used_memory = system.used_memory();
623    let total_memory = system.total_memory();
624    let unused_memory = total_memory.saturating_sub(used_memory);
625
626    let mut usable = [available_memory, free_memory, unused_memory]
627        .into_iter()
628        .filter(|bytes| *bytes > 0)
629        .max();
630
631    let cgroup_free_memory = system
632        .cgroup_limits()
633        .filter(|limits| limits.total_memory > 0)
634        .map(|limits| limits.free_memory);
635    if let Some(cgroup_free_memory) = cgroup_free_memory {
636        usable = Some(usable.unwrap_or(u64::MAX).min(cgroup_free_memory));
637    }
638
639    debug!(
640        available_memory,
641        free_memory,
642        used_memory,
643        total_memory,
644        cgroup_free_memory,
645        usable_memory = ?usable,
646        "Detected usable memory for stream decrypt batch sizing"
647    );
648
649    usable
650}
651
652fn stream_decrypt_batch_memory_cap(usable_memory_bytes: u64) -> usize {
653    let budget = usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR;
654    let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
655        .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
656        .max(1);
657    let cap = (budget / estimated_bytes_per_chunk).max(1);
658
659    usize::try_from(cap).unwrap_or(usize::MAX)
660}
661
662fn adaptive_stream_decrypt_batch_size(
663    total_chunks: usize,
664    fetch_cap: usize,
665    configured_batch_floor: usize,
666    usable_memory_bytes: Option<u64>,
667) -> usize {
668    let fetch_target = fetch_cap
669        .max(1)
670        .saturating_mul(DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
671    let requested = match usable_memory_bytes {
672        Some(bytes) => {
673            let memory_cap = stream_decrypt_batch_memory_cap(bytes);
674            configured_batch_floor
675                .max(fetch_target)
676                .max(1)
677                .min(memory_cap)
678        }
679        None => configured_batch_floor.max(1),
680    };
681
682    requested.min(total_chunks.max(1)).max(1)
683}
684
/// Whether the data map is published to the network for address-based retrieval.
///
/// A private upload stores only the data chunks and returns the `DataMap` to
/// the caller — only someone holding that `DataMap` can reconstruct the file.
/// A public upload additionally stores the serialized `DataMap` as a chunk on
/// the network, yielding a single chunk address that anyone can use to
/// retrieve the `DataMap` (via [`Client::data_map_fetch`]) and then the file.
///
/// The default is [`Visibility::Private`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Visibility {
    /// Keep the data map local; only the holder can retrieve the file.
    #[default]
    Private,
    /// Publish the data map as a network chunk so anyone with the returned
    /// address can retrieve and decrypt the file.
    Public,
}
701
/// Confidence attached to an [`UploadCostEstimate`]'s `storage_cost_atto`.
///
/// `estimate_upload_cost` prices a file by sampling a few of its chunk
/// addresses and extrapolating. When every sampled chunk is already stored
/// there is no live price to extrapolate from, so a `"0"` cost can mean either
/// "provably free" (the whole file was sampled) or only "probably free" (the
/// tail was unsampled). This lets callers tell those apart instead of treating
/// every `"0"` as unconditionally free.
///
/// Serialized in `snake_case` (`priced_sample`,
/// `verified_all_already_stored`, `all_samples_already_stored_incomplete`).
/// The default is [`CostEstimateConfidence::PricedSample`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CostEstimateConfidence {
    /// At least one sampled chunk returned a live quote; `storage_cost_atto`
    /// is extrapolated from a real per-chunk price. The normal case.
    #[default]
    PricedSample,
    /// Every chunk in the file was sampled and every one was already stored.
    /// `storage_cost_atto` is exactly `"0"` — the upload is genuinely free.
    VerifiedAllAlreadyStored,
    /// Every *sampled* chunk was already stored, but not all chunks were
    /// sampled. `storage_cost_atto` is `"0"` as a best-effort guess; the real
    /// upload reconciles the true cost at payment time. Render this as "likely
    /// already stored", not a guaranteed-free price.
    AllSamplesAlreadyStoredIncomplete,
}
726
/// Estimated cost of uploading a file, returned by
/// [`Client::estimate_upload_cost`].
///
/// Marked `#[non_exhaustive]` so adding a field later is not a breaking
/// change: downstream crates cannot build this struct with a struct literal
/// and must use `..` when destructuring it.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct UploadCostEstimate {
    /// Original file size in bytes.
    pub file_size: u64,
    /// Number of chunks the file would be split into (data chunks only,
    /// does not include the DataMap chunk added during public uploads).
    pub chunk_count: usize,
    /// Estimated total storage cost in atto (token smallest unit).
    pub storage_cost_atto: String,
    /// Estimated gas cost in wei as a string. This is a rough heuristic
    /// based on chunk count and payment mode, NOT a live gas price query.
    pub estimated_gas_cost_wei: String,
    /// Payment mode that would be used.
    pub payment_mode: PaymentMode,
    /// How much to trust `storage_cost_atto`. See [`CostEstimateConfidence`].
    ///
    /// When the field is absent from serialized input it deserializes to the
    /// default, [`CostEstimateConfidence::PricedSample`].
    #[serde(default)]
    pub confidence: CostEstimateConfidence,
}
751
/// Result of a file upload: the `DataMap` needed to retrieve the file.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change: downstream crates cannot build this struct with a struct
/// literal and must use `..` when destructuring it.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FileUploadResult {
    /// The data map containing chunk metadata for reconstruction.
    pub data_map: DataMap,
    /// Number of chunks stored on the network.
    pub chunks_stored: usize,
    /// Number of chunks that failed to store. Always 0 for a successful
    /// upload — partial-failure information is conveyed via
    /// [`crate::data::Error::PartialUpload`] instead.
    pub chunks_failed: usize,
    /// Total number of chunks in the upload, including chunks that were
    /// already stored and skipped. On full success this equals `chunks_stored`.
    pub total_chunks: usize,
    /// Which payment mode was actually used (not just requested).
    pub payment_mode_used: PaymentMode,
    /// Total storage cost paid in token units (atto). "0" if all chunks already existed.
    pub storage_cost_atto: String,
    /// Total gas cost in wei. 0 if no on-chain transactions were made.
    pub gas_cost_wei: u128,
    /// Chunk address of the serialized `DataMap`, set only for
    /// [`Visibility::Public`] uploads. **`Some` means this address is
    /// retrievable from the network (via [`Client::data_map_fetch`])**, not
    /// necessarily that *this* upload paid to store it — if the serialized
    /// `DataMap` hashed to a chunk that was already on the network (same
    /// file uploaded before; deterministic via self-encryption), the address
    /// is still returned but no storage payment was made for it.
    pub data_map_address: Option<[u8; 32]>,
    /// Sum of chunk-store RPC attempts across the upload
    /// (`>= chunks_stored` on full success; more if any chunk retried).
    /// `0` for paths that don't run the wave store loop.
    pub chunk_attempts_total: usize,
    /// Per-chunk store wall-clock in ms (length == `chunks_stored` on full
    /// success, empty for paths that don't run the wave store loop).
    pub store_durations_ms: Vec<u64>,
    /// Count of stored chunks that succeeded on each retry round
    /// (index 0 = first attempt, 1 = first retry, etc.). All zeros for
    /// paths that don't run the wave store loop.
    ///
    /// The array has four slots, i.e. the first attempt plus three retry
    /// rounds.
    pub retries_histogram: [usize; 4],
}
797
/// Payment information for external signing — either wave-batch or merkle.
///
/// Produced by [`Client::file_prepare_upload_with_progress`] and carried in
/// [`PreparedUpload::payment_info`]. When every chunk is already on the
/// network, that method returns a `WaveBatch` with an empty `prepared_chunks`
/// list, meaning there is nothing to pay for.
#[derive(Debug)]
pub enum ExternalPaymentInfo {
    /// Wave-batch: individual (quote_hash, rewards_address, amount) tuples.
    WaveBatch {
        /// Chunks ready for payment (needed for finalize).
        prepared_chunks: Vec<PreparedChunk>,
        /// Payment intent for external signing.
        payment_intent: PaymentIntent,
    },
    /// Merkle: single on-chain call with depth, pool commitments, timestamp.
    Merkle {
        /// The prepared merkle batch (public fields sent to frontend, private fields stay in Rust).
        prepared_batch: PreparedMerkleBatch,
        /// Raw chunk contents that still need upload after the preflight check.
        chunk_contents: Vec<Bytes>,
        /// Chunk addresses that still need upload after the preflight check.
        chunk_addresses: Vec<[u8; 32]>,
    },
}
818
/// Prepared upload ready for external payment.
///
/// Contains everything needed to construct the on-chain payment transaction
/// externally (e.g. via WalletConnect in a desktop app) and then finalize
/// the upload without a Rust-side wallet.
///
/// Note: This struct stays in Rust memory — only the public fields of
/// `payment_info` are sent to the frontend. `PreparedChunk` contains
/// non-serializable network types, so the full struct cannot derive `Serialize`.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers.
#[derive(Debug)]
#[non_exhaustive]
pub struct PreparedUpload {
    /// The data map for later retrieval.
    pub data_map: DataMap,
    /// Payment information for chunks that still need payment after the
    /// already-stored preflight. This may be wave-batch even when the original
    /// chunk count was merkle-eligible if the remaining count is below the
    /// merkle threshold. An empty wave-batch means no payment is required.
    pub payment_info: ExternalPaymentInfo,
    /// Chunk address of the serialized `DataMap` when this upload was
    /// prepared with [`Visibility::Public`]; `None` for private uploads.
    /// `Some` means the address is
    /// retrievable on the network after finalization — either because this
    /// upload paid to store the chunk in `payment_info`, or because the
    /// chunk was already on the network (deterministic self-encryption).
    /// Carried through to [`FileUploadResult::data_map_address`].
    pub data_map_address: Option<[u8; 32]>,
    /// Chunk addresses already present on the network when this upload was
    /// prepared. These do not require payment or PUT during finalization.
    pub already_stored_addresses: Vec<[u8; 32]>,
    /// Total chunk count for the upload, including already-stored chunks.
    /// For public uploads this also counts the bundled `DataMap` chunk.
    pub total_chunks: usize,
}
854
/// Return type for [`spawn_file_encryption`]: chunk receiver, `DataMap` oneshot, join handle.
///
/// - `.0` yields each encrypted chunk's content as it is produced.
/// - `.1` resolves with the `DataMap` once encryption has finished
///   successfully.
/// - `.2` is the handle of the blocking encryption task; its output carries
///   any read or encryption error.
type EncryptionChannels = (
    tokio::sync::mpsc::Receiver<Bytes>,
    tokio::sync::oneshot::Receiver<DataMap>,
    tokio::task::JoinHandle<Result<()>>,
);
861
862/// Spawn a blocking task that streams file encryption through a channel.
863fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
864    let metadata = std::fs::metadata(&path)?;
865    let data_size = usize::try_from(metadata.len())
866        .map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;
867
868    let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
869    let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();
870
871    let handle = tokio::task::spawn_blocking(move || {
872        let file = std::fs::File::open(&path)?;
873        let mut reader = std::io::BufReader::new(file);
874
875        let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
876        let read_error_clone = Arc::clone(&read_error);
877
878        let data_iter = std::iter::from_fn(move || {
879            let mut buffer = vec![0u8; 8192];
880            match std::io::Read::read(&mut reader, &mut buffer) {
881                Ok(0) => None,
882                Ok(n) => {
883                    buffer.truncate(n);
884                    Some(Bytes::from(buffer))
885                }
886                Err(e) => {
887                    let mut guard = read_error_clone
888                        .lock()
889                        .unwrap_or_else(|poisoned| poisoned.into_inner());
890                    *guard = Some(e);
891                    None
892                }
893            }
894        });
895
896        let mut stream = stream_encrypt(data_size, data_iter)
897            .map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;
898
899        for chunk_result in stream.chunks() {
900            // Check for captured read errors immediately after each chunk.
901            // stream_encrypt sees None (EOF) when a read fails, so it stops
902            // producing chunks. We must detect this before sending the
903            // partial results to avoid uploading a truncated DataMap.
904            {
905                let guard = read_error
906                    .lock()
907                    .unwrap_or_else(|poisoned| poisoned.into_inner());
908                if let Some(ref e) = *guard {
909                    return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
910                }
911            }
912
913            let (_hash, content) = chunk_result
914                .map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
915            if chunk_tx.blocking_send(content).is_err() {
916                return Err(Error::Encryption("upload receiver dropped".to_string()));
917            }
918        }
919
920        // Final check: read error after last chunk (stream saw EOF).
921        {
922            let guard = read_error
923                .lock()
924                .unwrap_or_else(|poisoned| poisoned.into_inner());
925            if let Some(ref e) = *guard {
926                return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
927            }
928        }
929
930        let datamap = stream
931            .into_datamap()
932            .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
933        if datamap_tx.send(datamap).is_err() {
934            warn!("DataMap receiver dropped — upload may have been cancelled");
935        }
936        Ok(())
937    });
938
939    Ok((chunk_rx, datamap_rx, handle))
940}
941
/// RAII guard for the staging temp file used during a disk download.
///
/// Removes the file on drop — including a panic unwind out of the
/// `block_in_place` decrypt loop — unless [`commit`](Self::commit) has
/// promoted it to its final path. Centralizes the cleanup the explicit error
/// arms used to repeat.
struct TempDownload {
    /// `Some` while the staging file may need cleanup; `None` once committed.
    /// `Drop` only attempts removal while this is `Some`.
    path: Option<PathBuf>,
}
952
953impl TempDownload {
954    fn new(path: PathBuf) -> Self {
955        Self { path: Some(path) }
956    }
957
958    /// Path of the staging file (valid until `commit`).
959    fn path(&self) -> &Path {
960        self.path
961            .as_deref()
962            .expect("TempDownload::path called after commit")
963    }
964
965    /// Rename the staged file to `dest`. On success the guard is defused so
966    /// `Drop` is a no-op; on failure the guard stays armed and `Drop` removes
967    /// the orphaned temp file.
968    fn commit(mut self, dest: &Path) -> std::io::Result<()> {
969        std::fs::rename(self.path(), dest)?; // err → guard armed → Drop cleans up
970        self.path = None; // success → nothing left to clean
971        Ok(())
972    }
973}
974
975impl Drop for TempDownload {
976    fn drop(&mut self) {
977        if let Some(path) = self.path.take() {
978            if let Err(e) = std::fs::remove_file(&path) {
979                // Absent file is fine (never created / already gone).
980                if e.kind() != std::io::ErrorKind::NotFound {
981                    warn!(
982                        "Failed to remove temp download file {}: {e}",
983                        path.display()
984                    );
985                }
986            }
987        }
988    }
989}
990
991impl Client {
    /// Upload a file to the network using streaming self-encryption.
    ///
    /// Convenience wrapper that delegates to `file_upload_with_mode` with
    /// [`PaymentMode::Auto`].
    ///
    /// Automatically selects merkle batch payment for files that produce
    /// 64+ chunks (saves gas). Encrypted chunks are spilled to a temp
    /// directory so peak memory stays at ~256 MB regardless of file size.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be read, encryption fails,
    /// or any chunk cannot be stored.
    pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
        self.file_upload_with_mode(path, PaymentMode::Auto).await
    }
1005
1006    /// Estimate the cost of uploading a file without actually uploading.
1007    ///
1008    /// Encrypts the file to determine chunk count and sizes, then requests
1009    /// a single quote from the network for a representative chunk. The
1010    /// per-chunk price is extrapolated to the total chunk count.
1011    ///
1012    /// The estimate is fast (~2-5s) and does not require a wallet. Spilled
1013    /// chunks are cleaned up automatically when the function returns.
1014    ///
1015    /// Gas cost is an advisory heuristic, not a live gas-oracle query. It is
1016    /// derived from realistic per-transaction budgets (`GAS_PER_WAVE_TX`,
1017    /// `GAS_PER_MERKLE_TX`) priced at `ARBITRUM_GAS_PRICE_WEI`. Real gas
1018    /// varies with network conditions.
1019    ///
1020    /// Sampled chunk addresses are spread across the whole file (not the first
1021    /// N) so a shared leading prefix doesn't bias the sample. When a sample
1022    /// returns a live quote the per-chunk price is extrapolated and the result
1023    /// is tagged [`CostEstimateConfidence::PricedSample`].
1024    ///
1025    /// When every sampled chunk is already stored the result is still `Ok`
1026    /// with `storage_cost_atto: "0"`, tagged either
1027    /// [`CostEstimateConfidence::VerifiedAllAlreadyStored`] when the whole file
1028    /// was sampled (exactly free) or
1029    /// [`CostEstimateConfidence::AllSamplesAlreadyStoredIncomplete`] when the
1030    /// tail was unsampled (a best-effort guess that payment reconciles).
1031    ///
1032    /// # Errors
1033    ///
1034    /// Returns an error if the file cannot be read, encryption fails, or the
1035    /// network cannot provide a quote.
1036    pub async fn estimate_upload_cost(
1037        &self,
1038        path: &Path,
1039        mode: PaymentMode,
1040        progress: Option<mpsc::Sender<UploadEvent>>,
1041    ) -> Result<UploadCostEstimate> {
1042        let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
1043
1044        if file_size < 3 {
1045            return Err(Error::InvalidData(
1046                "File too small: self-encryption requires at least 3 bytes".into(),
1047            ));
1048        }
1049
1050        check_disk_space_for_spill(file_size)?;
1051
1052        info!(
1053            "Estimating upload cost for {} ({file_size} bytes)",
1054            path.display()
1055        );
1056
1057        let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1058        let chunk_count = spill.len();
1059
1060        if let Some(ref tx) = progress {
1061            let _ = tx
1062                .send(UploadEvent::Encrypted {
1063                    total_chunks: chunk_count,
1064                })
1065                .await;
1066        }
1067
1068        info!("Encrypted into {chunk_count} chunks, requesting quote");
1069        let uses_merkle = should_use_merkle(chunk_count, mode);
1070
1071        // Sample chunk addresses spread evenly across the file (see
1072        // `distributed_sample_indices`) rather than the first N. A single
1073        // AlreadyStored result says nothing about the rest of the file, and a
1074        // positional sample lands on a shared leading prefix in the worst case,
1075        // so we spread the probe and only treat the whole file as "fully
1076        // stored" when every sample comes back stored.
1077        let sample_indices = distributed_sample_indices(spill.addresses.len(), ESTIMATE_SAMPLE_CAP);
1078        let mut sampled = 0usize;
1079        let mut all_already_stored = true;
1080        let mut quotes_opt: Option<Vec<QuoteEntry>> = None;
1081
1082        for &idx in &sample_indices {
1083            let addr = &spill.addresses[idx];
1084            sampled += 1;
1085            let chunk_bytes = spill.read_chunk(addr)?;
1086            let data_size = u64::try_from(chunk_bytes.len())
1087                .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
1088            let result = if uses_merkle {
1089                self.get_store_quotes_with_fault_tolerance(addr, data_size, DATA_TYPE_CHUNK)
1090                    .await
1091            } else {
1092                self.get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
1093                    .await
1094            };
1095            match result {
1096                Ok(q) => {
1097                    quotes_opt = Some(q);
1098                    all_already_stored = false;
1099                    break;
1100                }
1101                Err(Error::AlreadyStored) => {
1102                    debug!(
1103                        "Sample chunk {} already stored; trying next address ({sampled}/{})",
1104                        hex::encode(addr),
1105                        sample_indices.len()
1106                    );
1107                    continue;
1108                }
1109                Err(e) => return Err(e),
1110            }
1111        }
1112
1113        let quotes = match quotes_opt {
1114            Some(q) => q,
1115            None if all_already_stored && sampled == chunk_count => {
1116                // Every address in the file was sampled and every one is
1117                // already on the network — a zero-cost estimate is exact here.
1118                info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
1119                return Ok(UploadCostEstimate {
1120                    file_size,
1121                    chunk_count,
1122                    storage_cost_atto: "0".into(),
1123                    estimated_gas_cost_wei: "0".into(),
1124                    payment_mode: if uses_merkle {
1125                        PaymentMode::Merkle
1126                    } else {
1127                        PaymentMode::Single
1128                    },
1129                    confidence: CostEstimateConfidence::VerifiedAllAlreadyStored,
1130                });
1131            }
1132            None => {
1133                // Every sampled chunk was already stored but the tail was not
1134                // sampled, so there is no live price to extrapolate. The
1135                // estimate is display-only and payment reconciles the true
1136                // cost, so return an optimistic zero flagged as incomplete
1137                // rather than erroring — callers still get a value to show.
1138                info!(
1139                    "All {sampled}/{chunk_count} sampled chunks already stored; \
1140                     returning incomplete zero-cost estimate"
1141                );
1142                return Ok(UploadCostEstimate {
1143                    file_size,
1144                    chunk_count,
1145                    storage_cost_atto: "0".into(),
1146                    estimated_gas_cost_wei: "0".into(),
1147                    payment_mode: if uses_merkle {
1148                        PaymentMode::Merkle
1149                    } else {
1150                        PaymentMode::Single
1151                    },
1152                    confidence: CostEstimateConfidence::AllSamplesAlreadyStoredIncomplete,
1153                });
1154            }
1155        };
1156
1157        // Use the median price × 3 (matches SingleNodePayment::from_quotes
1158        // which pays 3x the median to incentivize reliable storage).
1159        let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
1160        prices.sort();
1161        let median_price = prices
1162            .get(prices.len() / 2)
1163            .copied()
1164            .unwrap_or(Amount::ZERO);
1165        let per_chunk_cost = median_price * Amount::from(3u64);
1166
1167        let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
1168        let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);
1169
1170        // Estimate gas cost from realistic per-transaction budgets rather
1171        // than a flat per-chunk or per-wave number.
1172        //
1173        // - Single mode: `batch_pay` packs up to UPLOAD_WAVE_SIZE chunks'
1174        //   close-group quotes into one `pay_for_quotes` call on Arbitrum.
1175        //   The dominant cost is one SSTORE per entry plus base tx overhead,
1176        //   so we use GAS_PER_WAVE_TX (≈1.5M) as a conservative upper bound
1177        //   on a full wave and multiply by the number of waves. The previous
1178        //   per-wave figure of 150k was closer to a single-entry transfer
1179        //   and understated cost by 5–10x for full waves.
1180        // - Merkle mode: one tx per sub-batch that verifies a merkle tree
1181        //   and posts a pool commitment (GAS_PER_MERKLE_TX ≈ 500k each).
1182        //
1183        // Gas is priced at ARBITRUM_GAS_PRICE_WEI (~0.1 gwei, a typical
1184        // Arbitrum baseline). Treat the result as advisory, not a commitment.
1185        let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
1186        let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
1187        let estimated_gas: u128 = if uses_merkle {
1188            merkle_batches
1189                .saturating_mul(GAS_PER_MERKLE_TX)
1190                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
1191        } else {
1192            waves
1193                .saturating_mul(GAS_PER_WAVE_TX)
1194                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
1195        };
1196
1197        info!(
1198            "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
1199        );
1200
1201        Ok(UploadCostEstimate {
1202            file_size,
1203            chunk_count,
1204            storage_cost_atto: total_storage.to_string(),
1205            estimated_gas_cost_wei: estimated_gas.to_string(),
1206            payment_mode: if uses_merkle {
1207                PaymentMode::Merkle
1208            } else {
1209                PaymentMode::Single
1210            },
1211            confidence: CostEstimateConfidence::PricedSample,
1212        })
1213    }
1214
    /// Phase 1 of external-signer upload: encrypt file and prepare chunks.
    ///
    /// Delegates to [`Client::file_prepare_upload_with_progress`] with
    /// [`Visibility::Private`] and no progress channel. This is equivalent to
    /// [`Client::file_prepare_upload_with_visibility`] with
    /// [`Visibility::Private`] — see that method for details.
    ///
    /// # Errors
    ///
    /// Propagates any error from
    /// [`Client::file_prepare_upload_with_progress`].
    pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
        self.file_prepare_upload_with_progress(path, Visibility::Private, None)
            .await
    }
1223
    /// Phase 1 of external-signer upload with explicit [`Visibility`] control.
    ///
    /// Equivalent to [`Client::file_prepare_upload_with_progress`] with
    /// `progress: None` — see that method for details.
    ///
    /// # Errors
    ///
    /// Propagates any error from
    /// [`Client::file_prepare_upload_with_progress`].
    pub async fn file_prepare_upload_with_visibility(
        &self,
        path: &Path,
        visibility: Visibility,
    ) -> Result<PreparedUpload> {
        self.file_prepare_upload_with_progress(path, visibility, None)
            .await
    }
1236
    /// Phase 1 of external-signer upload with progress events.
    ///
    /// Requires an EVM network (for contract price queries) but NOT a wallet.
    /// Returns a [`PreparedUpload`] containing the data map, prepared chunks,
    /// and a [`PaymentIntent`] that the external signer uses to construct
    /// and submit the on-chain payment transaction.
    ///
    /// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
    /// is bundled into the payment batch as an additional chunk and its
    /// address is recorded on the returned [`PreparedUpload`]. After
    /// [`Client::finalize_upload`] (or `_merkle`) succeeds, that address is
    /// surfaced via [`FileUploadResult::data_map_address`] so the uploader
    /// can share a single address from which anyone can retrieve the file.
    ///
    /// When `progress` is `Some`, [`UploadEvent`]s are emitted on the channel
    /// during encryption ([`UploadEvent::Encrypting`] / [`UploadEvent::Encrypted`])
    /// and per-chunk quoting ([`UploadEvent::ChunkQuoted`]). Storage events are
    /// emitted later by [`Client::finalize_upload_with_progress`] /
    /// [`Client::finalize_upload_merkle_with_progress`].
    ///
    /// **Payment path:** the merkle/wave-batch decision is made with
    /// [`PaymentMode::Auto`] on the full chunk count (including the `DataMap`
    /// chunk for public uploads). A merkle-eligible upload still ends up as
    /// wave-batch when the chunks remaining after the already-stored
    /// preflight are no longer merkle-eligible, or when merkle preparation
    /// fails with `Error::InsufficientPeers`. If nothing remains to upload,
    /// an empty wave-batch is returned.
    ///
    /// **Memory note:** Encryption uses disk spilling for bounded memory, but
    /// the returned [`PreparedUpload`] holds all chunk content in memory (each
    /// [`PreparedChunk`] contains a `Bytes` with the full chunk data). This is
    /// inherent to the two-phase external-signer protocol — the chunks must
    /// stay in memory until [`Client::finalize_upload`] stores them. For very
    /// large files, prefer [`Client::file_upload`] which streams directly.
    ///
    /// # Errors
    ///
    /// Returns an error if there is insufficient disk space, the file cannot
    /// be read, encryption fails, or quote collection fails.
    pub async fn file_prepare_upload_with_progress(
        &self,
        path: &Path,
        visibility: Visibility,
        progress: Option<mpsc::Sender<UploadEvent>>,
    ) -> Result<PreparedUpload> {
        debug!(
            "Preparing file upload for external signing (visibility={visibility:?}): {}",
            path.display()
        );

        let file_size = std::fs::metadata(path)?.len();
        check_disk_space_for_spill(file_size)?;

        let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;

        info!(
            "Encrypted {} into {} chunks for external signing (spilled to disk)",
            path.display(),
            spill.len()
        );

        // Read every spilled chunk back into memory, in address order.
        // Note: all PreparedChunks accumulate in memory because the external-signer
        // protocol requires them for finalize_upload. NOT memory-bounded for large files.
        let mut chunk_data: Vec<Bytes> = spill
            .addresses
            .iter()
            .map(|addr| spill.read_chunk(addr))
            .collect::<std::result::Result<Vec<_>, _>>()?;

        // For public uploads, bundle the serialized DataMap as an extra chunk
        // in the same payment batch. This lets the external signer pay for
        // the data chunks and the DataMap chunk in one flow, and lets the
        // finalize step return the DataMap's chunk address as the shareable
        // retrieval address.
        let data_map_address = match visibility {
            Visibility::Private => None,
            Visibility::Public => {
                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
                })?;
                let bytes = Bytes::from(serialized);
                let address = compute_address(&bytes);
                info!(
                    "Public upload: bundling DataMap chunk ({} bytes) at address {}",
                    bytes.len(),
                    hex::encode(address)
                );
                chunk_data.push(bytes);
                Some(address)
            }
        };

        // From here on `chunk_count` includes the DataMap chunk (if any); it
        // is also what gets reported as `total_chunks`.
        let chunk_count = chunk_data.len();

        if let Some(ref tx) = progress {
            // Best-effort: a dropped progress receiver must not fail the upload.
            let _ = tx
                .send(UploadEvent::Encrypted {
                    total_chunks: chunk_count,
                })
                .await;
        }

        let (payment_info, already_stored_addresses) = if should_use_merkle(
            chunk_count,
            PaymentMode::Auto,
        ) {
            // Merkle path: build tree, collect candidate pools, return for external payment.
            info!("Using merkle batch preparation for {chunk_count} file chunks");

            let chunk_entries: Vec<([u8; 32], u64)> = chunk_data
                .iter()
                .map(|chunk| {
                    let size = u64::try_from(chunk.len())
                        .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
                    Ok((compute_address(chunk), size))
                })
                .collect::<Result<Vec<_>>>()?;

            let merkle_plan = self
                .plan_merkle_upload(chunk_entries, DATA_TYPE_CHUNK, progress.as_ref())
                .await?;

            if merkle_plan.to_upload.is_empty() {
                // Nothing left to pay for: hand back an empty wave-batch so
                // callers get a uniform "no payment" shape.
                info!("All {chunk_count} file chunks already stored; no external payment needed");
                (
                    ExternalPaymentInfo::WaveBatch {
                        prepared_chunks: Vec::new(),
                        payment_intent: PaymentIntent::from_prepared_chunks(&[]),
                    },
                    merkle_plan.already_stored,
                )
            } else {
                // Narrow the in-memory contents down to the chunks that still
                // need uploading (shadows the full `chunk_data`).
                let chunk_data =
                    chunk_contents_for_upload_addresses(chunk_data, &merkle_plan.to_upload)?;

                if !should_use_merkle(merkle_plan.to_upload.len(), PaymentMode::Auto) {
                    // The remaining count is no longer merkle-eligible.
                    info!(
                        "{} file chunks need upload after merkle preflight; preparing wave-batch payment",
                        merkle_plan.to_upload.len()
                    );
                    let (payment_info, mut wave_already_stored) = self
                        .prepare_wave_batch_external_chunks(
                            chunk_data,
                            progress.as_ref(),
                            chunk_count,
                        )
                        .await?;
                    // Merge both preflight results into one already-stored list.
                    let mut already_stored = merkle_plan.already_stored;
                    already_stored.append(&mut wave_already_stored);
                    (payment_info, already_stored)
                } else {
                    match self
                        .prepare_merkle_batch_external(
                            &merkle_plan.to_upload,
                            DATA_TYPE_CHUNK,
                            merkle_plan.to_upload_avg_size(),
                        )
                        .await
                    {
                        Ok(prepared_batch) => {
                            info!(
                                "File prepared for external merkle signing: {} chunks, depth={} ({})",
                                merkle_plan.to_upload.len(),
                                prepared_batch.depth,
                                path.display()
                            );

                            (
                                ExternalPaymentInfo::Merkle {
                                    prepared_batch,
                                    chunk_contents: chunk_data,
                                    chunk_addresses: merkle_plan.to_upload,
                                },
                                merkle_plan.already_stored,
                            )
                        }
                        // Too few peers for merkle: degrade to wave-batch
                        // instead of failing the whole preparation.
                        Err(Error::InsufficientPeers(ref msg)) => {
                            info!(
                                "External merkle preparation needs more peers ({msg}); preparing wave-batch payment"
                            );
                            let (payment_info, mut wave_already_stored) = self
                                .prepare_wave_batch_external_chunks(
                                    chunk_data,
                                    progress.as_ref(),
                                    chunk_count,
                                )
                                .await?;
                            let mut already_stored = merkle_plan.already_stored;
                            already_stored.append(&mut wave_already_stored);
                            (payment_info, already_stored)
                        }
                        Err(e) => return Err(e),
                    }
                }
            }
        } else {
            // Not merkle-eligible: plain wave-batch over every chunk.
            self.prepare_wave_batch_external_chunks(chunk_data, progress.as_ref(), chunk_count)
                .await?
        };

        // Surface the "DataMap chunk was already on the network" case
        // so debugging "why is data_map_address set but no storage cost
        // appears for it?" doesn't require reading the source. See the
        // `data_map_address` doc comment for why this is still a valid
        // `Some(addr)` outcome.
        if let Some(addr) = data_map_address {
            let data_map_needs_payment = match &payment_info {
                ExternalPaymentInfo::WaveBatch {
                    prepared_chunks, ..
                } => prepared_chunks.iter().any(|c| c.address == addr),
                ExternalPaymentInfo::Merkle {
                    chunk_addresses, ..
                } => chunk_addresses.contains(&addr),
            };
            if !data_map_needs_payment {
                info!(
                    "Public upload: DataMap chunk {} was already stored \
                     on the network — address is retrievable without a \
                     new payment",
                    hex::encode(addr)
                );
            }
        }

        Ok(PreparedUpload {
            data_map,
            payment_info,
            data_map_address,
            already_stored_addresses,
            total_chunks: chunk_count,
        })
    }
1462
1463    async fn prepare_wave_batch_external_chunks(
1464        &self,
1465        chunk_data: Vec<Bytes>,
1466        progress: Option<&mpsc::Sender<UploadEvent>>,
1467        progress_total: usize,
1468    ) -> Result<(ExternalPaymentInfo, Vec<[u8; 32]>)> {
1469        let chunk_count = chunk_data.len();
1470        let chunks_with_addr: Vec<(Bytes, [u8; 32])> = chunk_data
1471            .into_iter()
1472            .map(|content| {
1473                let address = compute_address(&content);
1474                (content, address)
1475            })
1476            .collect();
1477
1478        // Wave-batch path: collect quotes per chunk concurrently, emitting
1479        // a `ChunkQuoted` event after each completion so callers can drive
1480        // a progress bar through the slow quote phase.
1481        let quote_limiter = self.controller().quote.clone();
1482        let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
1483        let mut quote_stream = stream::iter(chunks_with_addr)
1484            .map(|(content, address)| {
1485                let limiter = quote_limiter.clone();
1486                async move {
1487                    let result = observe_op(
1488                        &limiter,
1489                        || async move { self.prepare_chunk_payment(content).await },
1490                        classify_error,
1491                    )
1492                    .await;
1493                    (address, result)
1494                }
1495            })
1496            .buffer_unordered(quote_concurrency);
1497
1498        let mut prepared_chunks = Vec::with_capacity(chunk_count);
1499        let mut already_stored = Vec::new();
1500        let mut quoted = 0usize;
1501        while let Some((address, result)) = quote_stream.next().await {
1502            match result? {
1503                Some(prepared) => prepared_chunks.push(prepared),
1504                None => already_stored.push(address),
1505            }
1506            quoted += 1;
1507            if let Some(tx) = progress {
1508                let _ = tx.try_send(UploadEvent::ChunkQuoted {
1509                    quoted,
1510                    total: progress_total,
1511                });
1512            }
1513        }
1514
1515        let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
1516        info!(
1517            "Prepared external wave-batch payment: {} chunks, {} already stored, total {} atto",
1518            prepared_chunks.len(),
1519            already_stored.len(),
1520            payment_intent.total_amount,
1521        );
1522
1523        Ok((
1524            ExternalPaymentInfo::WaveBatch {
1525                prepared_chunks,
1526                payment_intent,
1527            },
1528            already_stored,
1529        ))
1530    }
1531
1532    /// Phase 2 of external-signer upload (wave-batch): finalize with externally-signed tx hashes.
1533    ///
1534    /// Takes a [`PreparedUpload`] that used wave-batch payment and a map
1535    /// of `quote_hash -> tx_hash` provided by the external signer after on-chain
1536    /// payment. Builds payment proofs and stores chunks on the network.
1537    ///
1538    /// # Errors
1539    ///
1540    /// Returns an error if the prepared upload used merkle payment (use
1541    /// [`Client::finalize_upload_merkle`] instead), proof construction fails,
1542    /// or any chunk cannot be stored.
1543    pub async fn finalize_upload(
1544        &self,
1545        prepared: PreparedUpload,
1546        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1547    ) -> Result<FileUploadResult> {
1548        self.finalize_upload_with_progress(prepared, tx_hash_map, None)
1549            .await
1550    }
1551
1552    /// Phase 2 of external-signer upload (wave-batch) with progress events.
1553    ///
1554    /// Same as [`Client::finalize_upload`] but emits [`UploadEvent::ChunkStored`]
1555    /// on the provided channel as each chunk is successfully stored.
1556    ///
1557    /// # Errors
1558    ///
1559    /// Same as [`Client::finalize_upload`].
1560    pub async fn finalize_upload_with_progress(
1561        &self,
1562        prepared: PreparedUpload,
1563        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1564        progress: Option<mpsc::Sender<UploadEvent>>,
1565    ) -> Result<FileUploadResult> {
1566        let data_map_address = prepared.data_map_address;
1567        let already_stored_addresses = prepared.already_stored_addresses;
1568        let already_stored_count = already_stored_addresses.len();
1569        let total_chunks = prepared.total_chunks;
1570        match prepared.payment_info {
1571            ExternalPaymentInfo::WaveBatch {
1572                prepared_chunks,
1573                payment_intent,
1574            } => {
1575                let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
1576                let wave_result = self
1577                    .store_paid_chunks_with_events(
1578                        paid_chunks,
1579                        progress.as_ref(),
1580                        already_stored_count,
1581                        total_chunks,
1582                    )
1583                    .await;
1584                if !wave_result.failed.is_empty() {
1585                    let failed_count = wave_result.failed.len();
1586                    let stored_count = already_stored_count + wave_result.stored.len();
1587                    let mut stored = already_stored_addresses;
1588                    stored.extend(wave_result.stored);
1589                    return Err(Error::PartialUpload {
1590                        stored,
1591                        stored_count,
1592                        failed: wave_result.failed,
1593                        failed_count,
1594                        total_chunks,
1595                        // Report the storage spend known from the payment intent
1596                        // the external signer was handed. Gas is paid by the
1597                        // signer out-of-band, so it stays unknown (0).
1598                        spend: Box::new(PartialUploadSpend {
1599                            storage_cost_atto: payment_intent.total_amount.to_string(),
1600                            gas_cost_wei: 0,
1601                        }),
1602                        reason: "finalize_upload: chunk storage failed after retries".into(),
1603                    });
1604                }
1605                let chunks_stored = already_stored_count + wave_result.stored.len();
1606
1607                info!("External-signer upload finalized: {chunks_stored} chunks stored");
1608
1609                let mut stats = WaveAggregateStats::default();
1610                stats.absorb(&wave_result);
1611
1612                Ok(FileUploadResult {
1613                    data_map: prepared.data_map,
1614                    chunks_stored,
1615                    chunks_failed: 0,
1616                    total_chunks,
1617                    payment_mode_used: PaymentMode::Single,
1618                    // Storage spend is known from the payment intent; gas is
1619                    // paid by the external signer out-of-band (unknown here).
1620                    storage_cost_atto: payment_intent.total_amount.to_string(),
1621                    gas_cost_wei: 0,
1622                    data_map_address,
1623                    chunk_attempts_total: stats.chunk_attempts_total,
1624                    store_durations_ms: stats.store_durations_ms,
1625                    retries_histogram: stats.retries_histogram,
1626                })
1627            }
1628            ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
1629                "Cannot finalize merkle upload with wave-batch tx hashes. \
1630                 Use finalize_upload_merkle() instead."
1631                    .to_string(),
1632            )),
1633        }
1634    }
1635
1636    /// Phase 2 of external-signer upload (merkle): finalize with winner pool hash.
1637    ///
1638    /// Takes a [`PreparedUpload`] that used merkle payment and the `winner_pool_hash`
1639    /// returned by the on-chain merkle payment transaction. Generates proofs and
1640    /// stores chunks on the network.
1641    ///
1642    /// # Errors
1643    ///
1644    /// Returns an error if the prepared upload used wave-batch payment (use
1645    /// [`Client::finalize_upload`] instead), proof generation fails,
1646    /// or any chunk cannot be stored.
1647    pub async fn finalize_upload_merkle(
1648        &self,
1649        prepared: PreparedUpload,
1650        winner_pool_hash: [u8; 32],
1651    ) -> Result<FileUploadResult> {
1652        self.finalize_upload_merkle_with_progress(prepared, winner_pool_hash, None)
1653            .await
1654    }
1655
1656    /// Phase 2 of external-signer upload (merkle) with progress events.
1657    ///
1658    /// Same as [`Client::finalize_upload_merkle`] but emits [`UploadEvent::ChunkStored`]
1659    /// on the provided channel as each chunk is successfully stored.
1660    ///
1661    /// # Errors
1662    ///
1663    /// Same as [`Client::finalize_upload_merkle`].
1664    pub async fn finalize_upload_merkle_with_progress(
1665        &self,
1666        prepared: PreparedUpload,
1667        winner_pool_hash: [u8; 32],
1668        progress: Option<mpsc::Sender<UploadEvent>>,
1669    ) -> Result<FileUploadResult> {
1670        let data_map_address = prepared.data_map_address;
1671        let already_stored_count = prepared.already_stored_addresses.len();
1672        let total_chunks = prepared.total_chunks;
1673        match prepared.payment_info {
1674            ExternalPaymentInfo::Merkle {
1675                prepared_batch,
1676                chunk_contents,
1677                chunk_addresses,
1678            } => {
1679                let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
1680                let outcome = self
1681                    .merkle_upload_chunks(
1682                        chunk_contents,
1683                        chunk_addresses,
1684                        &batch_result,
1685                        progress.as_ref(),
1686                        already_stored_count,
1687                        total_chunks,
1688                    )
1689                    .await?;
1690
1691                info!(
1692                    "External-signer merkle upload finalized: {} chunks stored, {} failed",
1693                    outcome.stored, outcome.failed
1694                );
1695
1696                Ok(FileUploadResult {
1697                    data_map: prepared.data_map,
1698                    chunks_stored: outcome.stored,
1699                    chunks_failed: outcome.failed,
1700                    total_chunks,
1701                    payment_mode_used: PaymentMode::Merkle,
1702                    storage_cost_atto: "0".into(),
1703                    gas_cost_wei: 0,
1704                    data_map_address,
1705                    chunk_attempts_total: outcome.stats.chunk_attempts_total,
1706                    store_durations_ms: outcome.stats.store_durations_ms,
1707                    retries_histogram: outcome.stats.retries_histogram,
1708                })
1709            }
1710            ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
1711                "Cannot finalize wave-batch upload with merkle winner hash. \
1712                 Use finalize_upload() instead."
1713                    .to_string(),
1714            )),
1715        }
1716    }
1717
1718    /// Upload a file with a specific payment mode.
1719    ///
1720    /// Before encryption, checks that the temp directory has enough free
1721    /// disk space for the spilled chunks (~1.1× source file size).
1722    ///
1723    /// Encrypted chunks are spilled to a temp directory during encryption
1724    /// so that only their 32-byte addresses stay in memory. At upload time,
1725    /// chunks are read back one wave at a time (~64 × 4 MB ≈ 256 MB peak).
1726    ///
1727    /// # Errors
1728    ///
1729    /// Returns an error if there is insufficient disk space, the file cannot
1730    /// be read, encryption fails, or any chunk cannot be stored.
1731    #[allow(clippy::too_many_lines)]
1732    pub async fn file_upload_with_mode(
1733        &self,
1734        path: &Path,
1735        mode: PaymentMode,
1736    ) -> Result<FileUploadResult> {
1737        self.file_upload_with_progress(path, mode, None).await
1738    }
1739
1740    /// Upload a file publicly, storing the serialized [`DataMap`] as part of
1741    /// the same upload payment batch.
1742    ///
1743    /// The returned [`FileUploadResult::data_map_address`] can be shared for
1744    /// public downloads via [`Client::data_map_fetch`].
1745    #[allow(clippy::too_many_lines)]
1746    pub async fn file_upload_public_with_mode(
1747        &self,
1748        path: &Path,
1749        mode: PaymentMode,
1750    ) -> Result<FileUploadResult> {
1751        self.file_upload_with_visibility_and_progress(path, mode, Visibility::Public, None)
1752            .await
1753    }
1754
1755    /// Upload a file with progress events sent to the given channel.
1756    ///
1757    /// Same as [`Client::file_upload_with_mode`] but sends [`UploadEvent`]s to the
1758    /// provided channel for UI progress feedback.
1759    #[allow(clippy::too_many_lines)]
1760    pub async fn file_upload_with_progress(
1761        &self,
1762        path: &Path,
1763        mode: PaymentMode,
1764        progress: Option<mpsc::Sender<UploadEvent>>,
1765    ) -> Result<FileUploadResult> {
1766        self.file_upload_with_visibility_and_progress(path, mode, Visibility::Private, progress)
1767            .await
1768    }
1769
1770    /// Public file upload with progress events.
1771    ///
1772    /// Same as [`Client::file_upload_public_with_mode`] but sends
1773    /// [`UploadEvent`]s to the provided channel for UI progress feedback.
1774    #[allow(clippy::too_many_lines)]
1775    pub async fn file_upload_public_with_progress(
1776        &self,
1777        path: &Path,
1778        mode: PaymentMode,
1779        progress: Option<mpsc::Sender<UploadEvent>>,
1780    ) -> Result<FileUploadResult> {
1781        self.file_upload_with_visibility_and_progress(path, mode, Visibility::Public, progress)
1782            .await
1783    }
1784
    /// Shared implementation behind the private and public file upload
    /// entry points.
    ///
    /// Steps, in order:
    /// 1. Read the source file size and run `check_disk_space_for_spill`.
    /// 2. Encrypt the file into a chunk spill via `encrypt_file_to_spill`.
    /// 3. For [`Visibility::Public`], serialize the [`DataMap`], push it into
    ///    the spill as one more chunk, and remember its address so it can be
    ///    returned as `data_map_address`.
    /// 4. Choose a payment path (merkle, a cached merkle receipt, or
    ///    single/wave-batch) and upload the spilled chunks.
    ///
    /// An [`UploadEvent::Encrypted`] event is sent on `progress` once the
    /// final chunk count is known; `progress` is also forwarded to the
    /// encryption and upload helpers.
    ///
    /// Every `Ok` result produced here reports `chunks_failed: 0`.
    ///
    /// # Errors
    ///
    /// Propagates failures from reading the file metadata, the disk-space
    /// check, encryption, `DataMap` serialization, spilling, payment, and
    /// chunk storage. An [`Error::InsufficientPeers`] from the merkle path is
    /// only recovered from (by falling back to single/wave-batch payment)
    /// when `mode` is [`PaymentMode::Auto`].
    #[allow(clippy::too_many_lines)]
    async fn file_upload_with_visibility_and_progress(
        &self,
        path: &Path,
        mode: PaymentMode,
        visibility: Visibility,
        progress: Option<mpsc::Sender<UploadEvent>>,
    ) -> Result<FileUploadResult> {
        debug!(
            "Streaming file upload with mode {mode:?}, visibility {visibility:?}: {}",
            path.display()
        );

        // Pre-flight: verify enough temp disk space for the chunk spill.
        let file_size = std::fs::metadata(path)?.len();
        check_disk_space_for_spill(file_size)?;

        // Phase 1: Encrypt file and spill chunks to temp directory.
        // Only 32-byte addresses stay in memory — chunk data lives on disk.
        let (mut spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;

        // Public uploads append the serialized DataMap to the spill so it is
        // paid for and stored together with the content chunks.
        let data_map_address = match visibility {
            Visibility::Private => None,
            Visibility::Public => {
                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
                })?;
                let address = compute_address(&serialized);
                info!(
                    "Public upload: adding DataMap chunk ({} bytes) at address {} to payment batch",
                    serialized.len(),
                    hex::encode(address)
                );
                spill.push(&serialized)?;
                Some(address)
            }
        };

        // Counted after the optional DataMap push, so a public upload's
        // total includes the DataMap chunk.
        let chunk_count = spill.len();
        info!(
            "Encrypted {} into {chunk_count} chunks (spilled to disk)",
            path.display()
        );
        if let Some(ref tx) = progress {
            let _ = tx
                .send(UploadEvent::Encrypted {
                    total_chunks: chunk_count,
                })
                .await;
        }

        // Phase 2: Decide payment mode and upload in waves from disk.
        //
        // For the merkle path, attempt to resume from a cached
        // receipt before paying again. The cache is keyed by the
        // CANONICAL source path so `./foo`, `/abs/foo`, and any
        // symlink alias all resolve to the same cache entry — a
        // crash-and-retry from a different cwd or via a different
        // alias still hits the receipt. Canonicalize may fail (the
        // file could have been moved between phase 1 and here); we
        // fall back to the display string in that case, which
        // preserves pre-fix behaviour rather than dropping cache
        // resume entirely.
        let file_path_key = std::fs::canonicalize(path)
            .map(|p| p.display().to_string())
            .unwrap_or_else(|_| path.display().to_string());
        let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self
            .should_use_merkle(chunk_count, mode)
        {
            info!("Using merkle batch payment for {chunk_count} file chunks");

            // Receipt cached for this path key, if any; only the receipt
            // itself is kept, the cache file path is discarded.
            let cached_merkle =
                crate::data::client::cached_merkle::try_load_for_file(&file_path_key)
                    .map(|(_cache_path, cached)| cached);

            // Preflight: split the spilled chunks into those that still need
            // uploading (`to_upload`) and those already stored.
            let merkle_plan = match self
                .plan_merkle_upload(spill.chunk_entries()?, DATA_TYPE_CHUNK, progress.as_ref())
                .await
            {
                Ok(plan) => plan,
                Err(e) => {
                    // Preflight failed. If a cached receipt covers every
                    // spilled address, upload everything with it (no
                    // already-stored list is available, hence `&[]`).
                    if let Some(cached) = cached_merkle
                        .as_ref()
                        .filter(|cached| cached_merkle_covers_addresses(cached, &spill.addresses))
                    {
                        info!(
                            "Merkle preflight failed ({e}); \
                             resuming with cached merkle proofs"
                        );
                        let (stored, sc, gc, stats) = self
                            .upload_waves_merkle(
                                &spill,
                                &spill.addresses,
                                cached,
                                &[],
                                progress.as_ref(),
                            )
                            .await?;
                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                        return Ok(FileUploadResult {
                            data_map,
                            chunks_stored: stored,
                            chunks_failed: 0,
                            total_chunks: chunk_count,
                            payment_mode_used: PaymentMode::Merkle,
                            storage_cost_atto: sc,
                            gas_cost_wei: gc,
                            data_map_address,
                            chunk_attempts_total: stats.chunk_attempts_total,
                            store_durations_ms: stats.store_durations_ms,
                            retries_histogram: stats.retries_histogram,
                        });
                    }
                    // No usable cached receipt. Only `Auto` mode may fall
                    // back to wave-batch, and only for `InsufficientPeers`;
                    // every other error is returned to the caller.
                    match &e {
                        Error::InsufficientPeers(msg) if mode == PaymentMode::Auto => {
                            info!(
                                "Merkle preflight needs more peers ({msg}), \
                                 falling back to wave-batch"
                            );
                            let (stored, sc, gc, fb_stats) = self
                                .upload_waves_single(
                                    &spill,
                                    progress.as_ref(),
                                    Some(&file_path_key),
                                )
                                .await?;
                            crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                            return Ok(FileUploadResult {
                                data_map,
                                chunks_stored: stored,
                                chunks_failed: 0,
                                total_chunks: chunk_count,
                                payment_mode_used: PaymentMode::Single,
                                storage_cost_atto: sc,
                                gas_cost_wei: gc,
                                data_map_address,
                                chunk_attempts_total: fb_stats.chunk_attempts_total,
                                store_durations_ms: fb_stats.store_durations_ms,
                                retries_histogram: fb_stats.retries_histogram,
                            });
                        }
                        _ => return Err(e),
                    }
                }
            };

            if merkle_plan.to_upload.is_empty() {
                // Nothing left to pay for or store: clear both receipt caches
                // and report every chunk as stored at zero cost.
                info!("All {chunk_count} merkle chunks already stored; skipping payment");
                crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                (
                    chunk_count,
                    PaymentMode::Merkle,
                    "0".to_string(),
                    0,
                    WaveAggregateStats::default(),
                )
            } else if !self.should_use_merkle(merkle_plan.to_upload.len(), mode) {
                // The remaining chunk count no longer qualifies for merkle
                // payment. Prefer an already-paid cached receipt if it covers
                // the remainder; otherwise pay per chunk.
                let remaining_chunks = merkle_plan.to_upload.len();
                if let Some(cached) = cached_merkle
                    .as_ref()
                    .filter(|cached| cached_merkle_covers_addresses(cached, &merkle_plan.to_upload))
                {
                    info!(
                        "{remaining_chunks} chunks remain below merkle threshold; \
                         reusing cached merkle proofs"
                    );
                    let (stored, sc, gc, stats) = self
                        .upload_waves_merkle(
                            &spill,
                            &merkle_plan.to_upload,
                            cached,
                            &merkle_plan.already_stored,
                            progress.as_ref(),
                        )
                        .await?;
                    crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                    (stored, PaymentMode::Merkle, sc, gc, stats)
                } else {
                    if cached_merkle.is_some() {
                        info!(
                            "{remaining_chunks} chunks remain below merkle threshold, \
                             and the cached merkle receipt does not cover them. \
                             Discarding cache and using single-node payment."
                        );
                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                    } else {
                        info!(
                            "{remaining_chunks} chunks need upload after merkle preflight; \
                             using single-node payment"
                        );
                    }
                    let (stored, sc, gc, stats) = self
                        .upload_spill_addresses_single(
                            &spill,
                            &merkle_plan.to_upload,
                            progress.as_ref(),
                            &merkle_plan.already_stored,
                            chunk_count,
                            Some(&file_path_key),
                        )
                        .await?;
                    crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                    (stored, PaymentMode::Single, sc, gc, stats)
                }
            } else {
                // Regular merkle path: obtain a batch payment result, either
                // from the cache or by paying now.
                let batch_result = if let Some(cached) = cached_merkle.as_ref() {
                    // Validate the cache against the chunks that still need
                    // storage. Extra proofs are harmless: a previous attempt
                    // may have paid for chunks that are now already stored.
                    if cached_merkle_covers_addresses(cached, &merkle_plan.to_upload) {
                        info!(
                            "Skipping merkle payment phase; resuming with \
                             cached proofs for {} remaining chunks",
                            merkle_plan.to_upload.len()
                        );
                        Ok(cached.clone())
                    } else {
                        info!(
                            "Cached merkle receipt does not cover the current \
                             remaining chunks (cached={}, remaining={}). \
                             Discarding cache and paying fresh.",
                            cached.proofs.len(),
                            merkle_plan.to_upload.len()
                        );
                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                        self.pay_for_merkle_batch(
                            &merkle_plan.to_upload,
                            DATA_TYPE_CHUNK,
                            merkle_plan.to_upload_avg_size(),
                        )
                        .await
                        .inspect(|result| {
                            crate::data::client::cached_merkle::try_save(&file_path_key, result);
                        })
                    }
                } else {
                    self.pay_for_merkle_batch(
                        &merkle_plan.to_upload,
                        DATA_TYPE_CHUNK,
                        merkle_plan.to_upload_avg_size(),
                    )
                    .await
                    .inspect(|result| {
                        // Save BEFORE the store phase so a crash
                        // mid-upload leaves a resumable receipt.
                        crate::data::client::cached_merkle::try_save(&file_path_key, result);
                    })
                };

                // A payment-time `InsufficientPeers` in `Auto` mode falls back
                // to per-chunk payment for the chunks still to upload; any
                // other error aborts the upload.
                let batch_result = match batch_result {
                    Ok(result) => result,
                    Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
                        info!("Merkle needs more peers ({msg}), falling back to wave-batch");
                        let (stored, sc, gc, fb_stats) = self
                            .upload_spill_addresses_single(
                                &spill,
                                &merkle_plan.to_upload,
                                progress.as_ref(),
                                &merkle_plan.already_stored,
                                chunk_count,
                                Some(&file_path_key),
                            )
                            .await?;
                        crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                        return Ok(FileUploadResult {
                            data_map,
                            chunks_stored: stored,
                            chunks_failed: 0,
                            total_chunks: chunk_count,
                            payment_mode_used: PaymentMode::Single,
                            storage_cost_atto: sc,
                            gas_cost_wei: gc,
                            data_map_address,
                            chunk_attempts_total: fb_stats.chunk_attempts_total,
                            store_durations_ms: fb_stats.store_durations_ms,
                            retries_histogram: fb_stats.retries_histogram,
                        });
                    }
                    Err(e) => return Err(e),
                };

                let (stored, sc, gc, stats) = self
                    .upload_waves_merkle(
                        &spill,
                        &merkle_plan.to_upload,
                        &batch_result,
                        &merkle_plan.already_stored,
                        progress.as_ref(),
                    )
                    .await?;
                // Upload succeeded end-to-end; the cached receipt is
                // no longer needed.
                crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                (stored, PaymentMode::Merkle, sc, gc, stats)
            }
        } else {
            // Merkle not selected for this chunk count / mode: pay per chunk
            // in waves.
            let (stored, sc, gc, stats) = self
                .upload_waves_single(&spill, progress.as_ref(), Some(&file_path_key))
                .await?;
            // Full file success: drop any cached single-node receipt.
            crate::data::client::cached_single::try_delete_for_file(&file_path_key);
            (stored, PaymentMode::Single, sc, gc, stats)
        };

        info!(
            "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
            path.display()
        );

        Ok(FileUploadResult {
            data_map,
            chunks_stored,
            chunks_failed: 0,
            total_chunks: chunk_count,
            payment_mode_used: actual_mode,
            storage_cost_atto,
            gas_cost_wei,
            data_map_address,
            chunk_attempts_total: stats.chunk_attempts_total,
            store_durations_ms: stats.store_durations_ms,
            retries_histogram: stats.retries_histogram,
        })
    }
2109
2110    /// Encrypt a file and spill chunks to a temp directory.
2111    ///
2112    /// Logs progress every 100 chunks so users get feedback during
2113    /// multi-GB encryptions.
2114    ///
2115    /// Returns the spill buffer (addresses on disk) and the `DataMap`.
2116    async fn encrypt_file_to_spill(
2117        &self,
2118        path: &Path,
2119        progress: Option<&mpsc::Sender<UploadEvent>>,
2120    ) -> Result<(ChunkSpill, DataMap)> {
2121        let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
2122
2123        let mut spill = ChunkSpill::new()?;
2124        while let Some(content) = chunk_rx.recv().await {
2125            spill.push(&content)?;
2126            let chunks_done = spill.len();
2127            if let Some(tx) = progress {
2128                if chunks_done.is_multiple_of(10) {
2129                    let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
2130                }
2131            }
2132            if chunks_done % 100 == 0 {
2133                let mb = spill.total_bytes() / (1024 * 1024);
2134                info!(
2135                    "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
2136                    path.display()
2137                );
2138            }
2139        }
2140
2141        // Await encryption completion to catch errors before paying.
2142        handle
2143            .await
2144            .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
2145            .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
2146
2147        let data_map = datamap_rx
2148            .await
2149            .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
2150
2151        Ok((spill, data_map))
2152    }
2153
2154    /// Upload chunks from a spill using wave-based per-chunk (single) payments.
2155    ///
2156    /// Reads one wave at a time from disk, prepares quotes, pays, and stores.
2157    /// Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE` (~256 MB).
2158    ///
2159    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
2160    async fn upload_waves_single(
2161        &self,
2162        spill: &ChunkSpill,
2163        progress: Option<&mpsc::Sender<UploadEvent>>,
2164        resume_key: Option<&str>,
2165    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
2166        self.upload_spill_addresses_single(
2167            spill,
2168            &spill.addresses,
2169            progress,
2170            &[],
2171            spill.len(),
2172            resume_key,
2173        )
2174        .await
2175    }
2176
2177    async fn upload_spill_addresses_single(
2178        &self,
2179        spill: &ChunkSpill,
2180        addresses: &[[u8; 32]],
2181        progress: Option<&mpsc::Sender<UploadEvent>>,
2182        already_stored_addresses: &[[u8; 32]],
2183        total_chunks: usize,
2184        resume_key: Option<&str>,
2185    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
2186        let mut total_stored = already_stored_addresses.len();
2187        let mut total_storage = Amount::ZERO;
2188        let mut total_gas: u128 = 0;
2189        let mut agg_stats = WaveAggregateStats::default();
2190        // A wave whose chunks fall short of quorum after retries must not abort
2191        // the file: its failures are accumulated here and surfaced as a single
2192        // `PartialUpload` only after every wave has been attempted, mirroring
2193        // `upload_waves_merkle`. Aborting on the first failed wave (the old `?`)
2194        // discarded all later waves' progress — already self-encrypted, spilled,
2195        // and in some cases already paid for — converting high per-chunk success
2196        // into 0% per-file success.
2197        // Seed with the addresses a preflight already confirmed stored (e.g.
2198        // the merkle-fallback path passes `merkle_plan.already_stored`), so a
2199        // returned `PartialUpload.stored` lists every stored chunk and
2200        // `stored_count == stored.len()` holds for programmatic callers.
2201        let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
2202        let mut failed: Vec<([u8; 32], String)> = Vec::new();
2203        let waves: Vec<&[[u8; 32]]> = addresses.chunks(UPLOAD_WAVE_SIZE).collect();
2204        let wave_count = waves.len();
2205
2206        // Unconditional breadcrumb: lets a clean run confirm the continue-on-
2207        // partial single-node path is in effect (the old path aborted the file
2208        // on the first failed wave instead of continuing across all waves).
2209        info!(
2210            "single-node upload: {} chunk(s) in {wave_count} wave(s) (continue-on-partial)",
2211            addresses.len()
2212        );
2213
2214        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
2215            let wave_num = wave_idx + 1;
2216            let wave_data: Vec<Bytes> = wave_addrs
2217                .iter()
2218                .map(|addr| spill.read_chunk(addr))
2219                .collect::<Result<Vec<_>>>()?;
2220
2221            info!(
2222                "Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
2223                wave_data.len()
2224            );
2225            if let Some(tx) = progress {
2226                let _ = tx
2227                    .send(UploadEvent::QuotingChunks {
2228                        wave: wave_num,
2229                        total_waves: wave_count,
2230                        chunks_in_wave: wave_data.len(),
2231                    })
2232                    .await;
2233            }
2234            // Fold this wave's result. A quorum shortfall (`PartialUpload`) is
2235            // recoverable and its parts are returned to be recorded here;
2236            // genuinely fatal errors propagate via `?` and abort the file, as in
2237            // `upload_waves_merkle`.
2238            let outcome = fold_single_wave(
2239                self.batch_upload_chunks_with_events(
2240                    wave_data,
2241                    progress,
2242                    total_stored,
2243                    total_chunks,
2244                    resume_key,
2245                )
2246                .await,
2247            )?;
2248
2249            if !outcome.failed.is_empty() {
2250                warn!(
2251                    "Wave {wave_num}/{wave_count}: {} chunk(s) failed to store after retries; \
2252                     continuing with remaining waves",
2253                    outcome.failed.len()
2254                );
2255            }
2256
2257            total_stored += outcome.stored.len();
2258            stored_addresses.extend(outcome.stored);
2259            failed.extend(outcome.failed);
2260            total_storage += outcome.storage_atto;
2261            total_gas = total_gas.saturating_add(outcome.gas_wei);
2262            // Merge per-wave stats (a quorum-short wave contributes none, since
2263            // `PartialUpload` carries no stats).
2264            agg_stats.chunk_attempts_total = agg_stats
2265                .chunk_attempts_total
2266                .saturating_add(outcome.stats.chunk_attempts_total);
2267            agg_stats
2268                .store_durations_ms
2269                .extend(outcome.stats.store_durations_ms);
2270            for (slot, count) in agg_stats
2271                .retries_histogram
2272                .iter_mut()
2273                .zip(outcome.stats.retries_histogram.iter())
2274            {
2275                *slot = slot.saturating_add(*count);
2276            }
2277
2278            if let Some(tx) = progress {
2279                let _ = tx
2280                    .send(UploadEvent::WaveComplete {
2281                        wave: wave_num,
2282                        total_waves: wave_count,
2283                        stored_so_far: total_stored,
2284                        total: total_chunks,
2285                    })
2286                    .await;
2287            }
2288        }
2289
2290        // Any chunk still failed after every wave was attempted means the file
2291        // is not fully stored — surface it as `PartialUpload` (never silently
2292        // succeed with missing chunks), carrying the real on-chain spend.
2293        if !failed.is_empty() {
2294            let failed_count = failed.len();
2295            warn!(
2296                "single-node upload incomplete: {failed_count}/{total_chunks} chunks failed after retries"
2297            );
2298            return Err(Error::PartialUpload {
2299                stored: stored_addresses,
2300                stored_count: total_stored,
2301                failed,
2302                failed_count,
2303                total_chunks,
2304                spend: Box::new(PartialUploadSpend {
2305                    storage_cost_atto: total_storage.to_string(),
2306                    gas_cost_wei: total_gas,
2307                }),
2308                reason: format!("{failed_count} chunk(s) failed to store after retries"),
2309            });
2310        }
2311
2312        Ok((
2313            total_stored,
2314            total_storage.to_string(),
2315            total_gas,
2316            agg_stats,
2317        ))
2318    }
2319
2320    /// Upload chunks from a spill using pre-computed merkle proofs.
2321    ///
2322    /// Reads one wave at a time from disk, pairs each chunk with its proof,
2323    /// and uploads concurrently. Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE`.
2324    ///
2325    /// A chunk that is transiently short of quorum (`InsufficientPeers`) does
2326    /// **not** abort the file, nor does it block the pipeline: each wave is
2327    /// stored in a **single pass** (no in-wave backoff barrier), and chunks
2328    /// short of quorum are collected into a file-level deferred set rather than
2329    /// retried in place. After the last wave, [`merkle_deferred_retry`] retries
2330    /// the whole deferred set in concurrent rounds ([`DEFERRED_ROUND_DELAYS_SECS`]
2331    /// delays), re-reading each chunk's body from the spill and reusing its
2332    /// proof. This keeps every wave running at full fan-out instead of parking
2333    /// idle slots behind one slow chunk's backoff, while peak memory stays
2334    /// bounded (bodies are re-read from disk, never pinned). Non-quorum errors
2335    /// (e.g. a missing proof) stay fatal and abort immediately.
2336    ///
2337    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)` on success.
2338    /// Costs come from the `batch_result` which was populated during payment.
2339    ///
2340    /// # Errors
2341    ///
2342    /// Returns [`Error::PartialUpload`] if any chunk is still short of quorum
2343    /// after all retries across every wave (other chunks remain stored), or the
2344    /// underlying error for a non-quorum failure.
2345    async fn upload_waves_merkle(
2346        &self,
2347        spill: &ChunkSpill,
2348        addresses: &[[u8; 32]],
2349        batch_result: &MerkleBatchPaymentResult,
2350        already_stored_addresses: &[[u8; 32]],
2351        progress: Option<&mpsc::Sender<UploadEvent>>,
2352    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
2353        let mut total_stored = already_stored_addresses.len();
2354        let total_chunks = total_stored + addresses.len();
2355        let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
2356        let mut failed: Vec<([u8; 32], String)> = Vec::new();
2357        // Chunks short of quorum on their single wave pass are collected here and
2358        // retried after the last wave (see `merkle_deferred_retry`), so a slow
2359        // chunk never holds its wave's other slots idle behind a backoff.
2360        let mut deferred: Vec<([u8; 32], String)> = Vec::new();
2361        let mut agg_stats = WaveAggregateStats::default();
2362
2363        // Chunks without a merkle proof were never paid for: a partial
2364        // `pay_for_merkle_multi_batch` result carries proofs only for the
2365        // sub-batches whose on-chain payment succeeded. Such a chunk cannot be
2366        // stored, so record it as failed (surfaced via `PartialUpload` once the
2367        // storable chunks have been attempted) rather than letting its
2368        // "missing proof" error abort the whole file and discard every other
2369        // chunk's progress.
2370        let (to_store, missing_proof) =
2371            partition_addresses_by_proof(addresses, &batch_result.proofs);
2372        if !missing_proof.is_empty() {
2373            warn!(
2374                "{} chunk(s) lack a merkle proof (partial payment); reporting them as failed",
2375                missing_proof.len()
2376            );
2377            for addr in &missing_proof {
2378                failed.push((
2379                    *addr,
2380                    format!("Missing merkle proof for chunk {}", hex::encode(addr)),
2381                ));
2382            }
2383        }
2384
2385        let waves: Vec<&[[u8; 32]]> = to_store.chunks(UPLOAD_WAVE_SIZE).collect();
2386        let wave_count = waves.len();
2387
2388        let store_limiter = self.controller().store.clone();
2389
2390        // Store one chunk to its (freshly re-collected) close group, reusing the
2391        // chunk's merkle proof. Shared across every retry round so a converged
2392        // routing table yields a fresh group. Only `InsufficientPeers` is
2393        // recoverable; a missing proof stays fatal. Mirrors the external-signer
2394        // path's closure in `merkle_upload_chunks`.
2395        let store_one = |addr: [u8; 32], content: Bytes| {
2396            let limiter = store_limiter.clone();
2397            let proof_bytes = batch_result.proofs.get(&addr).cloned();
2398            async move {
2399                let started = std::time::Instant::now();
2400                let proof = proof_bytes.ok_or_else(|| {
2401                    Error::Payment(format!(
2402                        "Missing merkle proof for chunk {}",
2403                        hex::encode(addr)
2404                    ))
2405                })?;
2406                let peers = self.close_group_peers(&addr).await?;
2407                observe_op(
2408                    &limiter,
2409                    || async move { self.chunk_put_to_close_group(content, proof, &peers).await },
2410                    classify_error,
2411                )
2412                .await
2413                .map(|_| started)
2414            }
2415        };
2416
2417        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
2418            let wave_num = wave_idx + 1;
2419            let wave = spill.read_wave(wave_addrs)?;
2420
2421            info!(
2422                "Wave {wave_num}/{wave_count}: storing {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
2423                wave.len()
2424            );
2425
2426            // Clamp fan-out to wave size — partial last wave should
2427            // not pay for extra slots (see PERF-RESULTS.md).
2428            let store_concurrency = store_limiter.current().min(wave.len().max(1));
2429            let chunks: Vec<([u8; 32], Bytes)> = wave
2430                .into_iter()
2431                .map(|(content, addr)| (addr, content))
2432                .collect();
2433
2434            // Store the wave in a SINGLE pass (`max_attempts = 1`, no backoff):
2435            // quorum-short chunks are collected and deferred to a post-wave
2436            // concurrent retry rather than parking this wave's other slots
2437            // behind a backoff. `stored_offset` is the running cumulative count
2438            // so the progress events the driver emits stay correctly numbered
2439            // across waves.
2440            let outcome = merkle_store_with_retry(
2441                chunks,
2442                store_concurrency,
2443                1,
2444                std::time::Duration::ZERO,
2445                progress,
2446                total_stored,
2447                total_chunks,
2448                &store_one,
2449            )
2450            .await?;
2451
2452            // Record this wave's confirmed stores from the explicit set the
2453            // store helper reports. Using that set (rather than inferring
2454            // "wave chunks minus failed") keeps `stored_addresses` correct even
2455            // when a fatal abort leaves some of the wave neither stored nor
2456            // reported short of quorum.
2457            stored_addresses.extend(&outcome.stored_addresses);
2458            total_stored = outcome.stored;
2459
2460            // Merge per-wave stats (durations, attempts, per-round histogram).
2461            agg_stats.chunk_attempts_total = agg_stats
2462                .chunk_attempts_total
2463                .saturating_add(outcome.stats.chunk_attempts_total);
2464            agg_stats
2465                .store_durations_ms
2466                .extend(outcome.stats.store_durations_ms);
2467            for (slot, count) in agg_stats
2468                .retries_histogram
2469                .iter_mut()
2470                .zip(outcome.stats.retries_histogram.iter())
2471            {
2472                *slot = slot.saturating_add(*count);
2473            }
2474
2475            if let Some(e) = outcome.fatal {
2476                // A non-quorum store error is fatal (missing proofs were
2477                // filtered out above, so this is a genuine network/store
2478                // failure). Preserve every chunk stored so far — including this
2479                // wave's same-pass successes — and report every not-stored chunk
2480                // as failed, so the `PartialUpload` counts are accurate rather
2481                // than omitting same-wave stores and under-counting failures.
2482                warn!("merkle wave {wave_num}/{wave_count} aborted: {e}");
2483                // Best per-chunk messages we already have: missing-proof, this
2484                // wave's shortfalls, and earlier waves' deferred shortfalls.
2485                let mut known_failed = failed;
2486                known_failed.extend(outcome.failed_addresses);
2487                known_failed.extend(std::mem::take(&mut deferred));
2488                return Err(partial_upload_after_fatal(
2489                    addresses,
2490                    stored_addresses,
2491                    total_stored,
2492                    total_chunks,
2493                    known_failed,
2494                    PartialUploadSpend {
2495                        storage_cost_atto: batch_result.storage_cost_atto.clone(),
2496                        gas_cost_wei: batch_result.gas_cost_wei,
2497                    },
2498                    format!("merkle chunk store aborted: {e}"),
2499                ));
2500            }
2501
2502            // Non-fatal: this wave's quorum-short chunks are deferred (not failed
2503            // yet) for the post-wave concurrent retry. A deferred chunk joins
2504            // `stored_addresses` only if/when a later round stores it.
2505            deferred.extend(outcome.failed_addresses);
2506
2507            if let Some(tx) = progress {
2508                let _ = tx
2509                    .send(UploadEvent::WaveComplete {
2510                        wave: wave_num,
2511                        total_waves: wave_count,
2512                        stored_so_far: total_stored,
2513                        total: total_chunks,
2514                    })
2515                    .await;
2516            }
2517        }
2518
2519        // The wave passes never blocked on backoff; now retry the whole
2520        // file-level deferred set in concurrent rounds. Bodies are re-read from
2521        // the spill at retry time (peak RAM unchanged) and proofs are re-attached
2522        // by `store_one`. Chunks still short after the final round become
2523        // `failed`; a non-quorum error aborts as `PartialUpload`.
2524        if !deferred.is_empty() {
2525            info!(
2526                "Deferring {} merkle chunk(s) short of quorum for concurrent retry after final wave",
2527                deferred.len()
2528            );
2529            let dr = merkle_deferred_retry(
2530                deferred,
2531                &DEFERRED_ROUND_DELAYS_SECS,
2532                // Read and store at most one wave's worth of bodies at a time so
2533                // the deferred path keeps the wave path's ~256 MB peak-memory
2534                // bound regardless of how many chunks were deferred file-wide.
2535                UPLOAD_WAVE_SIZE,
2536                |addrs: &[[u8; 32]]| {
2537                    spill.read_wave(addrs).map(|wave| {
2538                        wave.into_iter()
2539                            .map(|(content, addr)| (addr, content))
2540                            .collect()
2541                    })
2542                },
2543                |n: usize| store_limiter.current().min(n.max(1)),
2544                progress,
2545                total_stored,
2546                total_chunks,
2547                &store_one,
2548            )
2549            .await?;
2550
2551            stored_addresses.extend(dr.stored_addresses);
2552            total_stored = dr.stored;
2553
2554            // Merge the deferred pass's stats — its histogram is already mapped
2555            // to the right per-round slots — into the file aggregate.
2556            agg_stats.chunk_attempts_total = agg_stats
2557                .chunk_attempts_total
2558                .saturating_add(dr.stats.chunk_attempts_total);
2559            agg_stats
2560                .store_durations_ms
2561                .extend(dr.stats.store_durations_ms);
2562            for (slot, count) in agg_stats
2563                .retries_histogram
2564                .iter_mut()
2565                .zip(dr.stats.retries_histogram.iter())
2566            {
2567                *slot = slot.saturating_add(*count);
2568            }
2569
2570            if let Some(reason) = dr.fatal {
2571                // A non-quorum store error during a deferred round is fatal, the
2572                // same as in the wave path: preserve everything stored so far and
2573                // report every not-stored chunk as failed.
2574                warn!("merkle deferred retry aborted: {reason}");
2575                let mut known_failed = failed;
2576                known_failed.extend(dr.failed_addresses);
2577                return Err(partial_upload_after_fatal(
2578                    addresses,
2579                    stored_addresses,
2580                    total_stored,
2581                    total_chunks,
2582                    known_failed,
2583                    PartialUploadSpend {
2584                        storage_cost_atto: batch_result.storage_cost_atto.clone(),
2585                        gas_cost_wei: batch_result.gas_cost_wei,
2586                    },
2587                    format!("merkle chunk store aborted: {reason}"),
2588                ));
2589            }
2590            failed.extend(dr.failed_addresses);
2591        }
2592
2593        // A file with any permanently-failed chunk is not fully stored — surface
2594        // it as `PartialUpload`, but only after the single wave pass and every
2595        // deferred retry round are exhausted (never silently succeed with
2596        // missing chunks).
2597        if !failed.is_empty() {
2598            let failed_count = failed.len();
2599            let total_attempts = 1 + DEFERRED_ROUND_DELAYS_SECS.len();
2600            warn!(
2601                "merkle upload incomplete: {failed_count}/{total_chunks} chunks short of quorum after retries"
2602            );
2603            return Err(Error::PartialUpload {
2604                stored: stored_addresses,
2605                stored_count: total_stored,
2606                failed,
2607                failed_count,
2608                total_chunks,
2609                spend: Box::new(PartialUploadSpend {
2610                    storage_cost_atto: batch_result.storage_cost_atto.clone(),
2611                    gas_cost_wei: batch_result.gas_cost_wei,
2612                }),
2613                reason: format!(
2614                    "{failed_count} chunk(s) short of quorum after {total_attempts} attempts"
2615                ),
2616            });
2617        }
2618
2619        Ok((
2620            total_stored,
2621            batch_result.storage_cost_atto.clone(),
2622            batch_result.gas_cost_wei,
2623            agg_stats,
2624        ))
2625    }
2626
2627    /// Download and decrypt a file from the network, writing it to disk.
2628    ///
2629    /// Uses `streaming_decrypt` so that only one batch of chunks lives in
2630    /// memory at a time, avoiding OOM on large files. Chunks are fetched
2631    /// concurrently within each batch, then decrypted data is written to
2632    /// disk incrementally.
2633    ///
2634    /// Returns the number of bytes written.
2635    ///
2636    /// # Panics
2637    ///
2638    /// Requires a multi-threaded Tokio runtime (`flavor = "multi_thread"`).
2639    /// Will panic if called from a `current_thread` runtime because
2640    /// `streaming_decrypt` takes a synchronous callback that must bridge
2641    /// back to async via `block_in_place`.
2642    ///
2643    /// # Errors
2644    ///
2645    /// Returns an error if any chunk cannot be retrieved, decryption fails,
2646    /// or the file cannot be written.
2647    pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
2648        self.file_download_with_progress(data_map, output, None)
2649            .await
2650    }
2651
2652    /// Download and decrypt a file, trying the requested number of
2653    /// closest peers for every chunk fetch.
2654    ///
2655    /// Returns the number of bytes written.
2656    ///
2657    /// # Errors
2658    ///
2659    /// Returns an error if any chunk cannot be retrieved, decryption fails,
2660    /// or the file cannot be written.
2661    pub async fn file_download_from_closest_peers(
2662        &self,
2663        data_map: &DataMap,
2664        output: &Path,
2665        peer_count: NonZeroUsize,
2666    ) -> Result<u64> {
2667        self.file_download_with_progress_using_peer_count(data_map, output, None, peer_count.get())
2668            .await
2669    }
2670
2671    /// Download and decrypt a file with progress events, trying the
2672    /// requested number of closest peers for every chunk fetch.
2673    ///
2674    /// Same as [`Client::file_download_from_closest_peers`] but sends
2675    /// [`DownloadEvent`]s for UI feedback.
2676    ///
2677    /// # Errors
2678    ///
2679    /// Returns an error if any chunk cannot be retrieved, decryption fails,
2680    /// or the file cannot be written.
2681    pub async fn file_download_with_progress_from_closest_peers(
2682        &self,
2683        data_map: &DataMap,
2684        output: &Path,
2685        progress: Option<mpsc::Sender<DownloadEvent>>,
2686        peer_count: NonZeroUsize,
2687    ) -> Result<u64> {
2688        self.file_download_with_progress_using_peer_count(
2689            data_map,
2690            output,
2691            progress,
2692            peer_count.get(),
2693        )
2694        .await
2695    }
2696
2697    /// Shared download core: resolve the DataMap, then fetch + streaming-decrypt
2698    /// the file one batch at a time, handing each decrypted plaintext segment
2699    /// (in order) to `on_chunk`. Constant memory — only one decrypt batch is
2700    /// resident at a time. Returns the total plaintext bytes produced.
2701    ///
2702    /// `on_chunk` is async so a sink can apply backpressure (e.g. a bounded
2703    /// channel). Driving the decrypt iterator runs the batched chunk fetch via
2704    /// `block_in_place`, so this requires a multi-threaded Tokio runtime.
2705    ///
2706    /// Every chunk fetch tries `peer_count` closest peers.
2707    ///
2708    /// Progress reporting (via `progress`):
2709    /// 1. Resolves hierarchical DataMaps to the root level first (reports as
2710    ///    `ChunksFetched` with `total: 0` during resolution)
2711    /// 2. Once the root DataMap is known, sends `total_chunks` with accurate count
2712    /// 3. Fetches data chunks with accurate `fetched/total` progress
2713    async fn download_decrypted_chunks<F, Fut>(
2714        &self,
2715        data_map: &DataMap,
2716        progress: Option<mpsc::Sender<DownloadEvent>>,
2717        peer_count: usize,
2718        mut on_chunk: F,
2719    ) -> Result<u64>
2720    where
2721        F: FnMut(Bytes) -> Fut,
2722        Fut: std::future::Future<Output = Result<()>>,
2723    {
2724        let handle = Handle::current();
2725
2726        // Phase 1: Resolve hierarchical DataMap to root level.
2727        // This fetches child DataMap chunks (typically 3) to discover the real chunk count.
2728        let root_map = if data_map.is_child() {
2729            let dm_chunks = data_map.len();
2730            if let Some(ref tx) = progress {
2731                let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
2732                    total_map_chunks: dm_chunks,
2733                });
2734            }
2735
2736            let resolve_progress = progress.clone();
2737            let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2738
2739            let resolved = tokio::task::block_in_place(|| {
2740                let counter_ref = resolve_counter.clone();
2741                let progress_ref = resolve_progress.clone();
2742                let fetch_limiter = self.controller().fetch.clone();
2743                let fetch = |batch: &[(usize, XorName)]| {
2744                    let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
2745                    let counter = counter_ref.clone();
2746                    let prog = progress_ref.clone();
2747                    let limiter = fetch_limiter.clone();
2748                    handle.block_on(async {
2749                        // Use rebucketed_unordered so the in-flight cap
2750                        // is re-read from the limiter as each slot frees.
2751                        // `buffer_unordered` snapshots the cap once at
2752                        // pipeline build, which means observe_op
2753                        // signals from inside chunk_get cannot reduce
2754                        // concurrency on the current batch — exactly
2755                        // the case where load-shedding is needed.
2756                        let mut results = rebucketed_unordered(
2757                            &limiter,
2758                            batch_owned,
2759                            |(idx, hash): (usize, XorName)| {
2760                                let counter = counter.clone();
2761                                let prog = prog.clone();
2762                                async move {
2763                                    let addr = hash.0;
2764                                    // chunk_get_observed feeds the
2765                                    // adaptive fetch limiter once per
2766                                    // call via chunk_get_outcome
2767                                    // (Ok(None) -> Timeout is the
2768                                    // load-shedding signal for
2769                                    // sustained close-group exhaustion).
2770                                    let chunk = self
2771                                        .chunk_get_observed_from_closest_peers(&addr, peer_count)
2772                                        .await
2773                                        .map_err(|e| {
2774                                            self_encryption::Error::Generic(format!(
2775                                                "DataMap resolution failed: {e}"
2776                                            ))
2777                                        })?
2778                                        .ok_or_else(|| {
2779                                            self_encryption::Error::Generic(format!(
2780                                                "DataMap chunk not found: {}",
2781                                                hex::encode(addr)
2782                                            ))
2783                                        })?;
2784                                    let fetched = counter
2785                                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
2786                                        + 1;
2787                                    if let Some(ref tx) = prog {
2788                                        let _ =
2789                                            tx.try_send(DownloadEvent::MapChunkFetched { fetched });
2790                                    }
2791                                    Ok::<_, self_encryption::Error>((idx, chunk.content))
2792                                }
2793                            },
2794                        )
2795                        .await?;
2796                        // CRITICAL: self_encryption::get_root_data_map_parallel
2797                        // pairs the returned Vec POSITIONALLY with the input
2798                        // hashes via .zip() and discards our idx field.
2799                        // rebucketed_unordered preserves first-completion
2800                        // order, so sort by idx to restore input order
2801                        // before returning.
2802                        results.sort_by_key(|(idx, _)| *idx);
2803                        Ok(results)
2804                    })
2805                };
2806                get_root_data_map_parallel(data_map.clone(), &fetch)
2807            })
2808            .map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;
2809
2810            info!(
2811                "Resolved hierarchical DataMap: {} data chunks",
2812                resolved.len()
2813            );
2814            resolved
2815        } else {
2816            data_map.clone()
2817        };
2818
2819        // Phase 2: Now we know the real chunk count.
2820        let total_chunks = root_map.len();
2821        if let Some(ref tx) = progress {
2822            let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
2823        }
2824
2825        // Phase 3: Fetch and decrypt data chunks with accurate progress.
2826        let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2827        let fetched_for_closure = fetched_counter.clone();
2828        let progress_for_closure = progress.clone();
2829
2830        let fetch_limiter_outer = self.controller().fetch.clone();
2831        let usable_memory = usable_memory_bytes();
2832        let configured_batch_floor = stream_decrypt_batch_size();
2833        let fetch_cap = fetch_limiter_outer.current();
2834        let decrypt_batch_size = adaptive_stream_decrypt_batch_size(
2835            total_chunks,
2836            fetch_cap,
2837            configured_batch_floor,
2838            usable_memory,
2839        );
2840        info!(
2841            total_chunks,
2842            fetch_cap,
2843            configured_batch_floor,
2844            ?usable_memory,
2845            decrypt_batch_size,
2846            "Selected adaptive stream decrypt batch size"
2847        );
2848
2849        let stream = streaming_decrypt_with_batch_size(
2850            &root_map,
2851            |batch: &[(usize, XorName)]| {
2852                let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
2853                let fetched_ref = fetched_for_closure.clone();
2854                let progress_ref = progress_for_closure.clone();
2855                let fetch_limiter = fetch_limiter_outer.clone();
2856
2857                tokio::task::block_in_place(|| {
2858                    handle.block_on(async {
2859                        // First pass: try every chunk in the batch via
2860                        // chunk_get_observed (which already does its own
2861                        // first-attempt + retry sweep). A chunk that
2862                        // returns Ok(None) here is NOT a fatal failure
2863                        // — it's a candidate for a deferred retry below.
2864                        // We carry the chunk's XorName through so the
2865                        // retry pass can re-fetch by address.
2866                        //
2867                        // The closure ONLY returns Err on a true
2868                        // protocol/network error from chunk_get (the
2869                        // Err variant). Ok(None) is encoded as
2870                        // `Err(addr)` in the inner Result so the outer
2871                        // rebucketed pass doesn't early-abort on it.
2872                        type BatchEntry =
2873                            (usize, std::result::Result<bytes::Bytes, XorName>);
2874                        let raw: Vec<BatchEntry> = rebucketed_unordered(
2875                            &fetch_limiter,
2876                            batch_owned,
2877                            |(idx, hash): (usize, XorName)| {
2878                                let fetched_ref = fetched_ref.clone();
2879                                let progress_ref = progress_ref.clone();
2880                                async move {
2881                                    let addr = hash.0;
2882                                    let addr_hex = hex::encode(addr);
2883                                    match self
2884                                        .chunk_get_observed_from_closest_peers(&addr, peer_count)
2885                                        .await
2886                                    {
2887                                        Ok(Some(chunk)) => {
2888                                            let fetched = fetched_ref.fetch_add(
2889                                                1,
2890                                                std::sync::atomic::Ordering::Relaxed,
2891                                            ) + 1;
2892                                            info!("Downloaded {fetched}/{total_chunks}");
2893                                            if let Some(ref tx) = progress_ref {
2894                                                let _ = tx.try_send(
2895                                                    DownloadEvent::ChunksFetched {
2896                                                        fetched,
2897                                                        total: total_chunks,
2898                                                    },
2899                                                );
2900                                            }
2901                                            Ok::<BatchEntry, self_encryption::Error>((
2902                                                idx,
2903                                                Ok(chunk.content),
2904                                            ))
2905                                        }
2906                                        // chunk_get returned Ok(None): defer
2907                                        // this chunk for a later retry rather
2908                                        // than aborting the whole batch.
2909                                        Ok(None) => Ok((idx, Err(hash))),
2910                                        // A transient error for one chunk
2911                                        // (e.g. its close-group DHT walk
2912                                        // erroring on this pass) must not
2913                                        // abort a multi-hundred-chunk
2914                                        // download. Defer it to the retry
2915                                        // rounds, same as Ok(None); only a
2916                                        // chunk that survives all deferred
2917                                        // rounds is fatal.
2918                                        Err(e) => {
2919                                            info!(
2920                                                "First-pass fetch error for {addr_hex}: {e}; deferring"
2921                                            );
2922                                            Ok((idx, Err(hash)))
2923                                        }
2924                                    }
2925                                }
2926                            },
2927                        )
2928                        .await?;
2929
2930                        // Partition: things we already have vs the
2931                        // deferred set we need to retry.
2932                        let mut results: Vec<(usize, bytes::Bytes)> = Vec::new();
2933                        let mut deferred: Vec<(usize, XorName)> = Vec::new();
2934                        for (idx, inner) in raw {
2935                            match inner {
2936                                Ok(bytes) => results.push((idx, bytes)),
2937                                Err(hash) => deferred.push((idx, hash)),
2938                            }
2939                        }
2940
2941                        // Deferred retry pass: retry the deferred chunks
2942                        // in CONCURRENT rounds (reusing the fetch
2943                        // limiter's cap), not serially. The first round
2944                        // fires immediately — most deferrals on a
2945                        // healthy-but-lossy link are peer-side noise
2946                        // that clears in well under a second, and
2947                        // serializing them behind mandatory multi-second
2948                        // sleeps was the single biggest throughput sink
2949                        // on such links (a batch deferring ~20 chunks
2950                        // burned minutes of near-zero throughput even
2951                        // though every chunk succeeded on its first
2952                        // retry). Only chunks that survive a round get a
2953                        // longer back-off before the next, so genuine
2954                        // saturation still gets time to settle.
2955                        if !deferred.is_empty() {
2956                            // Round delays in seconds. Round 0 is
2957                            // immediate; later rounds back off to ride
2958                            // out sustained saturation.
2959                            const DEFERRED_ROUND_DELAYS_SECS: [u64; 3] = [0, 15, 45];
2960                            info!(
2961                                "Deferring {} chunk(s) for concurrent retry after batch settles",
2962                                deferred.len()
2963                            );
2964                            let mut remaining = deferred;
2965                            for (round, &delay_secs) in DEFERRED_ROUND_DELAYS_SECS
2966                                .iter()
2967                                .enumerate()
2968                            {
2969                                if remaining.is_empty() {
2970                                    break;
2971                                }
2972                                if delay_secs > 0 {
2973                                    tokio::time::sleep(std::time::Duration::from_secs(
2974                                        delay_secs,
2975                                    ))
2976                                    .await;
2977                                }
2978                                info!(
2979                                    "Deferred retry round {}/{}: {} chunk(s)",
2980                                    round + 1,
2981                                    DEFERRED_ROUND_DELAYS_SECS.len(),
2982                                    remaining.len(),
2983                                );
2984                                let round_input = std::mem::take(&mut remaining);
2985                                let round_results: Vec<BatchEntry> = rebucketed_unordered(
2986                                    &fetch_limiter,
2987                                    round_input,
2988                                    |(idx, hash): (usize, XorName)| {
2989                                        let fetched_ref = fetched_ref.clone();
2990                                        let progress_ref = progress_ref.clone();
2991                                        async move {
2992                                            let addr = hash.0;
2993                                            // Both Ok(None) and a transient
2994                                            // Err re-defer the chunk to the
2995                                            // next round rather than
2996                                            // aborting; only the final
2997                                            // round's leftovers are fatal.
2998                                            match self
2999                                                .chunk_get_observed_from_closest_peers(
3000                                                    &addr, peer_count,
3001                                                )
3002                                                .await
3003                                            {
3004                                                Ok(Some(chunk)) => {
3005                                                    let fetched = fetched_ref.fetch_add(
3006                                                        1,
3007                                                        std::sync::atomic::Ordering::Relaxed,
3008                                                    ) + 1;
3009                                                    info!(
3010                                                        "Downloaded {fetched}/{total_chunks} (deferred retry)"
3011                                                    );
3012                                                    if let Some(ref tx) = progress_ref {
3013                                                        let _ = tx.try_send(
3014                                                            DownloadEvent::ChunksFetched {
3015                                                                fetched,
3016                                                                total: total_chunks,
3017                                                            },
3018                                                        );
3019                                                    }
3020                                                    Ok::<BatchEntry, self_encryption::Error>((
3021                                                        idx,
3022                                                        Ok(chunk.content),
3023                                                    ))
3024                                                }
3025                                                Ok(None) => Ok((idx, Err(hash))),
3026                                                Err(e) => {
3027                                                    info!(
3028                                                        "Deferred retry for {} hit transient error: {e}; re-deferring",
3029                                                        hex::encode(addr)
3030                                                    );
3031                                                    Ok((idx, Err(hash)))
3032                                                }
3033                                            }
3034                                        }
3035                                    },
3036                                )
3037                                .await?;
3038                                for (idx, inner) in round_results {
3039                                    match inner {
3040                                        Ok(bytes) => results.push((idx, bytes)),
3041                                        Err(hash) => remaining.push((idx, hash)),
3042                                    }
3043                                }
3044                            }
3045                            if let Some((_, hash)) = remaining.first() {
3046                                return Err(self_encryption::Error::Generic(format!(
3047                                    "Chunk not found after {} deferred retry rounds: {}",
3048                                    DEFERRED_ROUND_DELAYS_SECS.len(),
3049                                    hex::encode(hash.0),
3050                                )));
3051                            }
3052                        }
3053
3054                        // streaming_decrypt itself sort_by_keys before
3055                        // zipping, but the same closure is also passed
3056                        // through get_root_data_map_parallel internally
3057                        // (see self_encryption::stream_decrypt.rs::new), and
3058                        // THAT path zips positionally without sorting. Sort
3059                        // here so both consumers see input order.
3060                        results.sort_by_key(|(idx, _)| *idx);
3061                        Ok(results)
3062                    })
3063                })
3064            },
3065            decrypt_batch_size,
3066        )
3067        .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;
3068
3069        // Drive the iterator (each `next()` runs the batched fetch via
3070        // block_in_place) and hand each decrypted segment to the sink in
3071        // order. Awaiting the sink between items yields back to the runtime so
3072        // a bounded sink can apply backpressure.
3073        let mut bytes_total = 0u64;
3074        for chunk_result in stream {
3075            let chunk: Bytes =
3076                chunk_result.map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
3077            bytes_total += chunk.len() as u64;
3078            on_chunk(chunk).await?;
3079        }
3080        Ok(bytes_total)
3081    }
3082
3083    /// Download and decrypt a file to disk, with optional progress events.
3084    ///
3085    /// Same as [`Client::file_download`] but sends [`DownloadEvent`]s for UI
3086    /// feedback. Streams to a temp file (one decrypt batch resident at a time)
3087    /// and renames atomically on success. A `TempDownload` guard removes the
3088    /// staging file on any error path, including a panic.
3089    pub async fn file_download_with_progress(
3090        &self,
3091        data_map: &DataMap,
3092        output: &Path,
3093        progress: Option<mpsc::Sender<DownloadEvent>>,
3094    ) -> Result<u64> {
3095        self.file_download_with_progress_using_peer_count(
3096            data_map,
3097            output,
3098            progress,
3099            self.config().close_group_size,
3100        )
3101        .await
3102    }
3103
3104    /// Download and decrypt a file to disk with progress events, trying
3105    /// `peer_count` closest peers for every chunk fetch.
3106    ///
3107    /// Streams to a temp file (one decrypt batch resident at a time) and
3108    /// renames atomically on success.
3109    async fn file_download_with_progress_using_peer_count(
3110        &self,
3111        data_map: &DataMap,
3112        output: &Path,
3113        progress: Option<mpsc::Sender<DownloadEvent>>,
3114        peer_count: usize,
3115    ) -> Result<u64> {
3116        debug!("Downloading file to {}", output.display());
3117
3118        let parent = output.parent().unwrap_or_else(|| Path::new("."));
3119        let unique: u64 = rand::random();
3120        let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));
3121
3122        // Guard removes the staging file on any early return OR a panic unwind
3123        // out of the `block_in_place` decrypt loop; defused only by a
3124        // successful commit(). Centralizes what used to be three duplicated
3125        // cleanup arms.
3126        let tmp = TempDownload::new(tmp_path);
3127        let mut file = std::fs::File::create(tmp.path())?;
3128
3129        let bytes_written = self
3130            .download_decrypted_chunks(data_map, progress, peer_count, |bytes| {
3131                let r = file.write_all(&bytes).map_err(Error::from);
3132                std::future::ready(r)
3133            })
3134            .await?;
3135        file.flush()?;
3136        drop(file); // close the handle before rename (Windows won't rename an open file)
3137
3138        tmp.commit(output)?;
3139        info!(
3140            "File downloaded: {bytes_written} bytes written to {}",
3141            output.display()
3142        );
3143        Ok(bytes_written)
3144    }
3145
3146    /// Download and decrypt a file, streaming the plaintext to `sink` instead
3147    /// of writing to disk.
3148    ///
3149    /// Constant memory (one decrypt batch resident at a time); the caller
3150    /// receives bytes progressively as each batch decrypts, suitable for
3151    /// forwarding to an HTTP chunked body or a gRPC response stream. The
3152    /// bounded `sink` applies backpressure. If the receiver is dropped (e.g.
3153    /// the client disconnected) the download stops early and returns
3154    /// [`Error::Cancelled`].
3155    ///
3156    /// The channel item type is `Result<Bytes, Error>`, so the caller sets up:
3157    ///
3158    /// ```ignore
3159    /// let (tx, rx) = tokio::sync::mpsc::channel::<Result<Bytes, Error>>(8);
3160    /// ```
3161    ///
3162    /// Typically the caller `tokio::spawn`s this and converts the matching
3163    /// `Receiver` into its response stream. Requires a multi-threaded Tokio
3164    /// runtime (the decrypt iterator uses `block_in_place`).
3165    pub async fn file_download_to_sender(
3166        &self,
3167        data_map: &DataMap,
3168        sink: mpsc::Sender<std::result::Result<Bytes, Error>>,
3169        progress: Option<mpsc::Sender<DownloadEvent>>,
3170    ) -> Result<u64> {
3171        let peer_count = self.config().close_group_size;
3172        self.download_decrypted_chunks(data_map, progress, peer_count, |bytes| {
3173            let sink = sink.clone();
3174            async move {
3175                sink.send(Ok(bytes))
3176                    .await
3177                    .map_err(|_| Error::Cancelled("download stream receiver dropped".into()))
3178            }
3179        })
3180        .await
3181    }
3182}
3183
3184#[cfg(test)]
3185#[allow(clippy::unwrap_used)]
3186mod tests {
3187    use super::*;
3188
3189    #[test]
3190    fn distributed_sample_indices_spreads_across_large_file() {
3191        // cap 5 over 100 chunks: first and last included, evenly spread.
3192        assert_eq!(distributed_sample_indices(100, 5), vec![0, 24, 49, 74, 99]);
3193    }
3194
3195    #[test]
3196    fn distributed_sample_indices_covers_whole_small_file() {
3197        // total <= cap returns every index, preserving the exact
3198        // "whole file sampled" detection in estimate_upload_cost.
3199        assert_eq!(distributed_sample_indices(3, 5), vec![0, 1, 2]);
3200        assert_eq!(distributed_sample_indices(5, 5), vec![0, 1, 2, 3, 4]);
3201    }
3202
3203    #[test]
3204    fn distributed_sample_indices_is_in_range_and_increasing() {
3205        assert!(distributed_sample_indices(0, 5).is_empty());
3206        assert_eq!(distributed_sample_indices(1, 5), vec![0]);
3207        for total in 1..200usize {
3208            let idx = distributed_sample_indices(total, 5);
3209            assert_eq!(*idx.first().unwrap(), 0);
3210            assert_eq!(*idx.last().unwrap(), total - 1);
3211            assert!(idx.iter().all(|&i| i < total));
3212            assert!(idx.windows(2).all(|w| w[0] < w[1]));
3213        }
3214    }
3215
3216    #[test]
3217    fn disk_space_check_passes_for_small_file() {
3218        // A 1 KB file should always pass the disk space check
3219        check_disk_space_for_spill(1024).unwrap();
3220    }
3221
3222    #[test]
3223    fn disk_space_check_fails_for_absurd_size() {
3224        // Requesting space for a 1 exabyte file should fail on any real system
3225        let result = check_disk_space_for_spill(u64::MAX / 2);
3226        assert!(result.is_err());
3227        let err = result.unwrap_err();
3228        assert!(
3229            matches!(err, Error::InsufficientDiskSpace(_)),
3230            "expected InsufficientDiskSpace, got: {err}"
3231        );
3232    }
3233
3234    #[test]
3235    fn adaptive_stream_decrypt_batch_size_tracks_fetch_headroom() {
3236        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(u64::MAX));
3237
3238        assert_eq!(batch_size, 64 * DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
3239    }
3240
3241    #[test]
3242    fn adaptive_stream_decrypt_batch_size_caps_to_total_chunks() {
3243        let batch_size = adaptive_stream_decrypt_batch_size(12, 64, 10, Some(u64::MAX));
3244
3245        assert_eq!(batch_size, 12);
3246    }
3247
3248    #[test]
3249    fn adaptive_stream_decrypt_batch_size_honours_configured_floor() {
3250        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 1, 32, None);
3251
3252        assert_eq!(batch_size, 32);
3253    }
3254
3255    #[test]
3256    fn adaptive_stream_decrypt_batch_size_does_not_expand_without_memory_reading() {
3257        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, None);
3258
3259        assert_eq!(batch_size, 10);
3260    }
3261
3262    #[test]
3263    fn adaptive_stream_decrypt_batch_size_caps_to_memory_budget() {
3264        let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
3265            .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
3266            .max(1);
3267        let usable_memory = estimated_bytes_per_chunk
3268            .saturating_mul(16)
3269            .saturating_mul(DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR);
3270        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 256, 10, Some(usable_memory));
3271
3272        assert_eq!(batch_size, 16);
3273    }
3274
3275    #[test]
3276    fn adaptive_stream_decrypt_batch_size_keeps_one_chunk_when_memory_is_tight() {
3277        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(1));
3278
3279        assert_eq!(batch_size, 1);
3280    }
3281
3282    #[test]
3283    fn cached_merkle_covers_only_when_all_addresses_have_proofs() {
3284        let covered = compute_address(&Bytes::from_static(b"covered"));
3285        let extra = compute_address(&Bytes::from_static(b"extra"));
3286        let missing = compute_address(&Bytes::from_static(b"missing"));
3287        let cached = MerkleBatchPaymentResult {
3288            proofs: HashMap::from([(covered, vec![1]), (extra, vec![2])]),
3289            chunk_count: 2,
3290            storage_cost_atto: "0".to_string(),
3291            gas_cost_wei: 0,
3292            merkle_payment_timestamp: 0,
3293        };
3294
3295        assert!(cached_merkle_covers_addresses(&cached, &[covered]));
3296        assert!(cached_merkle_covers_addresses(&cached, &[covered, extra]));
3297        assert!(!cached_merkle_covers_addresses(
3298            &cached,
3299            &[covered, missing]
3300        ));
3301    }
3302
3303    /// A partial merkle payment leaves some addresses without a proof. Those
3304    /// must be split out so `upload_waves_merkle` reports them as failed
3305    /// (`PartialUpload`) instead of aborting the whole file — preserving the
3306    /// addresses' original order in each group.
3307    #[test]
3308    fn partition_addresses_by_proof_splits_paid_and_unpaid() {
3309        let paid_a = [1u8; 32];
3310        let unpaid_b = [2u8; 32];
3311        let paid_c = [3u8; 32];
3312        let unpaid_d = [4u8; 32];
3313        let proofs: HashMap<[u8; 32], Vec<u8>> =
3314            HashMap::from([(paid_a, vec![0xaa]), (paid_c, vec![0xcc])]);
3315
3316        let (to_store, missing) =
3317            partition_addresses_by_proof(&[paid_a, unpaid_b, paid_c, unpaid_d], &proofs);
3318
3319        assert_eq!(to_store, vec![paid_a, paid_c]);
3320        assert_eq!(missing, vec![unpaid_b, unpaid_d]);
3321    }
3322
3323    /// A wave that returns `Ok` contributes its stored chunks, parsed cost, and
3324    /// stats; nothing is recorded as failed.
3325    #[test]
3326    fn fold_single_wave_keeps_ok_wave() {
3327        let stored = vec![[1u8; 32], [2u8; 32]];
3328        let stats = WaveAggregateStats {
3329            chunk_attempts_total: 7,
3330            ..Default::default()
3331        };
3332
3333        let outcome = fold_single_wave(Ok((stored.clone(), "100".to_string(), 9, stats))).unwrap();
3334
3335        assert_eq!(outcome.stored, stored);
3336        assert!(outcome.failed.is_empty());
3337        assert_eq!(outcome.storage_atto.to_string(), "100");
3338        assert_eq!(outcome.gas_wei, 9);
3339        assert_eq!(outcome.stats.chunk_attempts_total, 7);
3340    }
3341
3342    /// The core V2-461 semantic: a wave short of quorum (`PartialUpload`) is
3343    /// recoverable — its stored chunks, failed chunks, and on-chain spend are
3344    /// folded so the caller can continue to the next wave rather than aborting
3345    /// the whole file.
3346    #[test]
3347    fn fold_single_wave_folds_partial_upload() {
3348        let stored = vec![[3u8; 32]];
3349        let failed = vec![([4u8; 32], "short of quorum".to_string())];
3350        let err = Error::PartialUpload {
3351            stored: stored.clone(),
3352            stored_count: 1,
3353            failed: failed.clone(),
3354            failed_count: 1,
3355            total_chunks: 2,
3356            spend: Box::new(PartialUploadSpend {
3357                storage_cost_atto: "250".to_string(),
3358                gas_cost_wei: 11,
3359            }),
3360            reason: "wave store failed after retries".to_string(),
3361        };
3362
3363        let outcome = fold_single_wave(Err(err)).unwrap();
3364
3365        assert_eq!(outcome.stored, stored);
3366        assert_eq!(outcome.failed, failed);
3367        assert_eq!(outcome.storage_atto.to_string(), "250");
3368        assert_eq!(outcome.gas_wei, 11);
3369        // `PartialUpload` carries no stats, so the failed wave contributes none.
3370        assert_eq!(outcome.stats.chunk_attempts_total, 0);
3371    }
3372
3373    /// A non-`PartialUpload` error (wallet/payment-infrastructure failure) is
3374    /// fatal and must abort the file, not be folded into the failed set.
3375    #[test]
3376    fn fold_single_wave_propagates_fatal_error() {
3377        let result = fold_single_wave(Err(Error::Payment("wallet unavailable".to_string())));
3378
3379        assert!(
3380            matches!(result, Err(Error::Payment(_))),
3381            "fatal payment error must propagate, got: {result:?}"
3382        );
3383    }
3384
3385    #[test]
3386    fn partition_addresses_by_proof_handles_all_or_nothing() {
3387        let a = [5u8; 32];
3388        let b = [6u8; 32];
3389
3390        // No proofs at all → every address is missing.
3391        let empty: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
3392        let (to_store, missing) = partition_addresses_by_proof(&[a, b], &empty);
3393        assert!(to_store.is_empty());
3394        assert_eq!(missing, vec![a, b]);
3395
3396        // All proofs present → nothing missing.
3397        let full: HashMap<[u8; 32], Vec<u8>> = HashMap::from([(a, vec![1]), (b, vec![2])]);
3398        let (to_store, missing) = partition_addresses_by_proof(&[a, b], &full);
3399        assert_eq!(to_store, vec![a, b]);
3400        assert!(missing.is_empty());
3401    }
3402
3403    #[test]
3404    fn chunk_spill_round_trip() {
3405        let mut spill = ChunkSpill::new().unwrap();
3406        let data1 = vec![0xAA; 1024];
3407        let data2 = vec![0xBB; 2048];
3408
3409        spill.push(&data1).unwrap();
3410        spill.push(&data2).unwrap();
3411
3412        assert_eq!(spill.len(), 2);
3413        assert_eq!(spill.total_bytes(), 1024 + 2048);
3414        let chunk_entries = spill.chunk_entries().unwrap();
3415        let entry_total: u64 = chunk_entries.iter().map(|(_, size)| *size).sum();
3416        assert_eq!(entry_total, 1024 + 2048);
3417
3418        // Read back and verify
3419        let chunk1 = spill.read_chunk(spill.addresses.first().unwrap()).unwrap();
3420        assert_eq!(&chunk1[..], &data1[..]);
3421
3422        let chunk2 = spill.read_chunk(spill.addresses.get(1).unwrap()).unwrap();
3423        assert_eq!(&chunk2[..], &data2[..]);
3424
3425        // Verify waves with 1-chunk wave size
3426        let waves: Vec<_> = spill.addresses.chunks(1).collect();
3427        assert_eq!(waves.len(), 2);
3428    }
3429
3430    #[test]
3431    fn chunk_spill_cleanup_on_drop() {
3432        let dir;
3433        {
3434            let spill = ChunkSpill::new().unwrap();
3435            dir = spill.dir.clone();
3436            assert!(dir.exists());
3437        }
3438        // After drop, the directory should be cleaned up
3439        assert!(!dir.exists(), "spill dir should be removed on drop");
3440    }
3441
3442    #[test]
3443    fn chunk_spill_deduplicates_identical_content() {
3444        let mut spill = ChunkSpill::new().unwrap();
3445        let data = vec![0xCC; 512];
3446
3447        spill.push(&data).unwrap();
3448        spill.push(&data).unwrap(); // same content, should be skipped
3449        spill.push(&data).unwrap(); // again
3450
3451        assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
3452        assert_eq!(
3453            spill.total_bytes(),
3454            512,
3455            "total_bytes should count unique only"
3456        );
3457
3458        // Different content should still be added
3459        let data2 = vec![0xDD; 256];
3460        spill.push(&data2).unwrap();
3461        assert_eq!(spill.len(), 2);
3462        assert_eq!(spill.total_bytes(), 512 + 256);
3463    }
3464}
3465
/// Compile-time checks that the futures returned by the `Client` file methods
/// are `Send`. Nothing here is executed; the module only has to type-check.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Accepts a reference to any `Send` value; fails to compile otherwise.
    fn _assert_send<T>(_: &T)
    where
        T: Send,
    {
    }

    /// The future returned by `Client::file_upload` must be `Send`.
    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_is_send(client: &Client) {
        _assert_send(&client.file_upload(Path::new("/dev/null")));
    }

    /// The future returned by `Client::file_upload_with_mode` must be `Send`.
    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_with_mode_is_send(client: &Client) {
        _assert_send(&client.file_upload_with_mode(Path::new("/dev/null"), PaymentMode::Auto));
    }

    /// The future returned by `Client::file_download` must be `Send`.
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _file_download_is_send(client: &Client) {
        // Only the type matters here, so the value itself is a `todo!()`.
        let data_map: DataMap = todo!();
        _assert_send(&client.file_download(&data_map, Path::new("/dev/null")));
    }
}