Skip to main content

ant_core/data/client/
file.rs

1//! File operations using streaming self-encryption.
2//!
3//! Upload files directly from disk without loading them entirely into memory.
4//! Uses `stream_encrypt` to process files in 8KB chunks, encrypting and
5//! uploading each piece as it's produced.
6//!
7//! Encrypted chunks are spilled to a temporary directory during encryption
8//! so that peak memory usage is bounded to one wave (~256 MB for 64 × 4 MB
9//! chunks) regardless of file size.
10//!
11//! For in-memory data uploads, see the `data` module.
12
13use crate::data::client::adaptive::observe_op;
14use crate::data::client::batch::{
15    finalize_batch_payment, PaymentIntent, PreparedChunk, WaveAggregateStats,
16};
17use crate::data::client::classify_error;
18use crate::data::client::merkle::{
19    finalize_merkle_batch, should_use_merkle, MerkleBatchPaymentResult, PaymentMode,
20    PreparedMerkleBatch,
21};
22use crate::data::client::Client;
23use crate::data::error::{Error, Result};
24use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
25use ant_protocol::transport::{MultiAddr, PeerId};
26use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
27use bytes::Bytes;
28use fs2::FileExt;
29use futures::stream::{self, StreamExt};
30use self_encryption::{
31    get_root_data_map_parallel, stream_decrypt_batch_size, stream_encrypt,
32    streaming_decrypt_with_batch_size, DataMap,
33};
34use std::collections::{HashMap, HashSet};
35use std::io::Write;
36use std::path::{Path, PathBuf};
37use std::sync::{Arc, Mutex};
38use tokio::runtime::Handle;
39use tokio::sync::mpsc;
40use tracing::{debug, info, warn};
41use xor_name::XorName;
42
/// Progress events emitted during file upload for UI feedback.
///
/// Events are delivered through an optional `mpsc::Sender<UploadEvent>`,
/// such as the `progress` argument of [`Client::estimate_upload_cost`].
#[derive(Debug, Clone)]
pub enum UploadEvent {
    /// A chunk has been encrypted and spilled to disk.
    Encrypting { chunks_done: usize },
    /// File encryption complete.
    ///
    /// [`Client::estimate_upload_cost`] reports the number of unique chunks
    /// held in the spill as `total_chunks`.
    Encrypted { total_chunks: usize },
    /// Starting quote collection for a wave.
    QuotingChunks {
        wave: usize,
        total_waves: usize,
        chunks_in_wave: usize,
    },
    /// A chunk has been quoted (peer discovery + price received).
    /// This is the slow phase — each quote involves network round-trips.
    ChunkQuoted { quoted: usize, total: usize },
    /// A chunk has been stored on the network.
    ChunkStored { stored: usize, total: usize },
    /// A wave has completed.
    WaveComplete {
        wave: usize,
        total_waves: usize,
        stored_so_far: usize,
        total: usize,
    },
}
69
/// Progress events emitted during file download for UI feedback.
///
/// NOTE(review): the code that emits these events is outside this section;
/// the variant order below presumably mirrors the download phases — confirm
/// against the download path.
#[derive(Debug, Clone)]
pub enum DownloadEvent {
    /// Resolving hierarchical DataMap to discover real chunk count.
    ResolvingDataMap { total_map_chunks: usize },
    /// A DataMap chunk has been fetched during resolution.
    MapChunkFetched { fetched: usize },
    /// DataMap resolved — total data chunk count now known.
    DataMapResolved { total_chunks: usize },
    /// Data chunks are being fetched from the network.
    ChunksFetched { fetched: usize, total: usize },
}
82
/// One entry in the per-chunk quote list returned by
/// [`Client::get_store_quotes`]: the responding peer, its addresses, the
/// signed quote it returned, and the payment amount it is demanding.
type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);

/// Number of chunks per upload wave (matches batch.rs PAYMENT_WAVE_SIZE).
///
/// Also the group size used when the spilled chunk addresses are split into
/// waves for read-back.
const UPLOAD_WAVE_SIZE: usize = 64;

/// Stream decrypt batches should be larger than fetch fan-out so
/// `buffer_unordered` can keep launching new chunk GETs as earlier ones
/// complete, instead of stopping at each self-encryption batch boundary.
///
/// The fetch concurrency cap is multiplied by this value to obtain the
/// target batch size.
const DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER: usize = 4;

/// Use at most this fraction of currently usable RAM for one decrypt batch.
///
/// Expressed as a divisor: the budget is `usable_memory / 4`.
const DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR: u64 = 4;

/// A decrypt batch briefly holds encrypted chunk bytes, decrypted chunk bytes,
/// and Vec/Bytes overhead. Use a conservative multiplier rather than assuming
/// payload bytes alone.
const DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER: u64 = 3;

/// Maximum number of distinct chunk addresses to sample when probing for a
/// representative quote in [`Client::estimate_upload_cost`].
///
/// Bounded small so we never spend more than a couple of round-trips on the
/// `AlreadyStored` retry path, which only matters when many leading chunks
/// of a file already live on the network.
const ESTIMATE_SAMPLE_CAP: usize = 5;

/// Gas used by one `pay_for_quotes` transaction that packs up to
/// `UPLOAD_WAVE_SIZE` (quote_hash, rewards_address, amount) entries.
///
/// `batch_pay` in `batch.rs` flattens every chunk's close-group quotes into a
/// single EVM call, so the dominant cost is the SSTOREs for each entry plus
/// the base tx overhead. On Arbitrum that is roughly
/// `21_000 + 64 × (20_000 + small)` ≈ 1.3M; we round up to 1.5M as a
/// conservative per-wave upper bound.
const GAS_PER_WAVE_TX: u128 = 1_500_000;

/// Gas used by one merkle batch payment transaction.
///
/// One on-chain tx per merkle sub-batch, but each tx verifies a merkle tree
/// and posts a pool commitment, so budget higher than a plain transfer.
const GAS_PER_MERKLE_TX: u128 = 500_000;

/// Advisory gas price (wei/gas) used to turn the gas estimate into an ETH
/// figure when no live gas oracle is consulted.
///
/// Arbitrum One typically settles around 0.1 gwei on quiet blocks; we use
/// that as the default (`100_000_000` wei = 0.1 gwei) so the CLI prints a
/// sensible order-of-magnitude number. Users should treat the reported gas
/// cost as an estimate, not a commitment — real gas is bid at submission time.
const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;

/// Extra headroom percentage for disk space check.
///
/// Encrypted chunks are slightly larger than the source data due to padding
/// and self-encryption overhead. We require file_size + 10% free space in
/// the temp directory to account for this.
const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;
143
/// Grace period (in seconds) before a spill dir is eligible for stale cleanup.
///
/// This is a small TOCTOU guard covering the sub-millisecond window inside
/// [`ChunkSpill::new`] between `create_dir` and `try_lock_exclusive`. Once a
/// dir is older than this and its lockfile is releasable, the owning process
/// is gone and the dir is safe to reap — regardless of how old it is.
///
/// The previous policy waited 24 h before reaping any orphan, which meant
/// that any non-graceful exit (SIGKILL, kernel OOM, panic abort) leaked its
/// spill dir until the next day's upload — and on a host being restart-looped
/// by systemd, orphans could fill the disk well within that window.
const SPILL_STALE_GRACE_SECS: u64 = 30;

/// Prefix for spill directory names to distinguish from user files.
const SPILL_DIR_PREFIX: &str = "spill_";

/// Lockfile name inside each spill dir to signal active use.
const SPILL_LOCK_NAME: &str = ".lock";

/// Temporary on-disk buffer for encrypted chunks.
///
/// During file encryption, chunks are written to a spill directory under
/// `<data_dir>/spill/` so that only their 32-byte addresses stay in memory.
/// At upload time chunks are read back one wave at a time, keeping peak RAM
/// at ~`UPLOAD_WAVE_SIZE × 4 MB`.
///
/// The spill directory is removed when the value is dropped.
struct ChunkSpill {
    /// Directory holding spilled chunk files (named by hex address).
    dir: PathBuf,
    /// Lockfile held for the lifetime of this spill (prevents stale cleanup).
    _lock: std::fs::File,
    /// Deduplicated list of chunk addresses.
    addresses: Vec<[u8; 32]>,
    /// Tracks seen addresses for deduplication.
    seen: HashSet<[u8; 32]>,
    /// Running total of unique chunk byte sizes (for average-size calculation).
    total_bytes: u64,
}
180
181impl ChunkSpill {
182    /// Return the parent directory for all spill dirs: `<data_dir>/spill/`.
183    fn spill_root() -> Result<PathBuf> {
184        use crate::config;
185        let root = config::data_dir()
186            .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
187            .join("spill");
188        Ok(root)
189    }
190
191    /// Create a new spill directory under `<data_dir>/spill/`.
192    ///
193    /// Directory name is `spill_<timestamp>_<random>` so orphans can be
194    /// identified by prefix and cleaned up by age. A lockfile inside the
195    /// dir prevents concurrent cleanup from deleting an active spill.
196    fn new() -> Result<Self> {
197        let root = Self::spill_root()?;
198        std::fs::create_dir_all(&root)?;
199
200        // Clean up stale spill dirs from previous crashed runs.
201        Self::cleanup_stale(&root);
202
203        let now = std::time::SystemTime::now()
204            .duration_since(std::time::UNIX_EPOCH)
205            .unwrap_or_default()
206            .as_secs();
207        let unique: u64 = rand::random();
208        let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
209        std::fs::create_dir(&dir)?;
210
211        // Create and hold a lockfile for the lifetime of this spill.
212        // cleanup_stale() will skip dirs with locked files.
213        let lock_path = dir.join(SPILL_LOCK_NAME);
214        let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
215            Error::Io(std::io::Error::new(
216                e.kind(),
217                format!("failed to create spill lockfile: {e}"),
218            ))
219        })?;
220        lock_file.try_lock_exclusive().map_err(|e| {
221            Error::Io(std::io::Error::new(
222                e.kind(),
223                format!("failed to lock spill lockfile: {e}"),
224            ))
225        })?;
226
227        Ok(Self {
228            dir,
229            _lock: lock_file,
230            addresses: Vec::new(),
231            seen: HashSet::new(),
232            total_bytes: 0,
233        })
234    }
235
236    /// Clean up stale spill directories. Best-effort, errors are logged.
237    ///
238    /// A spill dir is reaped when:
239    /// 1. Its name starts with `SPILL_DIR_PREFIX` (ignores unrelated files)
240    /// 2. It is an actual directory, not a symlink (prevents symlink attacks)
241    /// 3. Its timestamp is older than `SPILL_STALE_GRACE_SECS` (TOCTOU guard)
242    /// 4. Its lockfile is releasable — i.e. no live process holds it
243    ///
244    /// The lockfile is the primary correctness gate: a releasable lock means
245    /// the owning `ChunkSpill` has been dropped or the process is gone, so
246    /// the dir is fair game. The grace period covers only the brief window
247    /// inside [`Self::new`] between `create_dir` and `try_lock_exclusive`.
248    ///
249    /// Safe to call concurrently from multiple processes.
250    fn cleanup_stale(root: &Path) {
251        let now = std::time::SystemTime::now()
252            .duration_since(std::time::UNIX_EPOCH)
253            .unwrap_or_default()
254            .as_secs();
255
256        if now == 0 {
257            // Clock is broken (before Unix epoch). Skip cleanup to avoid
258            // misidentifying dirs as stale.
259            warn!("System clock before Unix epoch, skipping spill cleanup");
260            return;
261        }
262
263        let entries = match std::fs::read_dir(root) {
264            Ok(entries) => entries,
265            Err(_) => return,
266        };
267
268        for entry in entries.flatten() {
269            let name = entry.file_name();
270            let name_str = name.to_string_lossy();
271
272            // Only process dirs with our prefix.
273            let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
274                Some(s) => s,
275                None => continue,
276            };
277
278            // Parse timestamp: "spill_<timestamp>_<random>"
279            let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
280                Some(ts) => ts,
281                None => continue,
282            };
283
284            if now.saturating_sub(timestamp) < SPILL_STALE_GRACE_SECS {
285                continue;
286            }
287
288            // Safety: only delete actual directories, not symlinks.
289            let file_type = match entry.file_type() {
290                Ok(ft) => ft,
291                Err(_) => continue,
292            };
293            if !file_type.is_dir() {
294                continue;
295            }
296
297            let path = entry.path();
298
299            // Check lockfile: if locked, the dir is in active use -- skip it.
300            let lock_path = path.join(SPILL_LOCK_NAME);
301            if let Ok(lock_file) = std::fs::File::open(&lock_path) {
302                use fs2::FileExt;
303                if lock_file.try_lock_exclusive().is_err() {
304                    // Lock held by another process -- dir is active.
305                    debug!("Skipping active spill dir: {}", path.display());
306                    continue;
307                }
308                // We acquired the lock, so no one else holds it.
309                // Drop it before deleting.
310                drop(lock_file);
311            }
312
313            info!("Cleaning up stale spill dir: {}", path.display());
314            if let Err(e) = std::fs::remove_dir_all(&path) {
315                warn!("Failed to clean up stale spill dir {}: {e}", path.display());
316            }
317        }
318    }
319
    /// Run stale spill cleanup. Call at client startup or periodically.
    ///
    /// Does nothing when the spill root cannot be resolved; cleanup itself
    /// is best-effort and never reports an error to the caller.
    // No caller is visible in this file section, hence the lint allowance.
    #[allow(dead_code)]
    pub(crate) fn run_cleanup() {
        if let Ok(root) = Self::spill_root() {
            Self::cleanup_stale(&root);
        }
    }
327
328    /// Write one encrypted chunk to disk and record its address.
329    ///
330    /// Deduplicates by content address: if the same chunk was already
331    /// spilled, the write and accounting are skipped. This prevents
332    /// double-uploads and inflated quoting metrics.
333    fn push(&mut self, content: &[u8]) -> Result<()> {
334        let address = compute_address(content);
335        if !self.seen.insert(address) {
336            return Ok(());
337        }
338        let path = self.dir.join(hex::encode(address));
339        std::fs::write(&path, content)?;
340        self.total_bytes += content.len() as u64;
341        self.addresses.push(address);
342        Ok(())
343    }
344
    /// Number of unique chunks recorded in the spill (duplicates pushed
    /// more than once are counted a single time).
    fn len(&self) -> usize {
        self.addresses.len()
    }
349
    /// Total bytes of all spilled chunks, counting each unique chunk once.
    fn total_bytes(&self) -> u64 {
        self.total_bytes
    }
354
355    /// Average chunk size in bytes (for quoting metrics).
356    fn avg_chunk_size(&self) -> u64 {
357        if self.addresses.is_empty() {
358            return 0;
359        }
360        self.total_bytes / self.addresses.len() as u64
361    }
362
363    /// Read a single chunk back from disk by address.
364    fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
365        let path = self.dir.join(hex::encode(address));
366        let data = std::fs::read(&path).map_err(|e| {
367            Error::Io(std::io::Error::new(
368                e.kind(),
369                format!("reading spilled chunk {}: {e}", hex::encode(address)),
370            ))
371        })?;
372        Ok(Bytes::from(data))
373    }
374
    /// Iterate over address slices in wave-sized groups.
    ///
    /// Every slice holds `UPLOAD_WAVE_SIZE` addresses except possibly the
    /// last, which holds the remainder.
    fn waves(&self) -> std::slice::Chunks<'_, [u8; 32]> {
        self.addresses.chunks(UPLOAD_WAVE_SIZE)
    }
379
380    /// Read a wave of chunks from disk.
381    fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
382        let mut out = Vec::with_capacity(wave_addrs.len());
383        for addr in wave_addrs {
384            let content = self.read_chunk(addr)?;
385            out.push((content, *addr));
386        }
387        Ok(out)
388    }
389
390    /// Clean up the spill directory.
391    fn cleanup(&self) {
392        if let Err(e) = std::fs::remove_dir_all(&self.dir) {
393            warn!(
394                "Failed to clean up chunk spill dir {}: {e}",
395                self.dir.display()
396            );
397        }
398    }
399}
400
/// Removes the spill directory when the spill goes out of scope, so both
/// normal returns and early error returns leave no chunk files behind.
impl Drop for ChunkSpill {
    fn drop(&mut self) {
        // Best-effort removal; failures are only logged by `cleanup`.
        self.cleanup();
    }
}
406
407/// Check that the spill directory has enough free space for the spilled chunks.
408///
409/// `file_size` is the source file's byte count. We require
410/// `file_size + 10%` free space to account for self-encryption overhead.
411fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
412    let spill_root = ChunkSpill::spill_root()?;
413
414    // Ensure the root exists so fs2 can query it.
415    std::fs::create_dir_all(&spill_root)?;
416
417    let available = fs2::available_space(&spill_root).map_err(|e| {
418        Error::Io(std::io::Error::new(
419            e.kind(),
420            format!(
421                "failed to query disk space on {}: {e}",
422                spill_root.display()
423            ),
424        ))
425    })?;
426
427    // Use integer arithmetic to avoid f64 precision loss on large file sizes.
428    let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
429    let required = file_size.saturating_add(headroom);
430
431    if available < required {
432        let avail_mb = available / (1024 * 1024);
433        let req_mb = required / (1024 * 1024);
434        return Err(Error::InsufficientDiskSpace(format!(
435            "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
436            spill_root.display()
437        )));
438    }
439
440    debug!(
441        "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
442        spill_root.display()
443    );
444    Ok(())
445}
446
447fn usable_memory_bytes() -> Option<u64> {
448    let mut system = sysinfo::System::new();
449    system.refresh_memory();
450
451    let available_memory = system.available_memory();
452    let free_memory = system.free_memory();
453    let used_memory = system.used_memory();
454    let total_memory = system.total_memory();
455    let unused_memory = total_memory.saturating_sub(used_memory);
456
457    let mut usable = [available_memory, free_memory, unused_memory]
458        .into_iter()
459        .filter(|bytes| *bytes > 0)
460        .max();
461
462    let cgroup_free_memory = system
463        .cgroup_limits()
464        .filter(|limits| limits.total_memory > 0)
465        .map(|limits| limits.free_memory);
466    if let Some(cgroup_free_memory) = cgroup_free_memory {
467        usable = Some(usable.unwrap_or(u64::MAX).min(cgroup_free_memory));
468    }
469
470    debug!(
471        available_memory,
472        free_memory,
473        used_memory,
474        total_memory,
475        cgroup_free_memory,
476        usable_memory = ?usable,
477        "Detected usable memory for stream decrypt batch sizing"
478    );
479
480    usable
481}
482
483fn stream_decrypt_batch_memory_cap(usable_memory_bytes: u64) -> usize {
484    let budget = usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR;
485    let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
486        .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
487        .max(1);
488    let cap = (budget / estimated_bytes_per_chunk).max(1);
489
490    usize::try_from(cap).unwrap_or(usize::MAX)
491}
492
493fn adaptive_stream_decrypt_batch_size(
494    total_chunks: usize,
495    fetch_cap: usize,
496    configured_batch_floor: usize,
497    usable_memory_bytes: Option<u64>,
498) -> usize {
499    let fetch_target = fetch_cap
500        .max(1)
501        .saturating_mul(DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
502    let requested = match usable_memory_bytes {
503        Some(bytes) => {
504            let memory_cap = stream_decrypt_batch_memory_cap(bytes);
505            configured_batch_floor
506                .max(fetch_target)
507                .max(1)
508                .min(memory_cap)
509        }
510        None => configured_batch_floor.max(1),
511    };
512
513    requested.min(total_chunks.max(1)).max(1)
514}
515
/// Whether the data map is published to the network for address-based retrieval.
///
/// A private upload stores only the data chunks and returns the `DataMap` to
/// the caller — only someone holding that `DataMap` can reconstruct the file.
/// A public upload additionally stores the serialized `DataMap` as a chunk on
/// the network, yielding a single chunk address that anyone can use to
/// retrieve the `DataMap` (via [`Client::data_map_fetch`]) and then the file.
///
/// `Visibility::default()` is [`Visibility::Private`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Visibility {
    /// Keep the data map local; only the holder can retrieve the file.
    #[default]
    Private,
    /// Publish the data map as a network chunk so anyone with the returned
    /// address can retrieve and decrypt the file.
    Public,
}
532
/// Estimated cost of uploading a file, returned by
/// [`Client::estimate_upload_cost`].
///
/// Serializable via serde. Both cost figures are carried as `String`
/// (presumably decimal integers so large values survive serialization —
/// TODO confirm the exact format against the producer).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct UploadCostEstimate {
    /// Original file size in bytes.
    pub file_size: u64,
    /// Number of chunks the file would be split into (data chunks only,
    /// does not include the DataMap chunk added during public uploads).
    pub chunk_count: usize,
    /// Estimated total storage cost in atto (token smallest unit).
    pub storage_cost_atto: String,
    /// Estimated gas cost in wei as a string. This is a rough heuristic
    /// based on chunk count and payment mode, NOT a live gas price query.
    pub estimated_gas_cost_wei: String,
    /// Payment mode that would be used.
    pub payment_mode: PaymentMode,
}
550
/// Result of a file upload: the `DataMap` needed to retrieve the file.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers that construct or pattern-match
/// on this struct.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FileUploadResult {
    /// The data map containing chunk metadata for reconstruction.
    pub data_map: DataMap,
    /// Number of chunks stored on the network.
    pub chunks_stored: usize,
    /// Number of chunks that failed to store. Always 0 for a successful
    /// upload — partial-failure information is conveyed via
    /// [`crate::data::Error::PartialUpload`] instead.
    pub chunks_failed: usize,
    /// Total number of chunks the upload attempted to store. On full
    /// success this equals `chunks_stored`.
    pub total_chunks: usize,
    /// Which payment mode was actually used (not just requested).
    pub payment_mode_used: PaymentMode,
    /// Total storage cost paid in token units (atto). "0" if all chunks already existed.
    pub storage_cost_atto: String,
    /// Total gas cost in wei. 0 if no on-chain transactions were made.
    /// Unlike `storage_cost_atto`, this is a numeric `u128`, not a string.
    pub gas_cost_wei: u128,
    /// Chunk address of the serialized `DataMap`, set only for
    /// [`Visibility::Public`] uploads. **`Some` means this address is
    /// retrievable from the network (via [`Client::data_map_fetch`])**, not
    /// necessarily that *this* upload paid to store it — if the serialized
    /// `DataMap` hashed to a chunk that was already on the network (same
    /// file uploaded before; deterministic via self-encryption), the address
    /// is still returned but no storage payment was made for it.
    pub data_map_address: Option<[u8; 32]>,
    /// Sum of chunk-store RPC attempts across the upload
    /// (`>= chunks_stored` on full success; more if any chunk retried).
    /// `0` for paths that don't run the wave store loop.
    pub chunk_attempts_total: usize,
    /// Per-chunk store wall-clock in ms (length == `chunks_stored` on full
    /// success, empty for paths that don't run the wave store loop).
    pub store_durations_ms: Vec<u64>,
    /// Count of stored chunks that succeeded on each retry round
    /// (index 0 = first attempt, 1 = first retry, etc.). All zeros for
    /// paths that don't run the wave store loop. The array has four
    /// buckets, i.e. the first attempt plus up to three retries.
    pub retries_histogram: [usize; 4],
}
596
/// Payment information for external signing — either wave-batch or merkle.
///
/// Carried inside [`PreparedUpload::payment_info`].
#[derive(Debug)]
pub enum ExternalPaymentInfo {
    /// Wave-batch: individual (quote_hash, rewards_address, amount) tuples.
    WaveBatch {
        /// Chunks ready for payment (needed for finalize).
        prepared_chunks: Vec<PreparedChunk>,
        /// Payment intent for external signing.
        payment_intent: PaymentIntent,
    },
    /// Merkle: single on-chain call with depth, pool commitments, timestamp.
    Merkle {
        /// The prepared merkle batch (public fields sent to frontend, private fields stay in Rust).
        prepared_batch: PreparedMerkleBatch,
        /// Raw chunk contents (needed for upload after payment).
        // NOTE(review): presumably index-aligned with `chunk_addresses` — confirm.
        chunk_contents: Vec<Bytes>,
        /// Chunk addresses in order (needed for upload after payment).
        chunk_addresses: Vec<[u8; 32]>,
    },
}
617
/// Prepared upload ready for external payment.
///
/// Contains everything needed to construct the on-chain payment transaction
/// externally (e.g. via WalletConnect in a desktop app) and then finalize
/// the upload without a Rust-side wallet.
///
/// Note: This struct stays in Rust memory — only the public fields of
/// `payment_info` are sent to the frontend. `PreparedChunk` contains
/// non-serializable network types, so the full struct cannot derive `Serialize`.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers.
#[derive(Debug)]
#[non_exhaustive]
pub struct PreparedUpload {
    /// The data map for later retrieval.
    pub data_map: DataMap,
    /// Payment information — either wave-batch or merkle depending on chunk count.
    pub payment_info: ExternalPaymentInfo,
    /// Chunk address of the serialized `DataMap` when this upload was
    /// prepared with [`Visibility::Public`]. `Some` means the address is
    /// retrievable on the network after finalization — either because this
    /// upload paid to store the chunk in `payment_info`, or because the
    /// chunk was already on the network (deterministic self-encryption).
    /// Carried through to [`FileUploadResult::data_map_address`].
    /// The field's type allows `None` when no address applies.
    pub data_map_address: Option<[u8; 32]>,
}
645
/// Return type for [`spawn_file_encryption`]: chunk receiver, `DataMap` oneshot, join handle.
///
/// In order: the stream of encrypted chunk contents, the `DataMap` sent once
/// encryption has finished, and the handle of the blocking task whose
/// `Result` reports read or encryption failures.
type EncryptionChannels = (
    tokio::sync::mpsc::Receiver<Bytes>,
    tokio::sync::oneshot::Receiver<DataMap>,
    tokio::task::JoinHandle<Result<()>>,
);
652
653/// Spawn a blocking task that streams file encryption through a channel.
654fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
655    let metadata = std::fs::metadata(&path)?;
656    let data_size = usize::try_from(metadata.len())
657        .map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;
658
659    let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
660    let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();
661
662    let handle = tokio::task::spawn_blocking(move || {
663        let file = std::fs::File::open(&path)?;
664        let mut reader = std::io::BufReader::new(file);
665
666        let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
667        let read_error_clone = Arc::clone(&read_error);
668
669        let data_iter = std::iter::from_fn(move || {
670            let mut buffer = vec![0u8; 8192];
671            match std::io::Read::read(&mut reader, &mut buffer) {
672                Ok(0) => None,
673                Ok(n) => {
674                    buffer.truncate(n);
675                    Some(Bytes::from(buffer))
676                }
677                Err(e) => {
678                    let mut guard = read_error_clone
679                        .lock()
680                        .unwrap_or_else(|poisoned| poisoned.into_inner());
681                    *guard = Some(e);
682                    None
683                }
684            }
685        });
686
687        let mut stream = stream_encrypt(data_size, data_iter)
688            .map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;
689
690        for chunk_result in stream.chunks() {
691            // Check for captured read errors immediately after each chunk.
692            // stream_encrypt sees None (EOF) when a read fails, so it stops
693            // producing chunks. We must detect this before sending the
694            // partial results to avoid uploading a truncated DataMap.
695            {
696                let guard = read_error
697                    .lock()
698                    .unwrap_or_else(|poisoned| poisoned.into_inner());
699                if let Some(ref e) = *guard {
700                    return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
701                }
702            }
703
704            let (_hash, content) = chunk_result
705                .map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
706            if chunk_tx.blocking_send(content).is_err() {
707                return Err(Error::Encryption("upload receiver dropped".to_string()));
708            }
709        }
710
711        // Final check: read error after last chunk (stream saw EOF).
712        {
713            let guard = read_error
714                .lock()
715                .unwrap_or_else(|poisoned| poisoned.into_inner());
716            if let Some(ref e) = *guard {
717                return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
718            }
719        }
720
721        let datamap = stream
722            .into_datamap()
723            .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
724        if datamap_tx.send(datamap).is_err() {
725            warn!("DataMap receiver dropped — upload may have been cancelled");
726        }
727        Ok(())
728    });
729
730    Ok((chunk_rx, datamap_rx, handle))
731}
732
733impl Client {
    /// Upload a file to the network using streaming self-encryption.
    ///
    /// Automatically selects merkle batch payment for files that produce
    /// 64+ chunks (saves gas). Encrypted chunks are spilled to a temp
    /// directory so peak memory stays at ~256 MB regardless of file size.
    ///
    /// This is a convenience wrapper: it delegates to
    /// `file_upload_with_mode` with [`PaymentMode::Auto`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be read, encryption fails,
    /// or any chunk cannot be stored.
    pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
        self.file_upload_with_mode(path, PaymentMode::Auto).await
    }
747
748    /// Estimate the cost of uploading a file without actually uploading.
749    ///
750    /// Encrypts the file to determine chunk count and sizes, then requests
751    /// a single quote from the network for a representative chunk. The
752    /// per-chunk price is extrapolated to the total chunk count.
753    ///
754    /// The estimate is fast (~2-5s) and does not require a wallet. Spilled
755    /// chunks are cleaned up automatically when the function returns.
756    ///
757    /// Gas cost is an advisory heuristic, not a live gas-oracle query. It is
758    /// derived from realistic per-transaction budgets (`GAS_PER_WAVE_TX`,
759    /// `GAS_PER_MERKLE_TX`) priced at `ARBITRUM_GAS_PRICE_WEI`. Real gas
760    /// varies with network conditions.
761    ///
762    /// If the first sampled chunk is already stored on the network, the
763    /// function retries with subsequent chunk addresses (up to
764    /// `ESTIMATE_SAMPLE_CAP`). If every sampled address reports stored,
765    /// a [`Error::CostEstimationInconclusive`] is returned so callers can
766    /// decide how to react rather than trust a bogus "free" estimate. Only
767    /// when every address in the file is stored do we return a zero-cost
768    /// estimate.
769    ///
770    /// # Errors
771    ///
772    /// Returns an error if the file cannot be read, encryption fails,
773    /// the network cannot provide a quote, or every sampled chunk is
774    /// already stored ([`Error::CostEstimationInconclusive`]).
775    pub async fn estimate_upload_cost(
776        &self,
777        path: &Path,
778        mode: PaymentMode,
779        progress: Option<mpsc::Sender<UploadEvent>>,
780    ) -> Result<UploadCostEstimate> {
781        let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
782
783        if file_size < 3 {
784            return Err(Error::InvalidData(
785                "File too small: self-encryption requires at least 3 bytes".into(),
786            ));
787        }
788
789        check_disk_space_for_spill(file_size)?;
790
791        info!(
792            "Estimating upload cost for {} ({file_size} bytes)",
793            path.display()
794        );
795
796        let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
797        let chunk_count = spill.len();
798
799        if let Some(ref tx) = progress {
800            let _ = tx
801                .send(UploadEvent::Encrypted {
802                    total_chunks: chunk_count,
803                })
804                .await;
805        }
806
807        info!("Encrypted into {chunk_count} chunks, requesting quote");
808
809        // Sample up to ESTIMATE_SAMPLE_CAP distinct chunk addresses. A single
810        // AlreadyStored result says nothing about the rest of the file — the
811        // first chunk is often a DataMap-adjacent chunk that collides with
812        // prior uploads even when 99% of the file is new. Only treat the
813        // whole file as "fully stored" when every sample comes back stored.
814        let sample_limit = spill.addresses.len().min(ESTIMATE_SAMPLE_CAP);
815        let mut sampled = 0usize;
816        let mut all_already_stored = true;
817        let mut quotes_opt: Option<Vec<QuoteEntry>> = None;
818
819        for addr in spill.addresses.iter().take(sample_limit) {
820            sampled += 1;
821            let chunk_bytes = spill.read_chunk(addr)?;
822            let data_size = u64::try_from(chunk_bytes.len())
823                .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
824            match self
825                .get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
826                .await
827            {
828                Ok(q) => {
829                    quotes_opt = Some(q);
830                    all_already_stored = false;
831                    break;
832                }
833                Err(Error::AlreadyStored) => {
834                    debug!(
835                        "Sample chunk {} already stored; trying next address ({sampled}/{sample_limit})",
836                        hex::encode(addr)
837                    );
838                    continue;
839                }
840                Err(e) => return Err(e),
841            }
842        }
843
844        let uses_merkle = should_use_merkle(chunk_count, mode);
845
846        let quotes = match quotes_opt {
847            Some(q) => q,
848            None if all_already_stored && sampled == chunk_count => {
849                // Every address in the file was sampled and every one is
850                // already on the network — returning a zero-cost estimate is
851                // accurate in this case.
852                info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
853                return Ok(UploadCostEstimate {
854                    file_size,
855                    chunk_count,
856                    storage_cost_atto: "0".into(),
857                    estimated_gas_cost_wei: "0".into(),
858                    payment_mode: if uses_merkle {
859                        PaymentMode::Merkle
860                    } else {
861                        PaymentMode::Single
862                    },
863                });
864            }
865            None => {
866                return Err(Error::CostEstimationInconclusive(format!(
867                    "sampled {sampled} chunk addresses out of {chunk_count} and every \
868                     one reported AlreadyStored; cannot infer a representative price \
869                     for the remaining chunks"
870                )));
871            }
872        };
873
874        // Use the median price × 3 (matches SingleNodePayment::from_quotes
875        // which pays 3x the median to incentivize reliable storage).
876        let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
877        prices.sort();
878        let median_price = prices
879            .get(prices.len() / 2)
880            .copied()
881            .unwrap_or(Amount::ZERO);
882        let per_chunk_cost = median_price * Amount::from(3u64);
883
884        let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
885        let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);
886
887        // Estimate gas cost from realistic per-transaction budgets rather
888        // than a flat per-chunk or per-wave number.
889        //
890        // - Single mode: `batch_pay` packs up to UPLOAD_WAVE_SIZE chunks'
891        //   close-group quotes into one `pay_for_quotes` call on Arbitrum.
892        //   The dominant cost is one SSTORE per entry plus base tx overhead,
893        //   so we use GAS_PER_WAVE_TX (≈1.5M) as a conservative upper bound
894        //   on a full wave and multiply by the number of waves. The previous
895        //   per-wave figure of 150k was closer to a single-entry transfer
896        //   and understated cost by 5–10x for full waves.
897        // - Merkle mode: one tx per sub-batch that verifies a merkle tree
898        //   and posts a pool commitment (GAS_PER_MERKLE_TX ≈ 500k each).
899        //
900        // Gas is priced at ARBITRUM_GAS_PRICE_WEI (~0.1 gwei, a typical
901        // Arbitrum baseline). Treat the result as advisory, not a commitment.
902        let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
903        let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
904        let estimated_gas: u128 = if uses_merkle {
905            merkle_batches
906                .saturating_mul(GAS_PER_MERKLE_TX)
907                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
908        } else {
909            waves
910                .saturating_mul(GAS_PER_WAVE_TX)
911                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
912        };
913
914        info!(
915            "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
916        );
917
918        Ok(UploadCostEstimate {
919            file_size,
920            chunk_count,
921            storage_cost_atto: total_storage.to_string(),
922            estimated_gas_cost_wei: estimated_gas.to_string(),
923            payment_mode: if uses_merkle {
924                PaymentMode::Merkle
925            } else {
926                PaymentMode::Single
927            },
928        })
929    }
930
931    /// Phase 1 of external-signer upload: encrypt file and prepare chunks.
932    ///
933    /// Equivalent to [`Client::file_prepare_upload_with_visibility`] with
934    /// [`Visibility::Private`] — see that method for details.
935    pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
936        self.file_prepare_upload_with_progress(path, Visibility::Private, None)
937            .await
938    }
939
940    /// Phase 1 of external-signer upload with explicit [`Visibility`] control.
941    ///
942    /// Equivalent to [`Client::file_prepare_upload_with_progress`] with
943    /// `progress: None` — see that method for details.
944    pub async fn file_prepare_upload_with_visibility(
945        &self,
946        path: &Path,
947        visibility: Visibility,
948    ) -> Result<PreparedUpload> {
949        self.file_prepare_upload_with_progress(path, visibility, None)
950            .await
951    }
952
953    /// Phase 1 of external-signer upload with progress events.
954    ///
955    /// Requires an EVM network (for contract price queries) but NOT a wallet.
956    /// Returns a [`PreparedUpload`] containing the data map, prepared chunks,
957    /// and a [`PaymentIntent`] that the external signer uses to construct
958    /// and submit the on-chain payment transaction.
959    ///
960    /// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
961    /// is bundled into the payment batch as an additional chunk and its
962    /// address is recorded on the returned [`PreparedUpload`]. After
963    /// [`Client::finalize_upload`] (or `_merkle`) succeeds, that address is
964    /// surfaced via [`FileUploadResult::data_map_address`] so the uploader
965    /// can share a single address from which anyone can retrieve the file.
966    ///
967    /// When `progress` is `Some`, [`UploadEvent`]s are emitted on the channel
968    /// during encryption ([`UploadEvent::Encrypting`] / [`UploadEvent::Encrypted`])
969    /// and per-chunk quoting ([`UploadEvent::ChunkQuoted`]). Storage events are
970    /// emitted later by [`Client::finalize_upload_with_progress`] /
971    /// [`Client::finalize_upload_merkle_with_progress`].
972    ///
973    /// **Memory note:** Encryption uses disk spilling for bounded memory, but
974    /// the returned [`PreparedUpload`] holds all chunk content in memory (each
975    /// [`PreparedChunk`] contains a `Bytes` with the full chunk data). This is
976    /// inherent to the two-phase external-signer protocol — the chunks must
977    /// stay in memory until [`Client::finalize_upload`] stores them. For very
978    /// large files, prefer [`Client::file_upload`] which streams directly.
979    ///
980    /// # Errors
981    ///
982    /// Returns an error if there is insufficient disk space, the file cannot
983    /// be read, encryption fails, or quote collection fails.
984    pub async fn file_prepare_upload_with_progress(
985        &self,
986        path: &Path,
987        visibility: Visibility,
988        progress: Option<mpsc::Sender<UploadEvent>>,
989    ) -> Result<PreparedUpload> {
990        debug!(
991            "Preparing file upload for external signing (visibility={visibility:?}): {}",
992            path.display()
993        );
994
995        let file_size = std::fs::metadata(path)?.len();
996        check_disk_space_for_spill(file_size)?;
997
998        let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
999
1000        info!(
1001            "Encrypted {} into {} chunks for external signing (spilled to disk)",
1002            path.display(),
1003            spill.len()
1004        );
1005
1006        // Read each chunk from disk and collect quotes concurrently.
1007        // Note: all PreparedChunks accumulate in memory because the external-signer
1008        // protocol requires them for finalize_upload. NOT memory-bounded for large files.
1009        let mut chunk_data: Vec<Bytes> = spill
1010            .addresses
1011            .iter()
1012            .map(|addr| spill.read_chunk(addr))
1013            .collect::<std::result::Result<Vec<_>, _>>()?;
1014
1015        // For public uploads, bundle the serialized DataMap as an extra chunk
1016        // in the same payment batch. This lets the external signer pay for
1017        // the data chunks and the DataMap chunk in one flow, and lets the
1018        // finalize step return the DataMap's chunk address as the shareable
1019        // retrieval address.
1020        let data_map_address = match visibility {
1021            Visibility::Private => None,
1022            Visibility::Public => {
1023                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
1024                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
1025                })?;
1026                let bytes = Bytes::from(serialized);
1027                let address = compute_address(&bytes);
1028                info!(
1029                    "Public upload: bundling DataMap chunk ({} bytes) at address {}",
1030                    bytes.len(),
1031                    hex::encode(address)
1032                );
1033                chunk_data.push(bytes);
1034                Some(address)
1035            }
1036        };
1037
1038        let chunk_count = chunk_data.len();
1039
1040        if let Some(ref tx) = progress {
1041            let _ = tx
1042                .send(UploadEvent::Encrypted {
1043                    total_chunks: chunk_count,
1044                })
1045                .await;
1046        }
1047
1048        let payment_info = if should_use_merkle(chunk_count, PaymentMode::Auto) {
1049            // Merkle path: build tree, collect candidate pools, return for external payment.
1050            info!("Using merkle batch preparation for {chunk_count} file chunks");
1051
1052            let addresses: Vec<[u8; 32]> = chunk_data.iter().map(|c| compute_address(c)).collect();
1053
1054            let avg_size =
1055                chunk_data.iter().map(bytes::Bytes::len).sum::<usize>() / chunk_count.max(1);
1056            let avg_size_u64 = u64::try_from(avg_size).unwrap_or(0);
1057
1058            let prepared_batch = self
1059                .prepare_merkle_batch_external(&addresses, DATA_TYPE_CHUNK, avg_size_u64)
1060                .await?;
1061
1062            info!(
1063                "File prepared for external merkle signing: {} chunks, depth={} ({})",
1064                chunk_count,
1065                prepared_batch.depth,
1066                path.display()
1067            );
1068
1069            ExternalPaymentInfo::Merkle {
1070                prepared_batch,
1071                chunk_contents: chunk_data,
1072                chunk_addresses: addresses,
1073            }
1074        } else {
1075            // Wave-batch path: collect quotes per chunk concurrently, emitting
1076            // a `ChunkQuoted` event after each completion so callers can drive
1077            // a progress bar through the (slow) quoting phase.
1078            // Clamp fan-out to chunk_count so a partial wave doesn't
1079            // pay for slots it can't fill (see PERF-RESULTS.md).
1080            let quote_limiter = self.controller().quote.clone();
1081            let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
1082            let mut quote_stream = stream::iter(chunk_data)
1083                .map(|content| {
1084                    let limiter = quote_limiter.clone();
1085                    async move {
1086                        observe_op(
1087                            &limiter,
1088                            || async move { self.prepare_chunk_payment(content).await },
1089                            classify_error,
1090                        )
1091                        .await
1092                    }
1093                })
1094                .buffer_unordered(quote_concurrency);
1095
1096            let mut prepared_chunks = Vec::with_capacity(spill.len());
1097            let mut quoted = 0usize;
1098            while let Some(result) = quote_stream.next().await {
1099                if let Some(prepared) = result? {
1100                    prepared_chunks.push(prepared);
1101                }
1102                quoted += 1;
1103                if let Some(ref tx) = progress {
1104                    let _ = tx.try_send(UploadEvent::ChunkQuoted {
1105                        quoted,
1106                        total: chunk_count,
1107                    });
1108                }
1109            }
1110
1111            // Surface the "DataMap chunk was already on the network" case
1112            // so debugging "why is data_map_address set but no storage cost
1113            // appears for it?" doesn't require reading the source. See the
1114            // `data_map_address` doc comment for why this is still a valid
1115            // `Some(addr)` outcome.
1116            if let Some(addr) = data_map_address {
1117                if !prepared_chunks.iter().any(|c| c.address == addr) {
1118                    info!(
1119                        "Public upload: DataMap chunk {} was already stored \
1120                         on the network — address is retrievable without a \
1121                         new payment",
1122                        hex::encode(addr)
1123                    );
1124                }
1125            }
1126
1127            let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
1128
1129            info!(
1130                "File prepared for external signing: {} chunks, total {} atto ({})",
1131                prepared_chunks.len(),
1132                payment_intent.total_amount,
1133                path.display()
1134            );
1135
1136            ExternalPaymentInfo::WaveBatch {
1137                prepared_chunks,
1138                payment_intent,
1139            }
1140        };
1141
1142        Ok(PreparedUpload {
1143            data_map,
1144            payment_info,
1145            data_map_address,
1146        })
1147    }
1148
1149    /// Phase 2 of external-signer upload (wave-batch): finalize with externally-signed tx hashes.
1150    ///
1151    /// Takes a [`PreparedUpload`] that used wave-batch payment and a map
1152    /// of `quote_hash -> tx_hash` provided by the external signer after on-chain
1153    /// payment. Builds payment proofs and stores chunks on the network.
1154    ///
1155    /// # Errors
1156    ///
1157    /// Returns an error if the prepared upload used merkle payment (use
1158    /// [`Client::finalize_upload_merkle`] instead), proof construction fails,
1159    /// or any chunk cannot be stored.
1160    pub async fn finalize_upload(
1161        &self,
1162        prepared: PreparedUpload,
1163        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1164    ) -> Result<FileUploadResult> {
1165        self.finalize_upload_with_progress(prepared, tx_hash_map, None)
1166            .await
1167    }
1168
1169    /// Phase 2 of external-signer upload (wave-batch) with progress events.
1170    ///
1171    /// Same as [`Client::finalize_upload`] but emits [`UploadEvent::ChunkStored`]
1172    /// on the provided channel as each chunk is successfully stored.
1173    ///
1174    /// # Errors
1175    ///
1176    /// Same as [`Client::finalize_upload`].
1177    pub async fn finalize_upload_with_progress(
1178        &self,
1179        prepared: PreparedUpload,
1180        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1181        progress: Option<mpsc::Sender<UploadEvent>>,
1182    ) -> Result<FileUploadResult> {
1183        let data_map_address = prepared.data_map_address;
1184        match prepared.payment_info {
1185            ExternalPaymentInfo::WaveBatch {
1186                prepared_chunks,
1187                payment_intent: _,
1188            } => {
1189                let total_chunks = prepared_chunks.len();
1190                let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
1191                let wave_result = self
1192                    .store_paid_chunks_with_events(paid_chunks, progress.as_ref(), 0, total_chunks)
1193                    .await;
1194                if !wave_result.failed.is_empty() {
1195                    let failed_count = wave_result.failed.len();
1196                    let stored_count = wave_result.stored.len();
1197                    return Err(Error::PartialUpload {
1198                        stored: wave_result.stored.clone(),
1199                        stored_count,
1200                        failed: wave_result.failed,
1201                        failed_count,
1202                        total_chunks: stored_count + failed_count,
1203                        reason: "finalize_upload: chunk storage failed after retries".into(),
1204                    });
1205                }
1206                let chunks_stored = wave_result.stored.len();
1207
1208                info!("External-signer upload finalized: {chunks_stored} chunks stored");
1209
1210                let mut stats = WaveAggregateStats::default();
1211                stats.absorb(&wave_result);
1212
1213                Ok(FileUploadResult {
1214                    data_map: prepared.data_map,
1215                    chunks_stored,
1216                    chunks_failed: 0,
1217                    total_chunks: chunks_stored,
1218                    payment_mode_used: PaymentMode::Single,
1219                    storage_cost_atto: "0".into(),
1220                    gas_cost_wei: 0,
1221                    data_map_address,
1222                    chunk_attempts_total: stats.chunk_attempts_total,
1223                    store_durations_ms: stats.store_durations_ms,
1224                    retries_histogram: stats.retries_histogram,
1225                })
1226            }
1227            ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
1228                "Cannot finalize merkle upload with wave-batch tx hashes. \
1229                 Use finalize_upload_merkle() instead."
1230                    .to_string(),
1231            )),
1232        }
1233    }
1234
1235    /// Phase 2 of external-signer upload (merkle): finalize with winner pool hash.
1236    ///
1237    /// Takes a [`PreparedUpload`] that used merkle payment and the `winner_pool_hash`
1238    /// returned by the on-chain merkle payment transaction. Generates proofs and
1239    /// stores chunks on the network.
1240    ///
1241    /// # Errors
1242    ///
1243    /// Returns an error if the prepared upload used wave-batch payment (use
1244    /// [`Client::finalize_upload`] instead), proof generation fails,
1245    /// or any chunk cannot be stored.
1246    pub async fn finalize_upload_merkle(
1247        &self,
1248        prepared: PreparedUpload,
1249        winner_pool_hash: [u8; 32],
1250    ) -> Result<FileUploadResult> {
1251        self.finalize_upload_merkle_with_progress(prepared, winner_pool_hash, None)
1252            .await
1253    }
1254
1255    /// Phase 2 of external-signer upload (merkle) with progress events.
1256    ///
1257    /// Same as [`Client::finalize_upload_merkle`] but emits [`UploadEvent::ChunkStored`]
1258    /// on the provided channel as each chunk is successfully stored.
1259    ///
1260    /// # Errors
1261    ///
1262    /// Same as [`Client::finalize_upload_merkle`].
1263    pub async fn finalize_upload_merkle_with_progress(
1264        &self,
1265        prepared: PreparedUpload,
1266        winner_pool_hash: [u8; 32],
1267        progress: Option<mpsc::Sender<UploadEvent>>,
1268    ) -> Result<FileUploadResult> {
1269        let data_map_address = prepared.data_map_address;
1270        match prepared.payment_info {
1271            ExternalPaymentInfo::Merkle {
1272                prepared_batch,
1273                chunk_contents,
1274                chunk_addresses,
1275            } => {
1276                let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
1277                let (chunks_stored, stats) = self
1278                    .merkle_upload_chunks(
1279                        chunk_contents,
1280                        chunk_addresses,
1281                        &batch_result,
1282                        progress.as_ref(),
1283                    )
1284                    .await?;
1285
1286                info!("External-signer merkle upload finalized: {chunks_stored} chunks stored");
1287
1288                Ok(FileUploadResult {
1289                    data_map: prepared.data_map,
1290                    chunks_stored,
1291                    chunks_failed: 0,
1292                    total_chunks: chunks_stored,
1293                    payment_mode_used: PaymentMode::Merkle,
1294                    storage_cost_atto: "0".into(),
1295                    gas_cost_wei: 0,
1296                    data_map_address,
1297                    chunk_attempts_total: stats.chunk_attempts_total,
1298                    store_durations_ms: stats.store_durations_ms,
1299                    retries_histogram: stats.retries_histogram,
1300                })
1301            }
1302            ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
1303                "Cannot finalize wave-batch upload with merkle winner hash. \
1304                 Use finalize_upload() instead."
1305                    .to_string(),
1306            )),
1307        }
1308    }
1309
1310    /// Upload a file with a specific payment mode.
1311    ///
1312    /// Before encryption, checks that the temp directory has enough free
1313    /// disk space for the spilled chunks (~1.1× source file size).
1314    ///
1315    /// Encrypted chunks are spilled to a temp directory during encryption
1316    /// so that only their 32-byte addresses stay in memory. At upload time,
1317    /// chunks are read back one wave at a time (~64 × 4 MB ≈ 256 MB peak).
1318    ///
1319    /// # Errors
1320    ///
1321    /// Returns an error if there is insufficient disk space, the file cannot
1322    /// be read, encryption fails, or any chunk cannot be stored.
1323    #[allow(clippy::too_many_lines)]
1324    pub async fn file_upload_with_mode(
1325        &self,
1326        path: &Path,
1327        mode: PaymentMode,
1328    ) -> Result<FileUploadResult> {
1329        self.file_upload_with_progress(path, mode, None).await
1330    }
1331
1332    /// Upload a file with progress events sent to the given channel.
1333    ///
1334    /// Same as [`Client::file_upload_with_mode`] but sends [`UploadEvent`]s to the
1335    /// provided channel for UI progress feedback.
1336    #[allow(clippy::too_many_lines)]
1337    pub async fn file_upload_with_progress(
1338        &self,
1339        path: &Path,
1340        mode: PaymentMode,
1341        progress: Option<mpsc::Sender<UploadEvent>>,
1342    ) -> Result<FileUploadResult> {
1343        debug!(
1344            "Streaming file upload with mode {mode:?}: {}",
1345            path.display()
1346        );
1347
1348        // Pre-flight: verify enough temp disk space for the chunk spill.
1349        let file_size = std::fs::metadata(path)?.len();
1350        check_disk_space_for_spill(file_size)?;
1351
1352        // Phase 1: Encrypt file and spill chunks to temp directory.
1353        // Only 32-byte addresses stay in memory — chunk data lives on disk.
1354        let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1355
1356        let chunk_count = spill.len();
1357        info!(
1358            "Encrypted {} into {chunk_count} chunks (spilled to disk)",
1359            path.display()
1360        );
1361        if let Some(ref tx) = progress {
1362            let _ = tx
1363                .send(UploadEvent::Encrypted {
1364                    total_chunks: chunk_count,
1365                })
1366                .await;
1367        }
1368
1369        // Phase 2: Decide payment mode and upload in waves from disk.
1370        //
1371        // For the merkle path, attempt to resume from a cached
1372        // receipt before paying again. The cache is keyed by the
1373        // CANONICAL source path so `./foo`, `/abs/foo`, and any
1374        // symlink alias all resolve to the same cache entry — a
1375        // crash-and-retry from a different cwd or via a different
1376        // alias still hits the receipt. Canonicalize may fail (the
1377        // file could have been moved between phase 1 and here); we
1378        // fall back to the display string in that case, which
1379        // preserves pre-fix behaviour rather than dropping cache
1380        // resume entirely.
1381        let file_path_key = std::fs::canonicalize(path)
1382            .map(|p| p.display().to_string())
1383            .unwrap_or_else(|_| path.display().to_string());
1384        let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self
1385            .should_use_merkle(chunk_count, mode)
1386        {
1387            info!("Using merkle batch payment for {chunk_count} file chunks");
1388
1389            let batch_result = if let Some((_cache_path, cached)) =
1390                crate::data::client::cached_merkle::try_load_for_file(&file_path_key)
1391            {
1392                // Validate the cache matches this upload. If the
1393                // file was edited between attempts the cached
1394                // proofs would no longer be valid for the new
1395                // chunk addresses; in that case drop the cache
1396                // and pay fresh.
1397                let addresses_match = spill
1398                    .addresses
1399                    .iter()
1400                    .all(|addr| cached.proofs.contains_key(addr));
1401                if addresses_match && cached.proofs.len() == chunk_count {
1402                    info!(
1403                        "Skipping merkle payment phase; resuming with \
1404                             cached proofs ({} chunks)",
1405                        cached.proofs.len()
1406                    );
1407                    Ok(cached)
1408                } else {
1409                    info!(
1410                        "Cached merkle receipt does not match current file \
1411                             content (cached={}, file={chunk_count}). \
1412                             Discarding cache and paying fresh.",
1413                        cached.proofs.len()
1414                    );
1415                    crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
1416                    self.pay_for_merkle_batch(
1417                        &spill.addresses,
1418                        DATA_TYPE_CHUNK,
1419                        spill.avg_chunk_size(),
1420                    )
1421                    .await
1422                    .inspect(|result| {
1423                        crate::data::client::cached_merkle::try_save(&file_path_key, result);
1424                    })
1425                }
1426            } else {
1427                self.pay_for_merkle_batch(&spill.addresses, DATA_TYPE_CHUNK, spill.avg_chunk_size())
1428                    .await
1429                    .inspect(|result| {
1430                        // Save BEFORE the store phase so a crash
1431                        // mid-upload leaves a resumable receipt.
1432                        crate::data::client::cached_merkle::try_save(&file_path_key, result);
1433                    })
1434            };
1435
1436            let batch_result = match batch_result {
1437                Ok(result) => result,
1438                Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
1439                    info!("Merkle needs more peers ({msg}), falling back to wave-batch");
1440                    let (stored, sc, gc, fb_stats) = self
1441                        .upload_waves_single(&spill, progress.as_ref(), Some(&file_path_key))
1442                        .await?;
1443                    // Full file success on the single-node fallback path:
1444                    // the cached single-node receipt (if any) is no longer
1445                    // needed.
1446                    crate::data::client::cached_single::try_delete_for_file(&file_path_key);
1447                    return Ok(FileUploadResult {
1448                        data_map,
1449                        chunks_stored: stored,
1450                        chunks_failed: 0,
1451                        total_chunks: chunk_count,
1452                        payment_mode_used: PaymentMode::Single,
1453                        storage_cost_atto: sc,
1454                        gas_cost_wei: gc,
1455                        data_map_address: None,
1456                        chunk_attempts_total: fb_stats.chunk_attempts_total,
1457                        store_durations_ms: fb_stats.store_durations_ms,
1458                        retries_histogram: fb_stats.retries_histogram,
1459                    });
1460                }
1461                Err(e) => return Err(e),
1462            };
1463
1464            let (stored, sc, gc, stats) = self
1465                .upload_waves_merkle(&spill, &batch_result, progress.as_ref())
1466                .await?;
1467            // Upload succeeded end-to-end; the cached receipt is
1468            // no longer needed.
1469            crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
1470            (stored, PaymentMode::Merkle, sc, gc, stats)
1471        } else {
1472            let (stored, sc, gc, stats) = self
1473                .upload_waves_single(&spill, progress.as_ref(), Some(&file_path_key))
1474                .await?;
1475            // Full file success: drop any cached single-node receipt.
1476            crate::data::client::cached_single::try_delete_for_file(&file_path_key);
1477            (stored, PaymentMode::Single, sc, gc, stats)
1478        };
1479
1480        info!(
1481            "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
1482            path.display()
1483        );
1484
1485        Ok(FileUploadResult {
1486            data_map,
1487            chunks_stored,
1488            chunks_failed: 0,
1489            total_chunks: chunk_count,
1490            payment_mode_used: actual_mode,
1491            storage_cost_atto,
1492            gas_cost_wei,
1493            data_map_address: None,
1494            chunk_attempts_total: stats.chunk_attempts_total,
1495            store_durations_ms: stats.store_durations_ms,
1496            retries_histogram: stats.retries_histogram,
1497        })
1498    }
1499
1500    /// Encrypt a file and spill chunks to a temp directory.
1501    ///
1502    /// Logs progress every 100 chunks so users get feedback during
1503    /// multi-GB encryptions.
1504    ///
1505    /// Returns the spill buffer (addresses on disk) and the `DataMap`.
1506    async fn encrypt_file_to_spill(
1507        &self,
1508        path: &Path,
1509        progress: Option<&mpsc::Sender<UploadEvent>>,
1510    ) -> Result<(ChunkSpill, DataMap)> {
1511        let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
1512
1513        let mut spill = ChunkSpill::new()?;
1514        while let Some(content) = chunk_rx.recv().await {
1515            spill.push(&content)?;
1516            let chunks_done = spill.len();
1517            if let Some(tx) = progress {
1518                if chunks_done.is_multiple_of(10) {
1519                    let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
1520                }
1521            }
1522            if chunks_done % 100 == 0 {
1523                let mb = spill.total_bytes() / (1024 * 1024);
1524                info!(
1525                    "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
1526                    path.display()
1527                );
1528            }
1529        }
1530
1531        // Await encryption completion to catch errors before paying.
1532        handle
1533            .await
1534            .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
1535            .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
1536
1537        let data_map = datamap_rx
1538            .await
1539            .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
1540
1541        Ok((spill, data_map))
1542    }
1543
1544    /// Upload chunks from a spill using wave-based per-chunk (single) payments.
1545    ///
1546    /// Reads one wave at a time from disk, prepares quotes, pays, and stores.
1547    /// Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE` (~256 MB).
1548    ///
1549    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
1550    async fn upload_waves_single(
1551        &self,
1552        spill: &ChunkSpill,
1553        progress: Option<&mpsc::Sender<UploadEvent>>,
1554        resume_key: Option<&str>,
1555    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
1556        let mut total_stored = 0usize;
1557        let mut total_storage = Amount::ZERO;
1558        let mut total_gas: u128 = 0;
1559        let mut agg_stats = WaveAggregateStats::default();
1560        let total_chunks = spill.len();
1561        let waves: Vec<&[[u8; 32]]> = spill.waves().collect();
1562        let wave_count = waves.len();
1563
1564        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
1565            let wave_num = wave_idx + 1;
1566            let wave_data: Vec<Bytes> = wave_addrs
1567                .iter()
1568                .map(|addr| spill.read_chunk(addr))
1569                .collect::<Result<Vec<_>>>()?;
1570
1571            info!(
1572                "Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
1573                wave_data.len()
1574            );
1575            if let Some(tx) = progress {
1576                let _ = tx
1577                    .send(UploadEvent::QuotingChunks {
1578                        wave: wave_num,
1579                        total_waves: wave_count,
1580                        chunks_in_wave: wave_data.len(),
1581                    })
1582                    .await;
1583            }
1584            let (addresses, wave_storage, wave_gas, wave_stats) = self
1585                .batch_upload_chunks_with_events(
1586                    wave_data,
1587                    progress,
1588                    total_stored,
1589                    total_chunks,
1590                    resume_key,
1591                )
1592                .await?;
1593            total_stored += addresses.len();
1594            if let Ok(cost) = wave_storage.parse::<Amount>() {
1595                total_storage += cost;
1596            }
1597            total_gas = total_gas.saturating_add(wave_gas);
1598            // Merge per-call stats (each call already aggregates across the
1599            // waves it ran internally, so a simple sum/extend is correct).
1600            agg_stats.chunk_attempts_total = agg_stats
1601                .chunk_attempts_total
1602                .saturating_add(wave_stats.chunk_attempts_total);
1603            agg_stats
1604                .store_durations_ms
1605                .extend(wave_stats.store_durations_ms);
1606            for (slot, count) in agg_stats
1607                .retries_histogram
1608                .iter_mut()
1609                .zip(wave_stats.retries_histogram.iter())
1610            {
1611                *slot = slot.saturating_add(*count);
1612            }
1613            if let Some(tx) = progress {
1614                let _ = tx
1615                    .send(UploadEvent::WaveComplete {
1616                        wave: wave_num,
1617                        total_waves: wave_count,
1618                        stored_so_far: total_stored,
1619                        total: total_chunks,
1620                    })
1621                    .await;
1622            }
1623        }
1624
1625        Ok((
1626            total_stored,
1627            total_storage.to_string(),
1628            total_gas,
1629            agg_stats,
1630        ))
1631    }
1632
1633    /// Upload chunks from a spill using pre-computed merkle proofs.
1634    ///
1635    /// Reads one wave at a time from disk, pairs each chunk with its proof,
1636    /// and uploads concurrently. Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE`.
1637    ///
1638    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
1639    /// Costs come from the `batch_result` which was populated during payment.
1640    async fn upload_waves_merkle(
1641        &self,
1642        spill: &ChunkSpill,
1643        batch_result: &MerkleBatchPaymentResult,
1644        progress: Option<&mpsc::Sender<UploadEvent>>,
1645    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
1646        let mut total_stored = 0usize;
1647        let total_chunks = spill.len();
1648        let waves: Vec<&[[u8; 32]]> = spill.waves().collect();
1649        let wave_count = waves.len();
1650        let mut stored_addresses: Vec<[u8; 32]> = Vec::new();
1651        let mut agg_stats = WaveAggregateStats::default();
1652
1653        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
1654            let wave_num = wave_idx + 1;
1655            let wave = spill.read_wave(wave_addrs)?;
1656
1657            info!(
1658                "Wave {wave_num}/{wave_count}: storing {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
1659                wave.len()
1660            );
1661
1662            let store_limiter = self.controller().store.clone();
1663            // Clamp fan-out to wave size — partial last wave should
1664            // not pay for extra slots (see PERF-RESULTS.md).
1665            let store_concurrency = store_limiter.current().min(wave.len().max(1));
1666            let mut upload_stream = stream::iter(wave.into_iter().map(|(content, addr)| {
1667                let proof_bytes = batch_result.proofs.get(&addr).cloned();
1668                let limiter = store_limiter.clone();
1669                async move {
1670                    let started = std::time::Instant::now();
1671                    let proof = proof_bytes.ok_or_else(|| {
1672                        (
1673                            addr,
1674                            Error::Payment(format!(
1675                                "Missing merkle proof for chunk {}",
1676                                hex::encode(addr)
1677                            )),
1678                            started,
1679                        )
1680                    })?;
1681                    let peers = self
1682                        .close_group_peers(&addr)
1683                        .await
1684                        .map_err(|e| (addr, e, started))?;
1685                    observe_op(
1686                        &limiter,
1687                        || async move {
1688                            self.chunk_put_to_close_group(content, proof, &peers).await
1689                        },
1690                        classify_error,
1691                    )
1692                    .await
1693                    .map(|_| (addr, started))
1694                    .map_err(|e| (addr, e, started))
1695                }
1696            }))
1697            .buffer_unordered(store_concurrency);
1698
1699            while let Some(result) = upload_stream.next().await {
1700                match result {
1701                    Ok((addr, started)) => {
1702                        let duration_ms =
1703                            u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
1704                        agg_stats.store_durations_ms.push(duration_ms);
1705                        agg_stats.chunk_attempts_total =
1706                            agg_stats.chunk_attempts_total.saturating_add(1);
1707                        agg_stats.retries_histogram[0] =
1708                            agg_stats.retries_histogram[0].saturating_add(1);
1709                        stored_addresses.push(addr);
1710                        total_stored += 1;
1711                        info!("Stored {total_stored}/{total_chunks}");
1712                        if let Some(tx) = progress {
1713                            let _ = tx
1714                                .send(UploadEvent::ChunkStored {
1715                                    stored: total_stored,
1716                                    total: total_chunks,
1717                                })
1718                                .await;
1719                        }
1720                    }
1721                    Err((addr, e, _started)) => {
1722                        warn!("merkle upload failed for chunk {}: {e}", hex::encode(addr));
1723                        return Err(Error::PartialUpload {
1724                            stored: stored_addresses,
1725                            stored_count: total_stored,
1726                            failed: vec![(addr, e.to_string())],
1727                            failed_count: 1,
1728                            total_chunks,
1729                            reason: format!("merkle chunk upload failed: {e}"),
1730                        });
1731                    }
1732                }
1733            }
1734
1735            if let Some(tx) = progress {
1736                let _ = tx
1737                    .send(UploadEvent::WaveComplete {
1738                        wave: wave_num,
1739                        total_waves: wave_count,
1740                        stored_so_far: total_stored,
1741                        total: total_chunks,
1742                    })
1743                    .await;
1744            }
1745        }
1746
1747        Ok((
1748            total_stored,
1749            batch_result.storage_cost_atto.clone(),
1750            batch_result.gas_cost_wei,
1751            agg_stats,
1752        ))
1753    }
1754
1755    /// Download and decrypt a file from the network, writing it to disk.
1756    ///
1757    /// Uses `streaming_decrypt` so that only one batch of chunks lives in
1758    /// memory at a time, avoiding OOM on large files. Chunks are fetched
1759    /// concurrently within each batch, then decrypted data is written to
1760    /// disk incrementally.
1761    ///
1762    /// Returns the number of bytes written.
1763    ///
1764    /// # Panics
1765    ///
1766    /// Requires a multi-threaded Tokio runtime (`flavor = "multi_thread"`).
1767    /// Will panic if called from a `current_thread` runtime because
1768    /// `streaming_decrypt` takes a synchronous callback that must bridge
1769    /// back to async via `block_in_place`.
1770    ///
1771    /// # Errors
1772    ///
1773    /// Returns an error if any chunk cannot be retrieved, decryption fails,
1774    /// or the file cannot be written.
1775    #[allow(clippy::unused_async)]
1776    pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
1777        self.file_download_with_progress(data_map, output, None)
1778            .await
1779    }
1780
1781    /// Download and decrypt a file with progress events.
1782    ///
1783    /// Same as [`Client::file_download`] but sends [`DownloadEvent`]s for UI feedback.
1784    ///
1785    /// Progress reporting:
1786    /// 1. Resolves hierarchical DataMaps to the root level first (reports as
1787    ///    `ChunksFetched` with `total: 0` during resolution)
1788    /// 2. Once the root DataMap is known, sends `total_chunks` with accurate count
1789    /// 3. Fetches data chunks with accurate `fetched/total` progress
1790    #[allow(clippy::unused_async)]
1791    pub async fn file_download_with_progress(
1792        &self,
1793        data_map: &DataMap,
1794        output: &Path,
1795        progress: Option<mpsc::Sender<DownloadEvent>>,
1796    ) -> Result<u64> {
1797        debug!("Downloading file to {}", output.display());
1798
1799        let handle = Handle::current();
1800
1801        // Phase 1: Resolve hierarchical DataMap to root level.
1802        // This fetches child DataMap chunks (typically 3) to discover the real chunk count.
1803        let root_map = if data_map.is_child() {
1804            let dm_chunks = data_map.len();
1805            if let Some(ref tx) = progress {
1806                let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
1807                    total_map_chunks: dm_chunks,
1808                });
1809            }
1810
1811            let resolve_progress = progress.clone();
1812            let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
1813
1814            let resolved = tokio::task::block_in_place(|| {
1815                let counter_ref = resolve_counter.clone();
1816                let progress_ref = resolve_progress.clone();
1817                let fetch_limiter = self.controller().fetch.clone();
1818                let fetch = |batch: &[(usize, XorName)]| {
1819                    let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
1820                    let counter = counter_ref.clone();
1821                    let prog = progress_ref.clone();
1822                    let limiter = fetch_limiter.clone();
1823                    handle.block_on(async {
1824                        // Clamp fan-out to batch size — self_encryption
1825                        // requests small batches (default 10), so a
1826                        // higher cap from the controller would be slots
1827                        // we never fill (see PERF-RESULTS.md).
1828                        let cap = limiter.current().min(batch_owned.len().max(1));
1829                        let mut stream = futures::stream::iter(batch_owned)
1830                            .map(|(idx, hash)| {
1831                                let addr = hash.0;
1832                                let limiter = limiter.clone();
1833                                async move {
1834                                    let result = observe_op(
1835                                        &limiter,
1836                                        || async move { self.chunk_get(&addr).await },
1837                                        classify_error,
1838                                    )
1839                                    .await;
1840                                    (idx, hash, result)
1841                                }
1842                            })
1843                            .buffer_unordered(cap);
1844                        let mut results = Vec::new();
1845                        while let Some((idx, hash, result)) =
1846                            futures::StreamExt::next(&mut stream).await
1847                        {
1848                            let chunk = result
1849                                .map_err(|e| {
1850                                    self_encryption::Error::Generic(format!(
1851                                        "DataMap resolution failed: {e}"
1852                                    ))
1853                                })?
1854                                .ok_or_else(|| {
1855                                    self_encryption::Error::Generic(format!(
1856                                        "DataMap chunk not found: {}",
1857                                        hex::encode(hash.0)
1858                                    ))
1859                                })?;
1860                            results.push((idx, chunk.content));
1861                            let fetched =
1862                                counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
1863                            if let Some(ref tx) = prog {
1864                                let _ = tx.try_send(DownloadEvent::MapChunkFetched { fetched });
1865                            }
1866                        }
1867                        // CRITICAL: self_encryption::get_root_data_map_parallel
1868                        // pairs the returned Vec POSITIONALLY with the input
1869                        // hashes via .zip() and discards our idx field. We
1870                        // used buffer_unordered, so completions arrive in
1871                        // arbitrary order. Sort by idx to restore the input
1872                        // order before returning, otherwise chunk bytes get
1873                        // paired with the wrong hashes and the root data
1874                        // map is corrupted.
1875                        results.sort_by_key(|(idx, _)| *idx);
1876                        Ok(results)
1877                    })
1878                };
1879                get_root_data_map_parallel(data_map.clone(), &fetch)
1880            })
1881            .map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;
1882
1883            info!(
1884                "Resolved hierarchical DataMap: {} data chunks",
1885                resolved.len()
1886            );
1887            resolved
1888        } else {
1889            data_map.clone()
1890        };
1891
1892        // Phase 2: Now we know the real chunk count.
1893        let total_chunks = root_map.len();
1894        if let Some(ref tx) = progress {
1895            let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
1896        }
1897
1898        // Phase 3: Fetch and decrypt data chunks with accurate progress.
1899        let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
1900        let fetched_for_closure = fetched_counter.clone();
1901        let progress_for_closure = progress.clone();
1902
1903        let fetch_limiter_outer = self.controller().fetch.clone();
1904        let usable_memory = usable_memory_bytes();
1905        let configured_batch_floor = stream_decrypt_batch_size();
1906        let fetch_cap = fetch_limiter_outer.current();
1907        let decrypt_batch_size = adaptive_stream_decrypt_batch_size(
1908            total_chunks,
1909            fetch_cap,
1910            configured_batch_floor,
1911            usable_memory,
1912        );
1913        info!(
1914            total_chunks,
1915            fetch_cap,
1916            configured_batch_floor,
1917            ?usable_memory,
1918            decrypt_batch_size,
1919            "Selected adaptive stream decrypt batch size"
1920        );
1921
1922        let stream = streaming_decrypt_with_batch_size(
1923            &root_map,
1924            |batch: &[(usize, XorName)]| {
1925                let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
1926                let fetched_ref = fetched_for_closure.clone();
1927                let progress_ref = progress_for_closure.clone();
1928                let fetch_limiter = fetch_limiter_outer.clone();
1929
1930                tokio::task::block_in_place(|| {
1931                    handle.block_on(async {
1932                        // Clamp fan-out to batch size — see PERF-RESULTS.md.
1933                        let cap = fetch_limiter.current().min(batch_owned.len().max(1));
1934                        let mut stream = futures::stream::iter(batch_owned)
1935                            .map(|(idx, hash)| {
1936                                let addr = hash.0;
1937                                let limiter = fetch_limiter.clone();
1938                                async move {
1939                                    let result = observe_op(
1940                                        &limiter,
1941                                        || async move { self.chunk_get(&addr).await },
1942                                        classify_error,
1943                                    )
1944                                    .await;
1945                                    (idx, hash, result)
1946                                }
1947                            })
1948                            .buffer_unordered(cap);
1949
1950                        let mut results = Vec::new();
1951                        while let Some((idx, hash, result)) =
1952                            futures::StreamExt::next(&mut stream).await
1953                        {
1954                            let addr_hex = hex::encode(hash.0);
1955                            let chunk = result
1956                                .map_err(|e| {
1957                                    self_encryption::Error::Generic(format!(
1958                                        "Network fetch failed for {addr_hex}: {e}"
1959                                    ))
1960                                })?
1961                                .ok_or_else(|| {
1962                                    self_encryption::Error::Generic(format!(
1963                                        "Chunk not found: {addr_hex}"
1964                                    ))
1965                                })?;
1966                            results.push((idx, chunk.content));
1967                            let fetched =
1968                                fetched_ref.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
1969                            info!("Downloaded {fetched}/{total_chunks}");
1970                            if let Some(ref tx) = progress_ref {
1971                                let _ = tx.try_send(DownloadEvent::ChunksFetched {
1972                                    fetched,
1973                                    total: total_chunks,
1974                                });
1975                            }
1976                        }
1977                        // streaming_decrypt itself sort_by_keys before
1978                        // zipping, but the same closure is also passed
1979                        // through get_root_data_map_parallel internally
1980                        // (see self_encryption::stream_decrypt.rs::new), and
1981                        // THAT path zips positionally without sorting. Sort
1982                        // here so both consumers see input order.
1983                        results.sort_by_key(|(idx, _)| *idx);
1984                        Ok(results)
1985                    })
1986                })
1987            },
1988            decrypt_batch_size,
1989        )
1990        .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;
1991
1992        // Write decrypted chunks to a temp file, then rename atomically.
1993        let parent = output.parent().unwrap_or_else(|| Path::new("."));
1994        let unique: u64 = rand::random();
1995        let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));
1996
1997        let write_result = (|| -> Result<u64> {
1998            let mut file = std::fs::File::create(&tmp_path)?;
1999            let mut bytes_written = 0u64;
2000            for chunk_result in stream {
2001                let chunk_bytes = chunk_result
2002                    .map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
2003                file.write_all(&chunk_bytes)?;
2004                bytes_written += chunk_bytes.len() as u64;
2005            }
2006            file.flush()?;
2007            Ok(bytes_written)
2008        })();
2009
2010        match write_result {
2011            Ok(bytes_written) => match std::fs::rename(&tmp_path, output) {
2012                Ok(()) => {
2013                    info!(
2014                        "File downloaded: {bytes_written} bytes written to {}",
2015                        output.display()
2016                    );
2017                    Ok(bytes_written)
2018                }
2019                Err(rename_err) => {
2020                    if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
2021                        warn!(
2022                            "Failed to remove temp download file {}: {cleanup_err}",
2023                            tmp_path.display()
2024                        );
2025                    }
2026                    Err(rename_err.into())
2027                }
2028            },
2029            Err(e) => {
2030                if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
2031                    warn!(
2032                        "Failed to remove temp download file {}: {cleanup_err}",
2033                        tmp_path.display()
2034                    );
2035                }
2036                Err(e)
2037            }
2038        }
2039    }
2040}
2041
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn disk_space_check_passes_for_small_file() {
        // Spilling a 1 KB file needs a negligible amount of free space.
        let outcome = check_disk_space_for_spill(1024);
        outcome.unwrap();
    }

    #[test]
    fn disk_space_check_fails_for_absurd_size() {
        // `u64::MAX / 2` bytes is roughly 8 EiB — no real system has that free.
        let result = check_disk_space_for_spill(u64::MAX / 2);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            matches!(err, Error::InsufficientDiskSpace(_)),
            "expected InsufficientDiskSpace, got: {err}"
        );
    }

    #[test]
    fn adaptive_stream_decrypt_batch_size_tracks_fetch_headroom() {
        // Plenty of chunks and memory: the fetch cap drives the batch size.
        let expected = 64 * DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER;
        let actual = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(u64::MAX));

        assert_eq!(actual, expected);
    }

    #[test]
    fn adaptive_stream_decrypt_batch_size_caps_to_total_chunks() {
        // A batch is never larger than the file's chunk count.
        let actual = adaptive_stream_decrypt_batch_size(12, 64, 10, Some(u64::MAX));

        assert_eq!(actual, 12);
    }

    #[test]
    fn adaptive_stream_decrypt_batch_size_honours_configured_floor() {
        // The configured floor wins over a tiny fetch cap.
        let actual = adaptive_stream_decrypt_batch_size(1_000, 1, 32, None);

        assert_eq!(actual, 32);
    }

    #[test]
    fn adaptive_stream_decrypt_batch_size_does_not_expand_without_memory_reading() {
        // With no memory reading the batch stays at the configured floor.
        let actual = adaptive_stream_decrypt_batch_size(1_000, 64, 10, None);

        assert_eq!(actual, 10);
    }

    #[test]
    fn adaptive_stream_decrypt_batch_size_caps_to_memory_budget() {
        // Build a memory reading whose budget fits exactly 16 chunks.
        let per_chunk_estimate = (self_encryption::MAX_CHUNK_SIZE as u64)
            .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
            .max(1);
        let sixteen_chunks = per_chunk_estimate.saturating_mul(16);
        let usable_memory =
            sixteen_chunks.saturating_mul(DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR);

        let actual = adaptive_stream_decrypt_batch_size(1_000, 256, 10, Some(usable_memory));

        assert_eq!(actual, 16);
    }

    #[test]
    fn adaptive_stream_decrypt_batch_size_keeps_one_chunk_when_memory_is_tight() {
        // Even a one-byte memory reading still yields a batch of one chunk.
        let actual = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(1));

        assert_eq!(actual, 1);
    }

    #[test]
    fn chunk_spill_round_trip() {
        let small = vec![0xAA; 1024];
        let large = vec![0xBB; 2048];

        let mut spill = ChunkSpill::new().unwrap();
        spill.push(&small).unwrap();
        spill.push(&large).unwrap();

        // Bookkeeping reflects both chunks.
        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 1024 + 2048);
        assert_eq!(spill.avg_chunk_size(), (1024 + 2048) / 2);

        // Each chunk reads back byte-for-byte, in push order.
        let first_addr = spill.addresses.first().unwrap();
        let first_read = spill.read_chunk(first_addr).unwrap();
        assert_eq!(&first_read[..], &small[..]);

        let second_addr = spill.addresses.get(1).unwrap();
        let second_read = spill.read_chunk(second_addr).unwrap();
        assert_eq!(&second_read[..], &large[..]);

        // Splitting the addresses one per wave gives one wave per chunk.
        let wave_total = spill.addresses.chunks(1).count();
        assert_eq!(wave_total, 2);
    }

    #[test]
    fn chunk_spill_cleanup_on_drop() {
        let spill = ChunkSpill::new().unwrap();
        let dir = spill.dir.clone();
        assert!(dir.exists());

        // Dropping the spill must take its directory with it.
        drop(spill);
        assert!(!dir.exists(), "spill dir should be removed on drop");
    }

    #[test]
    fn chunk_spill_deduplicates_identical_content() {
        let mut spill = ChunkSpill::new().unwrap();
        let repeated = vec![0xCC; 512];

        // Only the first of three identical pushes should be kept.
        for _ in 0..3 {
            spill.push(&repeated).unwrap();
        }

        assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
        assert_eq!(
            spill.total_bytes(),
            512,
            "total_bytes should count unique only"
        );

        // New content is still accepted after duplicates were skipped.
        let distinct = vec![0xDD; 256];
        spill.push(&distinct).unwrap();
        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 512 + 256);
    }
}
2173
/// Type-level checks that the futures returned by `Client`'s file methods
/// implement `Send`.
///
/// The functions below are never called (hence the `dead_code` allowances);
/// they only need to type-check for the assertion to hold.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Accepts a reference to any `Send` value; fails to compile otherwise.
    fn _require_send<T>(_value: &T)
    where
        T: Send,
    {
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_is_send(client: &Client) {
        let target = Path::new("/dev/null");
        let upload = client.file_upload(target);
        _require_send(&upload);
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_with_mode_is_send(client: &Client) {
        let target = Path::new("/dev/null");
        let upload = client.file_upload_with_mode(target, PaymentMode::Auto);
        _require_send(&upload);
    }

    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _file_download_is_send(client: &Client) {
        // `todo!()` only supplies a value of the right type for type-checking.
        let data_map: DataMap = todo!();
        let target = Path::new("/dev/null");
        let download = client.file_download(&data_map, target);
        _require_send(&download);
    }
}