Skip to main content

ant_core/data/client/
file.rs

1//! File operations using streaming self-encryption.
2//!
3//! Upload files directly from disk without loading them entirely into memory.
//! Uses `stream_encrypt` to read files in 8 KB buffers and encrypt them
//! incrementally, handing each encrypted chunk on as soon as it is produced.
6//!
7//! Encrypted chunks are spilled to a temporary directory during encryption
8//! so that peak memory usage is bounded to one wave (~256 MB for 64 × 4 MB
9//! chunks) regardless of file size.
10//!
11//! For in-memory data uploads, see the `data` module.
12
13use crate::data::client::adaptive::observe_op;
14use crate::data::client::batch::{
15    finalize_batch_payment, PaymentIntent, PreparedChunk, WaveAggregateStats,
16};
17use crate::data::client::classify_error;
18use crate::data::client::merkle::{
19    finalize_merkle_batch, should_use_merkle, MerkleBatchPaymentResult, PaymentMode,
20    PreparedMerkleBatch,
21};
22use crate::data::client::Client;
23use crate::data::error::{Error, Result};
24use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
25use ant_protocol::transport::{MultiAddr, PeerId};
26use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
27use bytes::Bytes;
28use fs2::FileExt;
29use futures::stream::{self, StreamExt};
30use self_encryption::{get_root_data_map_parallel, stream_encrypt, streaming_decrypt, DataMap};
31use std::collections::{HashMap, HashSet};
32use std::io::Write;
33use std::path::{Path, PathBuf};
34use std::sync::{Arc, Mutex};
35use tokio::runtime::Handle;
36use tokio::sync::mpsc;
37use tracing::{debug, info, warn};
38use xor_name::XorName;
39
/// Progress events emitted during file upload for UI feedback.
///
/// The variants mirror the upload phases named in their docs: encryption,
/// quoting, storing, and per-wave completion. Every counter is a plain
/// `usize` supplied by whoever emits the event.
#[derive(Debug, Clone)]
pub enum UploadEvent {
    /// A chunk has been encrypted and spilled to disk.
    /// `chunks_done` is a running count reported by the emitter.
    Encrypting { chunks_done: usize },
    /// File encryption complete; `total_chunks` is the final chunk count.
    Encrypted { total_chunks: usize },
    /// Starting quote collection for a wave.
    QuotingChunks {
        wave: usize,
        total_waves: usize,
        chunks_in_wave: usize,
    },
    /// A chunk has been quoted (peer discovery + price received).
    /// This is the slow phase — each quote involves network round-trips.
    ChunkQuoted { quoted: usize, total: usize },
    /// A chunk has been stored on the network.
    ChunkStored { stored: usize, total: usize },
    /// A wave has completed.
    WaveComplete {
        wave: usize,
        total_waves: usize,
        stored_so_far: usize,
        total: usize,
    },
}
66
/// Progress events emitted during file download for UI feedback.
///
/// The first three variants describe `DataMap` resolution; the last one
/// reports progress on the data chunks themselves.
#[derive(Debug, Clone)]
pub enum DownloadEvent {
    /// Resolving hierarchical DataMap to discover real chunk count.
    ResolvingDataMap { total_map_chunks: usize },
    /// A DataMap chunk has been fetched during resolution.
    MapChunkFetched { fetched: usize },
    /// DataMap resolved — total data chunk count now known.
    DataMapResolved { total_chunks: usize },
    /// Data chunks are being fetched from the network
    /// (`fetched` out of `total`).
    ChunksFetched { fetched: usize, total: usize },
}
79
/// One entry in the per-chunk quote list returned by
/// [`Client::get_store_quotes`]: the responding peer, its addresses, the
/// signed quote it returned, and the payment amount it is demanding.
///
/// Tuple order is `(peer, addresses, quote, amount)`.
type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);
84
/// Number of chunks per upload wave (matches batch.rs PAYMENT_WAVE_SIZE).
///
/// Also the group size used by `ChunkSpill::waves` when slicing the
/// spilled address list.
const UPLOAD_WAVE_SIZE: usize = 64;

/// Maximum number of distinct chunk addresses to sample when probing for a
/// representative quote in [`Client::estimate_upload_cost`].
///
/// Bounded small so we never spend more than a couple of round-trips on the
/// `AlreadyStored` retry path, which only matters when many leading chunks
/// of a file already live on the network.
const ESTIMATE_SAMPLE_CAP: usize = 5;

/// Gas used by one `pay_for_quotes` transaction that packs up to
/// `UPLOAD_WAVE_SIZE` (quote_hash, rewards_address, amount) entries.
///
/// `batch_pay` in `batch.rs` flattens every chunk's close-group quotes into a
/// single EVM call, so the dominant cost is the SSTOREs for each entry plus
/// the base tx overhead. On Arbitrum that is roughly
/// `21_000 + 64 × (20_000 + small)` ≈ 1.3M; we round up to 1.5M as a
/// conservative per-wave upper bound.
const GAS_PER_WAVE_TX: u128 = 1_500_000;

/// Gas used by one merkle batch payment transaction.
///
/// One on-chain tx per merkle sub-batch, but each tx verifies a merkle tree
/// and posts a pool commitment, so budget higher than a plain transfer.
const GAS_PER_MERKLE_TX: u128 = 500_000;

/// Advisory gas price (wei/gas) used to turn the gas estimate into an ETH
/// figure when no live gas oracle is consulted.
///
/// Arbitrum One typically settles around 0.1 gwei on quiet blocks; we use
/// that as the default (`100_000_000` wei = 0.1 gwei) so the CLI prints a
/// sensible order-of-magnitude number. Users should treat the reported gas
/// cost as an estimate, not a commitment — real gas is bid at submission
/// time.
const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;

/// Extra headroom percentage for disk space check.
///
/// Encrypted chunks are slightly larger than the source data due to padding
/// and self-encryption overhead. We require file_size + 10% free space in
/// the temp directory to account for this.
const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;
127
/// Grace period (in seconds) before a spill dir is eligible for stale cleanup.
///
/// This is a small TOCTOU guard covering the sub-millisecond window inside
/// [`ChunkSpill::new`] between `create_dir` and `try_lock_exclusive`. Once a
/// dir is older than this and its lockfile is releasable, the owning process
/// is gone and the dir is safe to reap — regardless of how old it is.
///
/// The previous policy waited 24 h before reaping any orphan, which meant
/// that any non-graceful exit (SIGKILL, kernel OOM, panic abort) leaked its
/// spill dir until the next day's upload — and on a host being restart-looped
/// by systemd, orphans could fill the disk well within that window.
const SPILL_STALE_GRACE_SECS: u64 = 30;

/// Prefix for spill directory names to distinguish from user files.
const SPILL_DIR_PREFIX: &str = "spill_";

/// Lockfile name inside each spill dir to signal active use.
const SPILL_LOCK_NAME: &str = ".lock";

/// Temporary on-disk buffer for encrypted chunks.
///
/// During file encryption, chunks are written to a temp directory so that
/// only their 32-byte addresses stay in memory. At upload time chunks are
/// read back one wave at a time, keeping peak RAM at ~`UPLOAD_WAVE_SIZE × 4 MB`.
struct ChunkSpill {
    /// Directory holding spilled chunk files (named by hex address).
    dir: PathBuf,
    /// Lockfile held for the lifetime of this spill (prevents stale cleanup).
    _lock: std::fs::File,
    /// Deduplicated list of chunk addresses.
    addresses: Vec<[u8; 32]>,
    /// Tracks seen addresses for deduplication.
    seen: HashSet<[u8; 32]>,
    /// Running total of unique chunk byte sizes (for average-size calculation).
    total_bytes: u64,
}
164
165impl ChunkSpill {
166    /// Return the parent directory for all spill dirs: `<data_dir>/spill/`.
167    fn spill_root() -> Result<PathBuf> {
168        use crate::config;
169        let root = config::data_dir()
170            .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
171            .join("spill");
172        Ok(root)
173    }
174
175    /// Create a new spill directory under `<data_dir>/spill/`.
176    ///
177    /// Directory name is `spill_<timestamp>_<random>` so orphans can be
178    /// identified by prefix and cleaned up by age. A lockfile inside the
179    /// dir prevents concurrent cleanup from deleting an active spill.
180    fn new() -> Result<Self> {
181        let root = Self::spill_root()?;
182        std::fs::create_dir_all(&root)?;
183
184        // Clean up stale spill dirs from previous crashed runs.
185        Self::cleanup_stale(&root);
186
187        let now = std::time::SystemTime::now()
188            .duration_since(std::time::UNIX_EPOCH)
189            .unwrap_or_default()
190            .as_secs();
191        let unique: u64 = rand::random();
192        let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
193        std::fs::create_dir(&dir)?;
194
195        // Create and hold a lockfile for the lifetime of this spill.
196        // cleanup_stale() will skip dirs with locked files.
197        let lock_path = dir.join(SPILL_LOCK_NAME);
198        let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
199            Error::Io(std::io::Error::new(
200                e.kind(),
201                format!("failed to create spill lockfile: {e}"),
202            ))
203        })?;
204        lock_file.try_lock_exclusive().map_err(|e| {
205            Error::Io(std::io::Error::new(
206                e.kind(),
207                format!("failed to lock spill lockfile: {e}"),
208            ))
209        })?;
210
211        Ok(Self {
212            dir,
213            _lock: lock_file,
214            addresses: Vec::new(),
215            seen: HashSet::new(),
216            total_bytes: 0,
217        })
218    }
219
220    /// Clean up stale spill directories. Best-effort, errors are logged.
221    ///
222    /// A spill dir is reaped when:
223    /// 1. Its name starts with `SPILL_DIR_PREFIX` (ignores unrelated files)
224    /// 2. It is an actual directory, not a symlink (prevents symlink attacks)
225    /// 3. Its timestamp is older than `SPILL_STALE_GRACE_SECS` (TOCTOU guard)
226    /// 4. Its lockfile is releasable — i.e. no live process holds it
227    ///
228    /// The lockfile is the primary correctness gate: a releasable lock means
229    /// the owning `ChunkSpill` has been dropped or the process is gone, so
230    /// the dir is fair game. The grace period covers only the brief window
231    /// inside [`Self::new`] between `create_dir` and `try_lock_exclusive`.
232    ///
233    /// Safe to call concurrently from multiple processes.
234    fn cleanup_stale(root: &Path) {
235        let now = std::time::SystemTime::now()
236            .duration_since(std::time::UNIX_EPOCH)
237            .unwrap_or_default()
238            .as_secs();
239
240        if now == 0 {
241            // Clock is broken (before Unix epoch). Skip cleanup to avoid
242            // misidentifying dirs as stale.
243            warn!("System clock before Unix epoch, skipping spill cleanup");
244            return;
245        }
246
247        let entries = match std::fs::read_dir(root) {
248            Ok(entries) => entries,
249            Err(_) => return,
250        };
251
252        for entry in entries.flatten() {
253            let name = entry.file_name();
254            let name_str = name.to_string_lossy();
255
256            // Only process dirs with our prefix.
257            let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
258                Some(s) => s,
259                None => continue,
260            };
261
262            // Parse timestamp: "spill_<timestamp>_<random>"
263            let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
264                Some(ts) => ts,
265                None => continue,
266            };
267
268            if now.saturating_sub(timestamp) < SPILL_STALE_GRACE_SECS {
269                continue;
270            }
271
272            // Safety: only delete actual directories, not symlinks.
273            let file_type = match entry.file_type() {
274                Ok(ft) => ft,
275                Err(_) => continue,
276            };
277            if !file_type.is_dir() {
278                continue;
279            }
280
281            let path = entry.path();
282
283            // Check lockfile: if locked, the dir is in active use -- skip it.
284            let lock_path = path.join(SPILL_LOCK_NAME);
285            if let Ok(lock_file) = std::fs::File::open(&lock_path) {
286                use fs2::FileExt;
287                if lock_file.try_lock_exclusive().is_err() {
288                    // Lock held by another process -- dir is active.
289                    debug!("Skipping active spill dir: {}", path.display());
290                    continue;
291                }
292                // We acquired the lock, so no one else holds it.
293                // Drop it before deleting.
294                drop(lock_file);
295            }
296
297            info!("Cleaning up stale spill dir: {}", path.display());
298            if let Err(e) = std::fs::remove_dir_all(&path) {
299                warn!("Failed to clean up stale spill dir {}: {e}", path.display());
300            }
301        }
302    }
303
304    /// Run stale spill cleanup. Call at client startup or periodically.
305    #[allow(dead_code)]
306    pub(crate) fn run_cleanup() {
307        if let Ok(root) = Self::spill_root() {
308            Self::cleanup_stale(&root);
309        }
310    }
311
312    /// Write one encrypted chunk to disk and record its address.
313    ///
314    /// Deduplicates by content address: if the same chunk was already
315    /// spilled, the write and accounting are skipped. This prevents
316    /// double-uploads and inflated quoting metrics.
317    fn push(&mut self, content: &[u8]) -> Result<()> {
318        let address = compute_address(content);
319        if !self.seen.insert(address) {
320            return Ok(());
321        }
322        let path = self.dir.join(hex::encode(address));
323        std::fs::write(&path, content)?;
324        self.total_bytes += content.len() as u64;
325        self.addresses.push(address);
326        Ok(())
327    }
328
    /// Number of unique chunks recorded so far (length of `addresses`).
    fn len(&self) -> usize {
        self.addresses.len()
    }
333
    /// Total bytes of all spilled chunks, as accumulated by `push`
    /// (duplicates are not counted).
    fn total_bytes(&self) -> u64 {
        self.total_bytes
    }
338
339    /// Average chunk size in bytes (for quoting metrics).
340    fn avg_chunk_size(&self) -> u64 {
341        if self.addresses.is_empty() {
342            return 0;
343        }
344        self.total_bytes / self.addresses.len() as u64
345    }
346
347    /// Read a single chunk back from disk by address.
348    fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
349        let path = self.dir.join(hex::encode(address));
350        let data = std::fs::read(&path).map_err(|e| {
351            Error::Io(std::io::Error::new(
352                e.kind(),
353                format!("reading spilled chunk {}: {e}", hex::encode(address)),
354            ))
355        })?;
356        Ok(Bytes::from(data))
357    }
358
    /// Iterate over address slices in wave-sized groups.
    ///
    /// Each slice holds up to `UPLOAD_WAVE_SIZE` addresses; the last one may
    /// be shorter.
    fn waves(&self) -> std::slice::Chunks<'_, [u8; 32]> {
        self.addresses.chunks(UPLOAD_WAVE_SIZE)
    }
363
364    /// Read a wave of chunks from disk.
365    fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
366        let mut out = Vec::with_capacity(wave_addrs.len());
367        for addr in wave_addrs {
368            let content = self.read_chunk(addr)?;
369            out.push((content, *addr));
370        }
371        Ok(out)
372    }
373
374    /// Clean up the spill directory.
375    fn cleanup(&self) {
376        if let Err(e) = std::fs::remove_dir_all(&self.dir) {
377            warn!(
378                "Failed to clean up chunk spill dir {}: {e}",
379                self.dir.display()
380            );
381        }
382    }
383}
384
/// Removes the spill directory when the `ChunkSpill` goes out of scope.
///
/// NOTE(review): fields are dropped after this body runs, so `_lock` is
/// still open (and locked) while the directory is being removed — confirm
/// this is fine on platforms that restrict deleting open or locked files.
impl Drop for ChunkSpill {
    fn drop(&mut self) {
        self.cleanup();
    }
}
390
391/// Check that the spill directory has enough free space for the spilled chunks.
392///
393/// `file_size` is the source file's byte count. We require
394/// `file_size + 10%` free space to account for self-encryption overhead.
395fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
396    let spill_root = ChunkSpill::spill_root()?;
397
398    // Ensure the root exists so fs2 can query it.
399    std::fs::create_dir_all(&spill_root)?;
400
401    let available = fs2::available_space(&spill_root).map_err(|e| {
402        Error::Io(std::io::Error::new(
403            e.kind(),
404            format!(
405                "failed to query disk space on {}: {e}",
406                spill_root.display()
407            ),
408        ))
409    })?;
410
411    // Use integer arithmetic to avoid f64 precision loss on large file sizes.
412    let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
413    let required = file_size.saturating_add(headroom);
414
415    if available < required {
416        let avail_mb = available / (1024 * 1024);
417        let req_mb = required / (1024 * 1024);
418        return Err(Error::InsufficientDiskSpace(format!(
419            "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
420            spill_root.display()
421        )));
422    }
423
424    debug!(
425        "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
426        spill_root.display()
427    );
428    Ok(())
429}
430
/// Whether the data map is published to the network for address-based retrieval.
///
/// A private upload stores only the data chunks and returns the `DataMap` to
/// the caller — only someone holding that `DataMap` can reconstruct the file.
/// A public upload additionally stores the serialized `DataMap` as a chunk on
/// the network, yielding a single chunk address that anyone can use to
/// retrieve the `DataMap` (via [`Client::data_map_fetch`]) and then the file.
///
/// The `Default` value is [`Visibility::Private`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Visibility {
    /// Keep the data map local; only the holder can retrieve the file.
    #[default]
    Private,
    /// Publish the data map as a network chunk so anyone with the returned
    /// address can retrieve and decrypt the file.
    Public,
}
447
/// Estimated cost of uploading a file, returned by
/// [`Client::estimate_upload_cost`].
///
/// Both cost fields are decimal strings rather than integers. When every
/// chunk of the file is already stored, both are `"0"`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct UploadCostEstimate {
    /// Original file size in bytes.
    pub file_size: u64,
    /// Number of chunks the file would be split into (data chunks only,
    /// does not include the DataMap chunk added during public uploads).
    pub chunk_count: usize,
    /// Estimated total storage cost in atto (token smallest unit).
    pub storage_cost_atto: String,
    /// Estimated gas cost in wei as a string. This is a rough heuristic
    /// based on chunk count and payment mode, NOT a live gas price query.
    pub estimated_gas_cost_wei: String,
    /// Payment mode that would be used.
    pub payment_mode: PaymentMode,
}
465
/// Result of a file upload: the `DataMap` needed to retrieve the file.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers that construct or pattern-match
/// on this struct.
///
/// Note the two cost fields use different representations:
/// `storage_cost_atto` is a decimal string, `gas_cost_wei` is a `u128`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FileUploadResult {
    /// The data map containing chunk metadata for reconstruction.
    pub data_map: DataMap,
    /// Number of chunks stored on the network.
    pub chunks_stored: usize,
    /// Number of chunks that failed to store. Always 0 for a successful
    /// upload — partial-failure information is conveyed via
    /// [`crate::data::Error::PartialUpload`] instead.
    pub chunks_failed: usize,
    /// Total number of chunks the upload attempted to store. On full
    /// success this equals `chunks_stored`.
    pub total_chunks: usize,
    /// Which payment mode was actually used (not just requested).
    pub payment_mode_used: PaymentMode,
    /// Total storage cost paid in token units (atto). "0" if all chunks already existed.
    pub storage_cost_atto: String,
    /// Total gas cost in wei. 0 if no on-chain transactions were made.
    pub gas_cost_wei: u128,
    /// Chunk address of the serialized `DataMap`, set only for
    /// [`Visibility::Public`] uploads. **`Some` means this address is
    /// retrievable from the network (via [`Client::data_map_fetch`])**, not
    /// necessarily that *this* upload paid to store it — if the serialized
    /// `DataMap` hashed to a chunk that was already on the network (same
    /// file uploaded before; deterministic via self-encryption), the address
    /// is still returned but no storage payment was made for it.
    pub data_map_address: Option<[u8; 32]>,
    /// Sum of chunk-store RPC attempts across the upload
    /// (`>= chunks_stored` on full success; more if any chunk retried).
    /// `0` for paths that don't run the wave store loop.
    pub chunk_attempts_total: usize,
    /// Per-chunk store wall-clock in ms (length == `chunks_stored` on full
    /// success, empty for paths that don't run the wave store loop).
    pub store_durations_ms: Vec<u64>,
    /// Count of stored chunks that succeeded on each retry round
    /// (index 0 = first attempt, 1 = first retry, etc.; four buckets in
    /// total). All zeros for paths that don't run the wave store loop.
    pub retries_histogram: [usize; 4],
}
511
/// Payment information for external signing — either wave-batch or merkle.
///
/// Only `Debug` is derived; the type is neither `Clone` nor serializable.
#[derive(Debug)]
pub enum ExternalPaymentInfo {
    /// Wave-batch: individual (quote_hash, rewards_address, amount) tuples.
    WaveBatch {
        /// Chunks ready for payment (needed for finalize).
        prepared_chunks: Vec<PreparedChunk>,
        /// Payment intent for external signing.
        payment_intent: PaymentIntent,
    },
    /// Merkle: single on-chain call with depth, pool commitments, timestamp.
    Merkle {
        /// The prepared merkle batch (public fields sent to frontend, private fields stay in Rust).
        prepared_batch: PreparedMerkleBatch,
        /// Raw chunk contents (needed for upload after payment).
        chunk_contents: Vec<Bytes>,
        /// Chunk addresses in order (needed for upload after payment).
        /// NOTE(review): presumably index-aligned with `chunk_contents` —
        /// confirm at the construction site.
        chunk_addresses: Vec<[u8; 32]>,
    },
}
532
/// Prepared upload ready for external payment.
///
/// Contains everything needed to construct the on-chain payment transaction
/// externally (e.g. via WalletConnect in a desktop app) and then finalize
/// the upload without a Rust-side wallet.
///
/// Note: This struct stays in Rust memory — only the public fields of
/// `payment_info` are sent to the frontend. `PreparedChunk` contains
/// non-serializable network types, so the full struct cannot derive `Serialize`.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers. As a consequence, code outside
/// this crate cannot build it with a struct literal.
#[derive(Debug)]
#[non_exhaustive]
pub struct PreparedUpload {
    /// The data map for later retrieval.
    pub data_map: DataMap,
    /// Payment information — either wave-batch or merkle depending on chunk count.
    pub payment_info: ExternalPaymentInfo,
    /// Chunk address of the serialized `DataMap` when this upload was
    /// prepared with [`Visibility::Public`]. `Some` means the address is
    /// retrievable on the network after finalization — either because this
    /// upload paid to store the chunk in `payment_info`, or because the
    /// chunk was already on the network (deterministic self-encryption).
    /// Carried through to [`FileUploadResult::data_map_address`].
    pub data_map_address: Option<[u8; 32]>,
}
560
/// Return type for [`spawn_file_encryption`]: chunk receiver, `DataMap` oneshot, join handle.
///
/// In order:
/// 1. bounded receiver yielding each encrypted chunk's content;
/// 2. oneshot that delivers the `DataMap` once encryption has finished;
/// 3. handle of the blocking encryption task, resolving to its `Result`.
type EncryptionChannels = (
    tokio::sync::mpsc::Receiver<Bytes>,
    tokio::sync::oneshot::Receiver<DataMap>,
    tokio::task::JoinHandle<Result<()>>,
);
567
568/// Spawn a blocking task that streams file encryption through a channel.
569fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
570    let metadata = std::fs::metadata(&path)?;
571    let data_size = usize::try_from(metadata.len())
572        .map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;
573
574    let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
575    let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();
576
577    let handle = tokio::task::spawn_blocking(move || {
578        let file = std::fs::File::open(&path)?;
579        let mut reader = std::io::BufReader::new(file);
580
581        let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
582        let read_error_clone = Arc::clone(&read_error);
583
584        let data_iter = std::iter::from_fn(move || {
585            let mut buffer = vec![0u8; 8192];
586            match std::io::Read::read(&mut reader, &mut buffer) {
587                Ok(0) => None,
588                Ok(n) => {
589                    buffer.truncate(n);
590                    Some(Bytes::from(buffer))
591                }
592                Err(e) => {
593                    let mut guard = read_error_clone
594                        .lock()
595                        .unwrap_or_else(|poisoned| poisoned.into_inner());
596                    *guard = Some(e);
597                    None
598                }
599            }
600        });
601
602        let mut stream = stream_encrypt(data_size, data_iter)
603            .map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;
604
605        for chunk_result in stream.chunks() {
606            // Check for captured read errors immediately after each chunk.
607            // stream_encrypt sees None (EOF) when a read fails, so it stops
608            // producing chunks. We must detect this before sending the
609            // partial results to avoid uploading a truncated DataMap.
610            {
611                let guard = read_error
612                    .lock()
613                    .unwrap_or_else(|poisoned| poisoned.into_inner());
614                if let Some(ref e) = *guard {
615                    return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
616                }
617            }
618
619            let (_hash, content) = chunk_result
620                .map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
621            if chunk_tx.blocking_send(content).is_err() {
622                return Err(Error::Encryption("upload receiver dropped".to_string()));
623            }
624        }
625
626        // Final check: read error after last chunk (stream saw EOF).
627        {
628            let guard = read_error
629                .lock()
630                .unwrap_or_else(|poisoned| poisoned.into_inner());
631            if let Some(ref e) = *guard {
632                return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
633            }
634        }
635
636        let datamap = stream
637            .into_datamap()
638            .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
639        if datamap_tx.send(datamap).is_err() {
640            warn!("DataMap receiver dropped — upload may have been cancelled");
641        }
642        Ok(())
643    });
644
645    Ok((chunk_rx, datamap_rx, handle))
646}
647
648impl Client {
    /// Upload a file to the network using streaming self-encryption.
    ///
    /// Thin wrapper: delegates to `file_upload_with_mode` with
    /// [`PaymentMode::Auto`].
    ///
    /// Automatically selects merkle batch payment for files that produce
    /// 64+ chunks (saves gas). Encrypted chunks are spilled to a temp
    /// directory so peak memory stays at ~256 MB regardless of file size.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be read, encryption fails,
    /// or any chunk cannot be stored.
    pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
        self.file_upload_with_mode(path, PaymentMode::Auto).await
    }
662
663    /// Estimate the cost of uploading a file without actually uploading.
664    ///
665    /// Encrypts the file to determine chunk count and sizes, then requests
666    /// a single quote from the network for a representative chunk. The
667    /// per-chunk price is extrapolated to the total chunk count.
668    ///
669    /// The estimate is fast (~2-5s) and does not require a wallet. Spilled
670    /// chunks are cleaned up automatically when the function returns.
671    ///
672    /// Gas cost is an advisory heuristic, not a live gas-oracle query. It is
673    /// derived from realistic per-transaction budgets (`GAS_PER_WAVE_TX`,
674    /// `GAS_PER_MERKLE_TX`) priced at `ARBITRUM_GAS_PRICE_WEI`. Real gas
675    /// varies with network conditions.
676    ///
677    /// If the first sampled chunk is already stored on the network, the
678    /// function retries with subsequent chunk addresses (up to
679    /// `ESTIMATE_SAMPLE_CAP`). If every sampled address reports stored,
680    /// a [`Error::CostEstimationInconclusive`] is returned so callers can
681    /// decide how to react rather than trust a bogus "free" estimate. Only
682    /// when every address in the file is stored do we return a zero-cost
683    /// estimate.
684    ///
685    /// # Errors
686    ///
687    /// Returns an error if the file cannot be read, encryption fails,
688    /// the network cannot provide a quote, or every sampled chunk is
689    /// already stored ([`Error::CostEstimationInconclusive`]).
690    pub async fn estimate_upload_cost(
691        &self,
692        path: &Path,
693        mode: PaymentMode,
694        progress: Option<mpsc::Sender<UploadEvent>>,
695    ) -> Result<UploadCostEstimate> {
696        let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
697
698        if file_size < 3 {
699            return Err(Error::InvalidData(
700                "File too small: self-encryption requires at least 3 bytes".into(),
701            ));
702        }
703
704        check_disk_space_for_spill(file_size)?;
705
706        info!(
707            "Estimating upload cost for {} ({file_size} bytes)",
708            path.display()
709        );
710
711        let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
712        let chunk_count = spill.len();
713
714        if let Some(ref tx) = progress {
715            let _ = tx
716                .send(UploadEvent::Encrypted {
717                    total_chunks: chunk_count,
718                })
719                .await;
720        }
721
722        info!("Encrypted into {chunk_count} chunks, requesting quote");
723
724        // Sample up to ESTIMATE_SAMPLE_CAP distinct chunk addresses. A single
725        // AlreadyStored result says nothing about the rest of the file — the
726        // first chunk is often a DataMap-adjacent chunk that collides with
727        // prior uploads even when 99% of the file is new. Only treat the
728        // whole file as "fully stored" when every sample comes back stored.
729        let sample_limit = spill.addresses.len().min(ESTIMATE_SAMPLE_CAP);
730        let mut sampled = 0usize;
731        let mut all_already_stored = true;
732        let mut quotes_opt: Option<Vec<QuoteEntry>> = None;
733
734        for addr in spill.addresses.iter().take(sample_limit) {
735            sampled += 1;
736            let chunk_bytes = spill.read_chunk(addr)?;
737            let data_size = u64::try_from(chunk_bytes.len())
738                .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
739            match self
740                .get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
741                .await
742            {
743                Ok(q) => {
744                    quotes_opt = Some(q);
745                    all_already_stored = false;
746                    break;
747                }
748                Err(Error::AlreadyStored) => {
749                    debug!(
750                        "Sample chunk {} already stored; trying next address ({sampled}/{sample_limit})",
751                        hex::encode(addr)
752                    );
753                    continue;
754                }
755                Err(e) => return Err(e),
756            }
757        }
758
759        let uses_merkle = should_use_merkle(chunk_count, mode);
760
761        let quotes = match quotes_opt {
762            Some(q) => q,
763            None if all_already_stored && sampled == chunk_count => {
764                // Every address in the file was sampled and every one is
765                // already on the network — returning a zero-cost estimate is
766                // accurate in this case.
767                info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
768                return Ok(UploadCostEstimate {
769                    file_size,
770                    chunk_count,
771                    storage_cost_atto: "0".into(),
772                    estimated_gas_cost_wei: "0".into(),
773                    payment_mode: if uses_merkle {
774                        PaymentMode::Merkle
775                    } else {
776                        PaymentMode::Single
777                    },
778                });
779            }
780            None => {
781                return Err(Error::CostEstimationInconclusive(format!(
782                    "sampled {sampled} chunk addresses out of {chunk_count} and every \
783                     one reported AlreadyStored; cannot infer a representative price \
784                     for the remaining chunks"
785                )));
786            }
787        };
788
789        // Use the median price × 3 (matches SingleNodePayment::from_quotes
790        // which pays 3x the median to incentivize reliable storage).
791        let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
792        prices.sort();
793        let median_price = prices
794            .get(prices.len() / 2)
795            .copied()
796            .unwrap_or(Amount::ZERO);
797        let per_chunk_cost = median_price * Amount::from(3u64);
798
799        let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
800        let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);
801
802        // Estimate gas cost from realistic per-transaction budgets rather
803        // than a flat per-chunk or per-wave number.
804        //
805        // - Single mode: `batch_pay` packs up to UPLOAD_WAVE_SIZE chunks'
806        //   close-group quotes into one `pay_for_quotes` call on Arbitrum.
807        //   The dominant cost is one SSTORE per entry plus base tx overhead,
808        //   so we use GAS_PER_WAVE_TX (≈1.5M) as a conservative upper bound
809        //   on a full wave and multiply by the number of waves. The previous
810        //   per-wave figure of 150k was closer to a single-entry transfer
811        //   and understated cost by 5–10x for full waves.
812        // - Merkle mode: one tx per sub-batch that verifies a merkle tree
813        //   and posts a pool commitment (GAS_PER_MERKLE_TX ≈ 500k each).
814        //
815        // Gas is priced at ARBITRUM_GAS_PRICE_WEI (~0.1 gwei, a typical
816        // Arbitrum baseline). Treat the result as advisory, not a commitment.
817        let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
818        let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
819        let estimated_gas: u128 = if uses_merkle {
820            merkle_batches
821                .saturating_mul(GAS_PER_MERKLE_TX)
822                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
823        } else {
824            waves
825                .saturating_mul(GAS_PER_WAVE_TX)
826                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
827        };
828
829        info!(
830            "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
831        );
832
833        Ok(UploadCostEstimate {
834            file_size,
835            chunk_count,
836            storage_cost_atto: total_storage.to_string(),
837            estimated_gas_cost_wei: estimated_gas.to_string(),
838            payment_mode: if uses_merkle {
839                PaymentMode::Merkle
840            } else {
841                PaymentMode::Single
842            },
843        })
844    }
845
846    /// Phase 1 of external-signer upload: encrypt file and prepare chunks.
847    ///
848    /// Equivalent to [`Client::file_prepare_upload_with_visibility`] with
849    /// [`Visibility::Private`] — see that method for details.
850    pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
851        self.file_prepare_upload_with_progress(path, Visibility::Private, None)
852            .await
853    }
854
855    /// Phase 1 of external-signer upload with explicit [`Visibility`] control.
856    ///
857    /// Equivalent to [`Client::file_prepare_upload_with_progress`] with
858    /// `progress: None` — see that method for details.
859    pub async fn file_prepare_upload_with_visibility(
860        &self,
861        path: &Path,
862        visibility: Visibility,
863    ) -> Result<PreparedUpload> {
864        self.file_prepare_upload_with_progress(path, visibility, None)
865            .await
866    }
867
868    /// Phase 1 of external-signer upload with progress events.
869    ///
870    /// Requires an EVM network (for contract price queries) but NOT a wallet.
871    /// Returns a [`PreparedUpload`] containing the data map, prepared chunks,
872    /// and a [`PaymentIntent`] that the external signer uses to construct
873    /// and submit the on-chain payment transaction.
874    ///
875    /// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
876    /// is bundled into the payment batch as an additional chunk and its
877    /// address is recorded on the returned [`PreparedUpload`]. After
878    /// [`Client::finalize_upload`] (or `_merkle`) succeeds, that address is
879    /// surfaced via [`FileUploadResult::data_map_address`] so the uploader
880    /// can share a single address from which anyone can retrieve the file.
881    ///
882    /// When `progress` is `Some`, [`UploadEvent`]s are emitted on the channel
883    /// during encryption ([`UploadEvent::Encrypting`] / [`UploadEvent::Encrypted`])
884    /// and per-chunk quoting ([`UploadEvent::ChunkQuoted`]). Storage events are
885    /// emitted later by [`Client::finalize_upload_with_progress`] /
886    /// [`Client::finalize_upload_merkle_with_progress`].
887    ///
888    /// **Memory note:** Encryption uses disk spilling for bounded memory, but
889    /// the returned [`PreparedUpload`] holds all chunk content in memory (each
890    /// [`PreparedChunk`] contains a `Bytes` with the full chunk data). This is
891    /// inherent to the two-phase external-signer protocol — the chunks must
892    /// stay in memory until [`Client::finalize_upload`] stores them. For very
893    /// large files, prefer [`Client::file_upload`] which streams directly.
894    ///
895    /// # Errors
896    ///
897    /// Returns an error if there is insufficient disk space, the file cannot
898    /// be read, encryption fails, or quote collection fails.
899    pub async fn file_prepare_upload_with_progress(
900        &self,
901        path: &Path,
902        visibility: Visibility,
903        progress: Option<mpsc::Sender<UploadEvent>>,
904    ) -> Result<PreparedUpload> {
905        debug!(
906            "Preparing file upload for external signing (visibility={visibility:?}): {}",
907            path.display()
908        );
909
910        let file_size = std::fs::metadata(path)?.len();
911        check_disk_space_for_spill(file_size)?;
912
913        let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
914
915        info!(
916            "Encrypted {} into {} chunks for external signing (spilled to disk)",
917            path.display(),
918            spill.len()
919        );
920
921        // Read each chunk from disk and collect quotes concurrently.
922        // Note: all PreparedChunks accumulate in memory because the external-signer
923        // protocol requires them for finalize_upload. NOT memory-bounded for large files.
924        let mut chunk_data: Vec<Bytes> = spill
925            .addresses
926            .iter()
927            .map(|addr| spill.read_chunk(addr))
928            .collect::<std::result::Result<Vec<_>, _>>()?;
929
930        // For public uploads, bundle the serialized DataMap as an extra chunk
931        // in the same payment batch. This lets the external signer pay for
932        // the data chunks and the DataMap chunk in one flow, and lets the
933        // finalize step return the DataMap's chunk address as the shareable
934        // retrieval address.
935        let data_map_address = match visibility {
936            Visibility::Private => None,
937            Visibility::Public => {
938                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
939                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
940                })?;
941                let bytes = Bytes::from(serialized);
942                let address = compute_address(&bytes);
943                info!(
944                    "Public upload: bundling DataMap chunk ({} bytes) at address {}",
945                    bytes.len(),
946                    hex::encode(address)
947                );
948                chunk_data.push(bytes);
949                Some(address)
950            }
951        };
952
953        let chunk_count = chunk_data.len();
954
955        if let Some(ref tx) = progress {
956            let _ = tx
957                .send(UploadEvent::Encrypted {
958                    total_chunks: chunk_count,
959                })
960                .await;
961        }
962
963        let payment_info = if should_use_merkle(chunk_count, PaymentMode::Auto) {
964            // Merkle path: build tree, collect candidate pools, return for external payment.
965            info!("Using merkle batch preparation for {chunk_count} file chunks");
966
967            let addresses: Vec<[u8; 32]> = chunk_data.iter().map(|c| compute_address(c)).collect();
968
969            let avg_size =
970                chunk_data.iter().map(bytes::Bytes::len).sum::<usize>() / chunk_count.max(1);
971            let avg_size_u64 = u64::try_from(avg_size).unwrap_or(0);
972
973            let prepared_batch = self
974                .prepare_merkle_batch_external(&addresses, DATA_TYPE_CHUNK, avg_size_u64)
975                .await?;
976
977            info!(
978                "File prepared for external merkle signing: {} chunks, depth={} ({})",
979                chunk_count,
980                prepared_batch.depth,
981                path.display()
982            );
983
984            ExternalPaymentInfo::Merkle {
985                prepared_batch,
986                chunk_contents: chunk_data,
987                chunk_addresses: addresses,
988            }
989        } else {
990            // Wave-batch path: collect quotes per chunk concurrently, emitting
991            // a `ChunkQuoted` event after each completion so callers can drive
992            // a progress bar through the (slow) quoting phase.
993            // Clamp fan-out to chunk_count so a partial wave doesn't
994            // pay for slots it can't fill (see PERF-RESULTS.md).
995            let quote_limiter = self.controller().quote.clone();
996            let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
997            let mut quote_stream = stream::iter(chunk_data)
998                .map(|content| {
999                    let limiter = quote_limiter.clone();
1000                    async move {
1001                        observe_op(
1002                            &limiter,
1003                            || async move { self.prepare_chunk_payment(content).await },
1004                            classify_error,
1005                        )
1006                        .await
1007                    }
1008                })
1009                .buffer_unordered(quote_concurrency);
1010
1011            let mut prepared_chunks = Vec::with_capacity(spill.len());
1012            let mut quoted = 0usize;
1013            while let Some(result) = quote_stream.next().await {
1014                if let Some(prepared) = result? {
1015                    prepared_chunks.push(prepared);
1016                }
1017                quoted += 1;
1018                if let Some(ref tx) = progress {
1019                    let _ = tx.try_send(UploadEvent::ChunkQuoted {
1020                        quoted,
1021                        total: chunk_count,
1022                    });
1023                }
1024            }
1025
1026            // Surface the "DataMap chunk was already on the network" case
1027            // so debugging "why is data_map_address set but no storage cost
1028            // appears for it?" doesn't require reading the source. See the
1029            // `data_map_address` doc comment for why this is still a valid
1030            // `Some(addr)` outcome.
1031            if let Some(addr) = data_map_address {
1032                if !prepared_chunks.iter().any(|c| c.address == addr) {
1033                    info!(
1034                        "Public upload: DataMap chunk {} was already stored \
1035                         on the network — address is retrievable without a \
1036                         new payment",
1037                        hex::encode(addr)
1038                    );
1039                }
1040            }
1041
1042            let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
1043
1044            info!(
1045                "File prepared for external signing: {} chunks, total {} atto ({})",
1046                prepared_chunks.len(),
1047                payment_intent.total_amount,
1048                path.display()
1049            );
1050
1051            ExternalPaymentInfo::WaveBatch {
1052                prepared_chunks,
1053                payment_intent,
1054            }
1055        };
1056
1057        Ok(PreparedUpload {
1058            data_map,
1059            payment_info,
1060            data_map_address,
1061        })
1062    }
1063
1064    /// Phase 2 of external-signer upload (wave-batch): finalize with externally-signed tx hashes.
1065    ///
1066    /// Takes a [`PreparedUpload`] that used wave-batch payment and a map
1067    /// of `quote_hash -> tx_hash` provided by the external signer after on-chain
1068    /// payment. Builds payment proofs and stores chunks on the network.
1069    ///
1070    /// # Errors
1071    ///
1072    /// Returns an error if the prepared upload used merkle payment (use
1073    /// [`Client::finalize_upload_merkle`] instead), proof construction fails,
1074    /// or any chunk cannot be stored.
1075    pub async fn finalize_upload(
1076        &self,
1077        prepared: PreparedUpload,
1078        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1079    ) -> Result<FileUploadResult> {
1080        self.finalize_upload_with_progress(prepared, tx_hash_map, None)
1081            .await
1082    }
1083
1084    /// Phase 2 of external-signer upload (wave-batch) with progress events.
1085    ///
1086    /// Same as [`Client::finalize_upload`] but emits [`UploadEvent::ChunkStored`]
1087    /// on the provided channel as each chunk is successfully stored.
1088    ///
1089    /// # Errors
1090    ///
1091    /// Same as [`Client::finalize_upload`].
1092    pub async fn finalize_upload_with_progress(
1093        &self,
1094        prepared: PreparedUpload,
1095        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1096        progress: Option<mpsc::Sender<UploadEvent>>,
1097    ) -> Result<FileUploadResult> {
1098        let data_map_address = prepared.data_map_address;
1099        match prepared.payment_info {
1100            ExternalPaymentInfo::WaveBatch {
1101                prepared_chunks,
1102                payment_intent: _,
1103            } => {
1104                let total_chunks = prepared_chunks.len();
1105                let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
1106                let wave_result = self
1107                    .store_paid_chunks_with_events(paid_chunks, progress.as_ref(), 0, total_chunks)
1108                    .await;
1109                if !wave_result.failed.is_empty() {
1110                    let failed_count = wave_result.failed.len();
1111                    let stored_count = wave_result.stored.len();
1112                    return Err(Error::PartialUpload {
1113                        stored: wave_result.stored.clone(),
1114                        stored_count,
1115                        failed: wave_result.failed,
1116                        failed_count,
1117                        total_chunks: stored_count + failed_count,
1118                        reason: "finalize_upload: chunk storage failed after retries".into(),
1119                    });
1120                }
1121                let chunks_stored = wave_result.stored.len();
1122
1123                info!("External-signer upload finalized: {chunks_stored} chunks stored");
1124
1125                let mut stats = WaveAggregateStats::default();
1126                stats.absorb(&wave_result);
1127
1128                Ok(FileUploadResult {
1129                    data_map: prepared.data_map,
1130                    chunks_stored,
1131                    chunks_failed: 0,
1132                    total_chunks: chunks_stored,
1133                    payment_mode_used: PaymentMode::Single,
1134                    storage_cost_atto: "0".into(),
1135                    gas_cost_wei: 0,
1136                    data_map_address,
1137                    chunk_attempts_total: stats.chunk_attempts_total,
1138                    store_durations_ms: stats.store_durations_ms,
1139                    retries_histogram: stats.retries_histogram,
1140                })
1141            }
1142            ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
1143                "Cannot finalize merkle upload with wave-batch tx hashes. \
1144                 Use finalize_upload_merkle() instead."
1145                    .to_string(),
1146            )),
1147        }
1148    }
1149
1150    /// Phase 2 of external-signer upload (merkle): finalize with winner pool hash.
1151    ///
1152    /// Takes a [`PreparedUpload`] that used merkle payment and the `winner_pool_hash`
1153    /// returned by the on-chain merkle payment transaction. Generates proofs and
1154    /// stores chunks on the network.
1155    ///
1156    /// # Errors
1157    ///
1158    /// Returns an error if the prepared upload used wave-batch payment (use
1159    /// [`Client::finalize_upload`] instead), proof generation fails,
1160    /// or any chunk cannot be stored.
1161    pub async fn finalize_upload_merkle(
1162        &self,
1163        prepared: PreparedUpload,
1164        winner_pool_hash: [u8; 32],
1165    ) -> Result<FileUploadResult> {
1166        self.finalize_upload_merkle_with_progress(prepared, winner_pool_hash, None)
1167            .await
1168    }
1169
1170    /// Phase 2 of external-signer upload (merkle) with progress events.
1171    ///
1172    /// Same as [`Client::finalize_upload_merkle`] but emits [`UploadEvent::ChunkStored`]
1173    /// on the provided channel as each chunk is successfully stored.
1174    ///
1175    /// # Errors
1176    ///
1177    /// Same as [`Client::finalize_upload_merkle`].
1178    pub async fn finalize_upload_merkle_with_progress(
1179        &self,
1180        prepared: PreparedUpload,
1181        winner_pool_hash: [u8; 32],
1182        progress: Option<mpsc::Sender<UploadEvent>>,
1183    ) -> Result<FileUploadResult> {
1184        let data_map_address = prepared.data_map_address;
1185        match prepared.payment_info {
1186            ExternalPaymentInfo::Merkle {
1187                prepared_batch,
1188                chunk_contents,
1189                chunk_addresses,
1190            } => {
1191                let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
1192                let (chunks_stored, stats) = self
1193                    .merkle_upload_chunks(
1194                        chunk_contents,
1195                        chunk_addresses,
1196                        &batch_result,
1197                        progress.as_ref(),
1198                    )
1199                    .await?;
1200
1201                info!("External-signer merkle upload finalized: {chunks_stored} chunks stored");
1202
1203                Ok(FileUploadResult {
1204                    data_map: prepared.data_map,
1205                    chunks_stored,
1206                    chunks_failed: 0,
1207                    total_chunks: chunks_stored,
1208                    payment_mode_used: PaymentMode::Merkle,
1209                    storage_cost_atto: "0".into(),
1210                    gas_cost_wei: 0,
1211                    data_map_address,
1212                    chunk_attempts_total: stats.chunk_attempts_total,
1213                    store_durations_ms: stats.store_durations_ms,
1214                    retries_histogram: stats.retries_histogram,
1215                })
1216            }
1217            ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
1218                "Cannot finalize wave-batch upload with merkle winner hash. \
1219                 Use finalize_upload() instead."
1220                    .to_string(),
1221            )),
1222        }
1223    }
1224
1225    /// Upload a file with a specific payment mode.
1226    ///
1227    /// Before encryption, checks that the temp directory has enough free
1228    /// disk space for the spilled chunks (~1.1× source file size).
1229    ///
1230    /// Encrypted chunks are spilled to a temp directory during encryption
1231    /// so that only their 32-byte addresses stay in memory. At upload time,
1232    /// chunks are read back one wave at a time (~64 × 4 MB ≈ 256 MB peak).
1233    ///
1234    /// # Errors
1235    ///
1236    /// Returns an error if there is insufficient disk space, the file cannot
1237    /// be read, encryption fails, or any chunk cannot be stored.
1238    #[allow(clippy::too_many_lines)]
1239    pub async fn file_upload_with_mode(
1240        &self,
1241        path: &Path,
1242        mode: PaymentMode,
1243    ) -> Result<FileUploadResult> {
1244        self.file_upload_with_progress(path, mode, None).await
1245    }
1246
1247    /// Upload a file with progress events sent to the given channel.
1248    ///
1249    /// Same as [`Client::file_upload_with_mode`] but sends [`UploadEvent`]s to the
1250    /// provided channel for UI progress feedback.
1251    #[allow(clippy::too_many_lines)]
1252    pub async fn file_upload_with_progress(
1253        &self,
1254        path: &Path,
1255        mode: PaymentMode,
1256        progress: Option<mpsc::Sender<UploadEvent>>,
1257    ) -> Result<FileUploadResult> {
1258        debug!(
1259            "Streaming file upload with mode {mode:?}: {}",
1260            path.display()
1261        );
1262
1263        // Pre-flight: verify enough temp disk space for the chunk spill.
1264        let file_size = std::fs::metadata(path)?.len();
1265        check_disk_space_for_spill(file_size)?;
1266
1267        // Phase 1: Encrypt file and spill chunks to temp directory.
1268        // Only 32-byte addresses stay in memory — chunk data lives on disk.
1269        let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1270
1271        let chunk_count = spill.len();
1272        info!(
1273            "Encrypted {} into {chunk_count} chunks (spilled to disk)",
1274            path.display()
1275        );
1276        if let Some(ref tx) = progress {
1277            let _ = tx
1278                .send(UploadEvent::Encrypted {
1279                    total_chunks: chunk_count,
1280                })
1281                .await;
1282        }
1283
1284        // Phase 2: Decide payment mode and upload in waves from disk.
1285        //
1286        // For the merkle path, attempt to resume from a cached
1287        // receipt before paying again. The cache is keyed by the
1288        // source file path; a successful upload deletes the cache so
1289        // a subsequent re-upload of the same path will pay anew.
1290        let file_path_key = path.display().to_string();
1291        let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self
1292            .should_use_merkle(chunk_count, mode)
1293        {
1294            info!("Using merkle batch payment for {chunk_count} file chunks");
1295
1296            let batch_result = if let Some((_cache_path, cached)) =
1297                crate::data::client::cached_merkle::try_load_for_file(&file_path_key)
1298            {
1299                // Validate the cache matches this upload. If the
1300                // file was edited between attempts the cached
1301                // proofs would no longer be valid for the new
1302                // chunk addresses; in that case drop the cache
1303                // and pay fresh.
1304                let addresses_match = spill
1305                    .addresses
1306                    .iter()
1307                    .all(|addr| cached.proofs.contains_key(addr));
1308                if addresses_match && cached.proofs.len() == chunk_count {
1309                    info!(
1310                        "Skipping merkle payment phase; resuming with \
1311                             cached proofs ({} chunks)",
1312                        cached.proofs.len()
1313                    );
1314                    Ok(cached)
1315                } else {
1316                    info!(
1317                        "Cached merkle receipt does not match current file \
1318                             content (cached={}, file={chunk_count}). \
1319                             Discarding cache and paying fresh.",
1320                        cached.proofs.len()
1321                    );
1322                    crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
1323                    self.pay_for_merkle_batch(
1324                        &spill.addresses,
1325                        DATA_TYPE_CHUNK,
1326                        spill.avg_chunk_size(),
1327                    )
1328                    .await
1329                    .inspect(|result| {
1330                        crate::data::client::cached_merkle::try_save(&file_path_key, result);
1331                    })
1332                }
1333            } else {
1334                self.pay_for_merkle_batch(&spill.addresses, DATA_TYPE_CHUNK, spill.avg_chunk_size())
1335                    .await
1336                    .inspect(|result| {
1337                        // Save BEFORE the store phase so a crash
1338                        // mid-upload leaves a resumable receipt.
1339                        crate::data::client::cached_merkle::try_save(&file_path_key, result);
1340                    })
1341            };
1342
1343            let batch_result = match batch_result {
1344                Ok(result) => result,
1345                Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
1346                    info!("Merkle needs more peers ({msg}), falling back to wave-batch");
1347                    let (stored, sc, gc, fb_stats) =
1348                        self.upload_waves_single(&spill, progress.as_ref()).await?;
1349                    return Ok(FileUploadResult {
1350                        data_map,
1351                        chunks_stored: stored,
1352                        chunks_failed: 0,
1353                        total_chunks: chunk_count,
1354                        payment_mode_used: PaymentMode::Single,
1355                        storage_cost_atto: sc,
1356                        gas_cost_wei: gc,
1357                        data_map_address: None,
1358                        chunk_attempts_total: fb_stats.chunk_attempts_total,
1359                        store_durations_ms: fb_stats.store_durations_ms,
1360                        retries_histogram: fb_stats.retries_histogram,
1361                    });
1362                }
1363                Err(e) => return Err(e),
1364            };
1365
1366            let (stored, sc, gc, stats) = self
1367                .upload_waves_merkle(&spill, &batch_result, progress.as_ref())
1368                .await?;
1369            // Upload succeeded end-to-end; the cached receipt is
1370            // no longer needed.
1371            crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
1372            (stored, PaymentMode::Merkle, sc, gc, stats)
1373        } else {
1374            let (stored, sc, gc, stats) =
1375                self.upload_waves_single(&spill, progress.as_ref()).await?;
1376            (stored, PaymentMode::Single, sc, gc, stats)
1377        };
1378
1379        info!(
1380            "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
1381            path.display()
1382        );
1383
1384        Ok(FileUploadResult {
1385            data_map,
1386            chunks_stored,
1387            chunks_failed: 0,
1388            total_chunks: chunk_count,
1389            payment_mode_used: actual_mode,
1390            storage_cost_atto,
1391            gas_cost_wei,
1392            data_map_address: None,
1393            chunk_attempts_total: stats.chunk_attempts_total,
1394            store_durations_ms: stats.store_durations_ms,
1395            retries_histogram: stats.retries_histogram,
1396        })
1397    }
1398
1399    /// Encrypt a file and spill chunks to a temp directory.
1400    ///
1401    /// Logs progress every 100 chunks so users get feedback during
1402    /// multi-GB encryptions.
1403    ///
1404    /// Returns the spill buffer (addresses on disk) and the `DataMap`.
1405    async fn encrypt_file_to_spill(
1406        &self,
1407        path: &Path,
1408        progress: Option<&mpsc::Sender<UploadEvent>>,
1409    ) -> Result<(ChunkSpill, DataMap)> {
1410        let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
1411
1412        let mut spill = ChunkSpill::new()?;
1413        while let Some(content) = chunk_rx.recv().await {
1414            spill.push(&content)?;
1415            let chunks_done = spill.len();
1416            if let Some(tx) = progress {
1417                if chunks_done.is_multiple_of(10) {
1418                    let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
1419                }
1420            }
1421            if chunks_done % 100 == 0 {
1422                let mb = spill.total_bytes() / (1024 * 1024);
1423                info!(
1424                    "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
1425                    path.display()
1426                );
1427            }
1428        }
1429
1430        // Await encryption completion to catch errors before paying.
1431        handle
1432            .await
1433            .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
1434            .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
1435
1436        let data_map = datamap_rx
1437            .await
1438            .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
1439
1440        Ok((spill, data_map))
1441    }
1442
1443    /// Upload chunks from a spill using wave-based per-chunk (single) payments.
1444    ///
1445    /// Reads one wave at a time from disk, prepares quotes, pays, and stores.
1446    /// Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE` (~256 MB).
1447    ///
1448    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
1449    async fn upload_waves_single(
1450        &self,
1451        spill: &ChunkSpill,
1452        progress: Option<&mpsc::Sender<UploadEvent>>,
1453    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
1454        let mut total_stored = 0usize;
1455        let mut total_storage = Amount::ZERO;
1456        let mut total_gas: u128 = 0;
1457        let mut agg_stats = WaveAggregateStats::default();
1458        let total_chunks = spill.len();
1459        let waves: Vec<&[[u8; 32]]> = spill.waves().collect();
1460        let wave_count = waves.len();
1461
1462        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
1463            let wave_num = wave_idx + 1;
1464            let wave_data: Vec<Bytes> = wave_addrs
1465                .iter()
1466                .map(|addr| spill.read_chunk(addr))
1467                .collect::<Result<Vec<_>>>()?;
1468
1469            info!(
1470                "Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
1471                wave_data.len()
1472            );
1473            if let Some(tx) = progress {
1474                let _ = tx
1475                    .send(UploadEvent::QuotingChunks {
1476                        wave: wave_num,
1477                        total_waves: wave_count,
1478                        chunks_in_wave: wave_data.len(),
1479                    })
1480                    .await;
1481            }
1482            let (addresses, wave_storage, wave_gas, wave_stats) = self
1483                .batch_upload_chunks_with_events(wave_data, progress, total_stored, total_chunks)
1484                .await?;
1485            total_stored += addresses.len();
1486            if let Ok(cost) = wave_storage.parse::<Amount>() {
1487                total_storage += cost;
1488            }
1489            total_gas = total_gas.saturating_add(wave_gas);
1490            // Merge per-call stats (each call already aggregates across the
1491            // waves it ran internally, so a simple sum/extend is correct).
1492            agg_stats.chunk_attempts_total = agg_stats
1493                .chunk_attempts_total
1494                .saturating_add(wave_stats.chunk_attempts_total);
1495            agg_stats
1496                .store_durations_ms
1497                .extend(wave_stats.store_durations_ms);
1498            for (slot, count) in agg_stats
1499                .retries_histogram
1500                .iter_mut()
1501                .zip(wave_stats.retries_histogram.iter())
1502            {
1503                *slot = slot.saturating_add(*count);
1504            }
1505            if let Some(tx) = progress {
1506                let _ = tx
1507                    .send(UploadEvent::WaveComplete {
1508                        wave: wave_num,
1509                        total_waves: wave_count,
1510                        stored_so_far: total_stored,
1511                        total: total_chunks,
1512                    })
1513                    .await;
1514            }
1515        }
1516
1517        Ok((
1518            total_stored,
1519            total_storage.to_string(),
1520            total_gas,
1521            agg_stats,
1522        ))
1523    }
1524
    /// Upload chunks from a spill using pre-computed merkle proofs.
    ///
    /// Reads one wave at a time from disk, pairs each chunk with its proof,
    /// and uploads concurrently. Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE`.
    ///
    /// When `progress` is provided, an `UploadEvent::ChunkStored` event is
    /// sent after every stored chunk and an `UploadEvent::WaveComplete` event
    /// after every wave. Send failures are ignored.
    ///
    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei, stats)`.
    /// The two cost values are copied unchanged from `batch_result`; `stats`
    /// is accumulated here from the successful stores only.
    ///
    /// # Errors
    ///
    /// Returns an error if a wave cannot be read back from the spill. On the
    /// first chunk that fails (missing proof, close-group lookup failure, or
    /// store failure) the function returns `Error::PartialUpload` carrying the
    /// addresses stored so far and that single failure; remaining waves are
    /// not attempted.
    async fn upload_waves_merkle(
        &self,
        spill: &ChunkSpill,
        batch_result: &MerkleBatchPaymentResult,
        progress: Option<&mpsc::Sender<UploadEvent>>,
    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
        let mut total_stored = 0usize;
        let total_chunks = spill.len();
        let waves: Vec<&[[u8; 32]]> = spill.waves().collect();
        let wave_count = waves.len();
        // Addresses stored so far, reported back inside `PartialUpload` on failure.
        let mut stored_addresses: Vec<[u8; 32]> = Vec::new();
        let mut agg_stats = WaveAggregateStats::default();

        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
            let wave_num = wave_idx + 1;
            let wave = spill.read_wave(wave_addrs)?;

            info!(
                "Wave {wave_num}/{wave_count}: storing {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
                wave.len()
            );

            let store_limiter = self.controller().store.clone();
            // Clamp fan-out to wave size — partial last wave should
            // not pay for extra slots (see PERF-RESULTS.md).
            let store_concurrency = store_limiter.current().min(wave.len().max(1));
            let mut upload_stream = stream::iter(wave.into_iter().map(|(content, addr)| {
                // Look the proof up eagerly (cheap clone) so the future owns
                // it; a missing proof is reported from inside the future.
                let proof_bytes = batch_result.proofs.get(&addr).cloned();
                let limiter = store_limiter.clone();
                async move {
                    // `started` is taken before the proof check and the
                    // close-group lookup, so the recorded duration covers both.
                    let started = std::time::Instant::now();
                    let proof = proof_bytes.ok_or_else(|| {
                        (
                            addr,
                            Error::Payment(format!(
                                "Missing merkle proof for chunk {}",
                                hex::encode(addr)
                            )),
                            started,
                        )
                    })?;
                    let peers = self
                        .close_group_peers(&addr)
                        .await
                        .map_err(|e| (addr, e, started))?;
                    // Only the store itself is wrapped in `observe_op`.
                    observe_op(
                        &limiter,
                        || async move {
                            self.chunk_put_to_close_group(content, proof, &peers).await
                        },
                        classify_error,
                    )
                    .await
                    .map(|_| (addr, started))
                    .map_err(|e| (addr, e, started))
                }
            }))
            .buffer_unordered(store_concurrency);

            while let Some(result) = upload_stream.next().await {
                match result {
                    Ok((addr, started)) => {
                        // Elapsed time is read when the result is consumed
                        // from the stream, not when the store finished.
                        let duration_ms =
                            u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
                        agg_stats.store_durations_ms.push(duration_ms);
                        agg_stats.chunk_attempts_total =
                            agg_stats.chunk_attempts_total.saturating_add(1);
                        // Every success is counted in histogram slot 0
                        // (presumably "no retries" — confirm against
                        // `WaveAggregateStats`).
                        agg_stats.retries_histogram[0] =
                            agg_stats.retries_histogram[0].saturating_add(1);
                        stored_addresses.push(addr);
                        total_stored += 1;
                        info!("Stored {total_stored}/{total_chunks}");
                        if let Some(tx) = progress {
                            let _ = tx
                                .send(UploadEvent::ChunkStored {
                                    stored: total_stored,
                                    total: total_chunks,
                                })
                                .await;
                        }
                    }
                    Err((addr, e, _started)) => {
                        // Fail fast: the first failure ends the whole upload.
                        warn!("merkle upload failed for chunk {}: {e}", hex::encode(addr));
                        return Err(Error::PartialUpload {
                            stored: stored_addresses,
                            stored_count: total_stored,
                            failed: vec![(addr, e.to_string())],
                            failed_count: 1,
                            total_chunks,
                            reason: format!("merkle chunk upload failed: {e}"),
                        });
                    }
                }
            }

            if let Some(tx) = progress {
                let _ = tx
                    .send(UploadEvent::WaveComplete {
                        wave: wave_num,
                        total_waves: wave_count,
                        stored_so_far: total_stored,
                        total: total_chunks,
                    })
                    .await;
            }
        }

        Ok((
            total_stored,
            batch_result.storage_cost_atto.clone(),
            batch_result.gas_cost_wei,
            agg_stats,
        ))
    }
1646
1647    /// Download and decrypt a file from the network, writing it to disk.
1648    ///
1649    /// Uses `streaming_decrypt` so that only one batch of chunks lives in
1650    /// memory at a time, avoiding OOM on large files. Chunks are fetched
1651    /// concurrently within each batch, then decrypted data is written to
1652    /// disk incrementally.
1653    ///
1654    /// Returns the number of bytes written.
1655    ///
1656    /// # Panics
1657    ///
1658    /// Requires a multi-threaded Tokio runtime (`flavor = "multi_thread"`).
1659    /// Will panic if called from a `current_thread` runtime because
1660    /// `streaming_decrypt` takes a synchronous callback that must bridge
1661    /// back to async via `block_in_place`.
1662    ///
1663    /// # Errors
1664    ///
1665    /// Returns an error if any chunk cannot be retrieved, decryption fails,
1666    /// or the file cannot be written.
1667    #[allow(clippy::unused_async)]
1668    pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
1669        self.file_download_with_progress(data_map, output, None)
1670            .await
1671    }
1672
    /// Download and decrypt a file with progress events.
    ///
    /// Same as [`Client::file_download`] but sends [`DownloadEvent`]s for UI feedback.
    ///
    /// Progress reporting (every event goes through `try_send` and its result
    /// is ignored, so a failed send never blocks or fails the download):
    /// 1. If `data_map` is a child map, sends `ResolvingDataMap` with the
    ///    number of entries in `data_map`, then `MapChunkFetched` after each
    ///    map chunk fetched while resolving down to the root DataMap.
    /// 2. Once the root DataMap is known, sends `DataMapResolved` with the
    ///    accurate `total_chunks`.
    /// 3. Sends `ChunksFetched { fetched, total }` after each data chunk is
    ///    fetched.
    ///
    /// The decrypted output is written to a temp file next to `output` and
    /// renamed over `output` on success; on any failure the temp file is
    /// removed (best-effort).
    ///
    /// Returns the number of bytes written.
    ///
    /// # Panics
    ///
    /// Uses `tokio::task::block_in_place`, so it has the same multi-threaded
    /// runtime requirement documented on [`Client::file_download`].
    ///
    /// # Errors
    ///
    /// Returns an error if DataMap resolution fails, any chunk cannot be
    /// fetched or is missing, decryption fails, or the output cannot be
    /// written or renamed into place.
    // No `.await` appears directly in this body (async work is driven through
    // `Handle::block_on`), hence the lint allowance.
    #[allow(clippy::unused_async)]
    pub async fn file_download_with_progress(
        &self,
        data_map: &DataMap,
        output: &Path,
        progress: Option<mpsc::Sender<DownloadEvent>>,
    ) -> Result<u64> {
        debug!("Downloading file to {}", output.display());

        // Runtime handle used by the synchronous fetch callbacks below to
        // drive async chunk fetches.
        let handle = Handle::current();

        // Phase 1: Resolve hierarchical DataMap to root level.
        // This fetches child DataMap chunks (typically 3) to discover the real chunk count.
        let root_map = if data_map.is_child() {
            let dm_chunks = data_map.len();
            if let Some(ref tx) = progress {
                let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
                    total_map_chunks: dm_chunks,
                });
            }

            let resolve_progress = progress.clone();
            // Running count of map chunks fetched, shared across fetch calls.
            let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));

            let resolved = tokio::task::block_in_place(|| {
                let counter_ref = resolve_counter.clone();
                let progress_ref = resolve_progress.clone();
                let fetch_limiter = self.controller().fetch.clone();
                // Synchronous callback handed to self_encryption; it fetches
                // one batch of map chunks and returns their contents.
                let fetch = |batch: &[(usize, XorName)]| {
                    let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
                    let counter = counter_ref.clone();
                    let prog = progress_ref.clone();
                    let limiter = fetch_limiter.clone();
                    handle.block_on(async {
                        // Clamp fan-out to batch size — self_encryption
                        // requests small batches (default 10), so a
                        // higher cap from the controller would be slots
                        // we never fill (see PERF-RESULTS.md).
                        let cap = limiter.current().min(batch_owned.len().max(1));
                        let mut stream = futures::stream::iter(batch_owned)
                            .map(|(idx, hash)| {
                                let addr = hash.0;
                                let limiter = limiter.clone();
                                async move {
                                    let result = observe_op(
                                        &limiter,
                                        || async move { self.chunk_get(&addr).await },
                                        classify_error,
                                    )
                                    .await;
                                    (idx, hash, result)
                                }
                            })
                            .buffer_unordered(cap);
                        let mut results = Vec::new();
                        while let Some((idx, hash, result)) =
                            futures::StreamExt::next(&mut stream).await
                        {
                            // A fetch error and a missing chunk both abort
                            // the whole batch.
                            let chunk = result
                                .map_err(|e| {
                                    self_encryption::Error::Generic(format!(
                                        "DataMap resolution failed: {e}"
                                    ))
                                })?
                                .ok_or_else(|| {
                                    self_encryption::Error::Generic(format!(
                                        "DataMap chunk not found: {}",
                                        hex::encode(hash.0)
                                    ))
                                })?;
                            results.push((idx, chunk.content));
                            let fetched =
                                counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
                            if let Some(ref tx) = prog {
                                let _ = tx.try_send(DownloadEvent::MapChunkFetched { fetched });
                            }
                        }
                        // CRITICAL: self_encryption::get_root_data_map_parallel
                        // pairs the returned Vec POSITIONALLY with the input
                        // hashes via .zip() and discards our idx field. We
                        // used buffer_unordered, so completions arrive in
                        // arbitrary order. Sort by idx to restore the input
                        // order before returning, otherwise chunk bytes get
                        // paired with the wrong hashes and the root data
                        // map is corrupted.
                        results.sort_by_key(|(idx, _)| *idx);
                        Ok(results)
                    })
                };
                get_root_data_map_parallel(data_map.clone(), &fetch)
            })
            .map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;

            info!(
                "Resolved hierarchical DataMap: {} data chunks",
                resolved.len()
            );
            resolved
        } else {
            // Already a root map: nothing to resolve.
            data_map.clone()
        };

        // Phase 2: Now we know the real chunk count.
        let total_chunks = root_map.len();
        if let Some(ref tx) = progress {
            let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
        }

        // Phase 3: Fetch and decrypt data chunks with accurate progress.
        // Running count of data chunks fetched, shared across fetch calls.
        let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let fetched_for_closure = fetched_counter.clone();
        let progress_for_closure = progress.clone();

        let fetch_limiter_outer = self.controller().fetch.clone();
        let stream = streaming_decrypt(&root_map, |batch: &[(usize, XorName)]| {
            let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
            let fetched_ref = fetched_for_closure.clone();
            let progress_ref = progress_for_closure.clone();
            let fetch_limiter = fetch_limiter_outer.clone();

            // Each batch request bridges from the synchronous callback back
            // into async code on the current runtime.
            tokio::task::block_in_place(|| {
                handle.block_on(async {
                    // Clamp fan-out to batch size — see PERF-RESULTS.md.
                    let cap = fetch_limiter.current().min(batch_owned.len().max(1));
                    let mut stream = futures::stream::iter(batch_owned)
                        .map(|(idx, hash)| {
                            let addr = hash.0;
                            let limiter = fetch_limiter.clone();
                            async move {
                                let result = observe_op(
                                    &limiter,
                                    || async move { self.chunk_get(&addr).await },
                                    classify_error,
                                )
                                .await;
                                (idx, hash, result)
                            }
                        })
                        .buffer_unordered(cap);

                    let mut results = Vec::new();
                    while let Some((idx, hash, result)) =
                        futures::StreamExt::next(&mut stream).await
                    {
                        let addr_hex = hex::encode(hash.0);
                        let chunk = result
                            .map_err(|e| {
                                self_encryption::Error::Generic(format!(
                                    "Network fetch failed for {addr_hex}: {e}"
                                ))
                            })?
                            .ok_or_else(|| {
                                self_encryption::Error::Generic(format!(
                                    "Chunk not found: {addr_hex}"
                                ))
                            })?;
                        results.push((idx, chunk.content));
                        let fetched =
                            fetched_ref.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
                        info!("Downloaded {fetched}/{total_chunks}");
                        if let Some(ref tx) = progress_ref {
                            let _ = tx.try_send(DownloadEvent::ChunksFetched {
                                fetched,
                                total: total_chunks,
                            });
                        }
                    }
                    // streaming_decrypt itself sorts by key before
                    // zipping, but the same closure is also passed
                    // through get_root_data_map_parallel internally
                    // (see self_encryption::stream_decrypt.rs::new), and
                    // THAT path zips positionally without sorting. Sort
                    // here so both consumers see input order.
                    results.sort_by_key(|(idx, _)| *idx);
                    Ok(results)
                })
            })
        })
        .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;

        // Write decrypted chunks to a temp file, then rename atomically.
        // The temp file lives in the same directory as `output`; the name
        // combines the process id with a random value to avoid collisions.
        let parent = output.parent().unwrap_or_else(|| Path::new("."));
        let unique: u64 = rand::random();
        let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));

        // Run the fallible write in a closure so every error path funnels
        // into the single cleanup match below.
        let write_result = (|| -> Result<u64> {
            let mut file = std::fs::File::create(&tmp_path)?;
            let mut bytes_written = 0u64;
            // Iterating the decrypt stream is what triggers the fetch
            // callback above, one batch at a time.
            for chunk_result in stream {
                let chunk_bytes = chunk_result
                    .map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
                file.write_all(&chunk_bytes)?;
                bytes_written += chunk_bytes.len() as u64;
            }
            // NOTE(review): `flush` on a `std::fs::File` does not sync data
            // to disk; consider `sync_all` if durability before the rename
            // matters.
            file.flush()?;
            Ok(bytes_written)
        })();

        match write_result {
            Ok(bytes_written) => match std::fs::rename(&tmp_path, output) {
                Ok(()) => {
                    info!(
                        "File downloaded: {bytes_written} bytes written to {}",
                        output.display()
                    );
                    Ok(bytes_written)
                }
                Err(rename_err) => {
                    // Rename failed: do not leave the temp file behind.
                    if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
                        warn!(
                            "Failed to remove temp download file {}: {cleanup_err}",
                            tmp_path.display()
                        );
                    }
                    Err(rename_err.into())
                }
            },
            Err(e) => {
                // NOTE(review): if `File::create` itself failed there is no
                // temp file, so this cleanup also logs a warning in that case.
                if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
                    warn!(
                        "Failed to remove temp download file {}: {cleanup_err}",
                        tmp_path.display()
                    );
                }
                Err(e)
            }
        }
    }
1910}
1911
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Spilling a 1 KB file needs so little room that the check must pass.
    #[test]
    fn disk_space_check_passes_for_small_file() {
        check_disk_space_for_spill(1024).unwrap();
    }

    /// `u64::MAX / 2` bytes (~8 EiB) exceeds any real disk, so the check
    /// must reject it with `InsufficientDiskSpace`.
    #[test]
    fn disk_space_check_fails_for_absurd_size() {
        let outcome = check_disk_space_for_spill(u64::MAX / 2);
        assert!(outcome.is_err());

        let err = outcome.unwrap_err();
        let is_disk_space_error = matches!(err, Error::InsufficientDiskSpace(_));
        assert!(
            is_disk_space_error,
            "expected InsufficientDiskSpace, got: {err}"
        );
    }

    /// Chunks pushed into a spill can be read back byte-for-byte, and the
    /// size accounting reflects what was pushed.
    #[test]
    fn chunk_spill_round_trip() {
        let first_payload = vec![0xAA; 1024];
        let second_payload = vec![0xBB; 2048];

        let mut spill = ChunkSpill::new().unwrap();
        spill.push(&first_payload).unwrap();
        spill.push(&second_payload).unwrap();

        // Bookkeeping: count, total size, and average size.
        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 1024 + 2048);
        assert_eq!(spill.avg_chunk_size(), (1024 + 2048) / 2);

        // Contents come back in push order.
        let first_addr = spill.addresses.first().unwrap();
        let first_read = spill.read_chunk(first_addr).unwrap();
        assert_eq!(&first_read[..], &first_payload[..]);

        let second_addr = spill.addresses.get(1).unwrap();
        let second_read = spill.read_chunk(second_addr).unwrap();
        assert_eq!(&second_read[..], &second_payload[..]);

        // Splitting the address list one chunk per wave yields two waves.
        let wave_total = spill.addresses.chunks(1).collect::<Vec<_>>().len();
        assert_eq!(wave_total, 2);
    }

    /// Dropping a spill removes its backing directory.
    #[test]
    fn chunk_spill_cleanup_on_drop() {
        let spill = ChunkSpill::new().unwrap();
        let spill_dir = spill.dir.clone();
        assert!(spill_dir.exists());

        drop(spill);

        assert!(!spill_dir.exists(), "spill dir should be removed on drop");
    }

    /// Pushing identical content repeatedly stores it once; distinct content
    /// is still appended.
    #[test]
    fn chunk_spill_deduplicates_identical_content() {
        let mut spill = ChunkSpill::new().unwrap();

        let repeated = vec![0xCC; 512];
        for _ in 0..3 {
            // The second and third pushes carry content already in the spill.
            spill.push(&repeated).unwrap();
        }

        assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
        assert_eq!(
            spill.total_bytes(),
            512,
            "total_bytes should count unique only"
        );

        let distinct = vec![0xDD; 256];
        spill.push(&distinct).unwrap();
        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 512 + 256);
    }
}
1995
/// Compile-time checks that the futures returned by the `Client` file
/// methods are `Send`.
///
/// Nothing here is ever executed: each function only has to type-check, and
/// it does so only if the future it builds satisfies the `Send` bound on
/// `_assert_send`.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Accepts a reference to any `Send` value; used purely for its bound.
    fn _assert_send<T: Send>(_: &T) {}

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_is_send(client: &Client) {
        _assert_send(&client.file_upload(Path::new("/dev/null")));
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_with_mode_is_send(client: &Client) {
        let target = Path::new("/dev/null");
        _assert_send(&client.file_upload_with_mode(target, PaymentMode::Auto));
    }

    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _file_download_is_send(client: &Client) {
        // Never evaluated; `todo!()` only supplies a value of the right type.
        let placeholder_map: DataMap = todo!();
        _assert_send(&client.file_download(&placeholder_map, Path::new("/dev/null")));
    }
}