Skip to main content

ant_core/data/client/
file.rs

1//! File operations using streaming self-encryption.
2//!
3//! Upload files directly from disk without loading them entirely into memory.
4//! Uses `stream_encrypt` to process files in 8KB chunks, encrypting and
5//! uploading each piece as it's produced.
6//!
7//! Encrypted chunks are spilled to a temporary directory during encryption
8//! so that peak memory usage is bounded to one wave (~256 MB for 64 × 4 MB
9//! chunks) regardless of file size.
10//!
11//! For in-memory data uploads, see the `data` module.
12
13use crate::data::client::batch::{finalize_batch_payment, PaymentIntent, PreparedChunk};
14use crate::data::client::merkle::{
15    finalize_merkle_batch, should_use_merkle, MerkleBatchPaymentResult, PaymentMode,
16    PreparedMerkleBatch,
17};
18use crate::data::client::Client;
19use crate::data::error::{Error, Result};
20use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
21use ant_protocol::transport::{MultiAddr, PeerId};
22use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
23use bytes::Bytes;
24use fs2::FileExt;
25use futures::stream::{self, StreamExt};
26use self_encryption::{get_root_data_map_parallel, stream_encrypt, streaming_decrypt, DataMap};
27use std::collections::{HashMap, HashSet};
28use std::io::Write;
29use std::path::{Path, PathBuf};
30use std::sync::{Arc, Mutex};
31use tokio::runtime::Handle;
32use tokio::sync::mpsc;
33use tracing::{debug, info, warn};
34use xor_name::XorName;
35
/// Progress events emitted during file upload for UI feedback.
///
/// Sent over an `mpsc` channel supplied by the caller; sends are best-effort
/// (a dropped receiver does not abort the upload).
#[derive(Debug, Clone)]
pub enum UploadEvent {
    /// A chunk has been encrypted and spilled to disk.
    Encrypting { chunks_done: usize },
    /// File encryption complete.
    Encrypted { total_chunks: usize },
    /// Starting quote collection for a wave.
    QuotingChunks {
        /// Index of the wave being quoted — NOTE(review): 0- vs 1-based
        /// depends on the emit site; confirm before displaying.
        wave: usize,
        /// Total number of waves in this upload.
        total_waves: usize,
        /// Number of chunks in this particular wave (last wave may be smaller).
        chunks_in_wave: usize,
    },
    /// A chunk has been quoted (peer discovery + price received).
    /// This is the slow phase — each quote involves network round-trips.
    ChunkQuoted { quoted: usize, total: usize },
    /// A chunk has been stored on the network.
    ChunkStored { stored: usize, total: usize },
    /// A wave has completed.
    WaveComplete {
        /// Index of the completed wave.
        wave: usize,
        /// Total number of waves in this upload.
        total_waves: usize,
        /// Cumulative chunks stored across all waves so far.
        stored_so_far: usize,
        /// Total chunks in the upload.
        total: usize,
    },
}
62
/// Progress events emitted during file download for UI feedback.
///
/// Map-resolution events (`ResolvingDataMap`, `MapChunkFetched`,
/// `DataMapResolved`) precede data-chunk events (`ChunksFetched`).
#[derive(Debug, Clone)]
pub enum DownloadEvent {
    /// Resolving hierarchical DataMap to discover real chunk count.
    ResolvingDataMap { total_map_chunks: usize },
    /// A DataMap chunk has been fetched during resolution.
    MapChunkFetched { fetched: usize },
    /// DataMap resolved — total data chunk count now known.
    DataMapResolved { total_chunks: usize },
    /// Data chunks are being fetched from the network.
    ChunksFetched { fetched: usize, total: usize },
}
75
/// One entry in the per-chunk quote list returned by
/// [`Client::get_store_quotes`]: the responding peer, its addresses, the
/// signed quote it returned, and the payment amount it is demanding.
type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);

/// Number of chunks per upload wave (matches batch.rs PAYMENT_WAVE_SIZE).
///
/// NOTE(review): duplicated rather than shared with `batch.rs`; the two
/// values must be kept in sync manually.
const UPLOAD_WAVE_SIZE: usize = 64;

/// Maximum number of distinct chunk addresses to sample when probing for a
/// representative quote in [`Client::estimate_upload_cost`].
///
/// Bounded small so we never spend more than a couple of round-trips on the
/// `AlreadyStored` retry path, which only matters when many leading chunks
/// of a file already live on the network.
const ESTIMATE_SAMPLE_CAP: usize = 5;

/// Gas used by one `pay_for_quotes` transaction that packs up to
/// `UPLOAD_WAVE_SIZE` (quote_hash, rewards_address, amount) entries.
///
/// `batch_pay` in `batch.rs` flattens every chunk's close-group quotes into a
/// single EVM call, so the dominant cost is the SSTOREs for each entry plus
/// the base tx overhead. On Arbitrum that is roughly
/// `21_000 + 64 × (20_000 + small)` ≈ 1.3M; we round up to 1.5M as a
/// conservative per-wave upper bound.
const GAS_PER_WAVE_TX: u128 = 1_500_000;

/// Gas used by one merkle batch payment transaction.
///
/// One on-chain tx per merkle sub-batch, but each tx verifies a merkle tree
/// and posts a pool commitment, so budget higher than a plain transfer.
const GAS_PER_MERKLE_TX: u128 = 500_000;

/// Advisory gas price (wei/gas) used to turn the gas estimate into an ETH
/// figure when no live gas oracle is consulted.
///
/// Arbitrum One typically settles around 0.1 gwei on quiet blocks; we use
/// that as the default so the CLI prints a sensible order-of-magnitude
/// number. Users should treat the reported gas cost as an estimate, not a
/// commitment — real gas is bid at submission time.
const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;

/// Extra headroom percentage for disk space check.
///
/// Encrypted chunks are slightly larger than the source data due to padding
/// and self-encryption overhead. We require file_size + 10% free space in
/// the temp directory to account for this.
const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;
123
/// Maximum age (in seconds) for orphaned spill directories.
/// Dirs older than this are cleaned up if they have no active lockfile.
const SPILL_MAX_AGE_SECS: u64 = 24 * 60 * 60; // 24 hours

/// Prefix for spill directory names to distinguish from user files.
const SPILL_DIR_PREFIX: &str = "spill_";

/// Lockfile name inside each spill dir to signal active use.
const SPILL_LOCK_NAME: &str = ".lock";

/// Temporary on-disk buffer for encrypted chunks.
///
/// During file encryption, chunks are written to a temp directory so that
/// only their 32-byte addresses stay in memory. At upload time chunks are
/// read back one wave at a time, keeping peak RAM at
/// ~`UPLOAD_WAVE_SIZE × 4 MB`.
///
/// (Doc fix: this paragraph was previously fused onto the rustdoc of
/// `SPILL_MAX_AGE_SECS`, leaving this struct undocumented.)
struct ChunkSpill {
    /// Directory holding spilled chunk files (named by hex address).
    dir: PathBuf,
    /// Lockfile held for the lifetime of this spill (prevents stale cleanup).
    _lock: std::fs::File,
    /// Deduplicated list of chunk addresses.
    addresses: Vec<[u8; 32]>,
    /// Tracks seen addresses for deduplication.
    seen: HashSet<[u8; 32]>,
    /// Running total of unique chunk byte sizes (for average-size calculation).
    total_bytes: u64,
}
151
impl ChunkSpill {
    /// Return the parent directory for all spill dirs: `<data_dir>/spill/`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Config`] if the data directory cannot be determined.
    fn spill_root() -> Result<PathBuf> {
        use crate::config;
        let root = config::data_dir()
            .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
            .join("spill");
        Ok(root)
    }

    /// Create a new spill directory under `<data_dir>/spill/`.
    ///
    /// Directory name is `spill_<timestamp>_<random>` so orphans can be
    /// identified by prefix and cleaned up by age. A lockfile inside the
    /// dir prevents concurrent cleanup from deleting an active spill.
    ///
    /// # Errors
    ///
    /// Returns an error if the directory or lockfile cannot be created, or
    /// if the exclusive lock cannot be taken.
    fn new() -> Result<Self> {
        let root = Self::spill_root()?;
        std::fs::create_dir_all(&root)?;

        // Clean up stale spill dirs from previous crashed runs.
        Self::cleanup_stale(&root);

        // A broken clock (before Unix epoch) yields now == 0 here; the dir
        // is still created (name "spill_0_<random>") and cleanup_stale()
        // independently refuses to run when it sees a zero clock.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let unique: u64 = rand::random();
        let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
        std::fs::create_dir(&dir)?;

        // Create and hold a lockfile for the lifetime of this spill.
        // cleanup_stale() will skip dirs with locked files.
        let lock_path = dir.join(SPILL_LOCK_NAME);
        let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
            Error::Io(std::io::Error::new(
                e.kind(),
                format!("failed to create spill lockfile: {e}"),
            ))
        })?;
        lock_file.try_lock_exclusive().map_err(|e| {
            Error::Io(std::io::Error::new(
                e.kind(),
                format!("failed to lock spill lockfile: {e}"),
            ))
        })?;

        Ok(Self {
            dir,
            _lock: lock_file,
            addresses: Vec::new(),
            seen: HashSet::new(),
            total_bytes: 0,
        })
    }

    /// Clean up stale spill directories. Best-effort, errors are logged.
    ///
    /// Only removes directories that:
    /// 1. Start with `SPILL_DIR_PREFIX` (ignores unrelated files)
    /// 2. Are actual directories (not symlinks -- prevents symlink attacks)
    /// 3. Have a timestamp older than `SPILL_MAX_AGE_SECS`
    /// 4. Do NOT have an active lockfile (prevents deleting in-progress uploads)
    ///
    /// Safe to call concurrently from multiple processes: if two cleaners
    /// race on the same dir, one `remove_dir_all` may fail and is only
    /// logged as a warning.
    fn cleanup_stale(root: &Path) {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        if now == 0 {
            // Clock is broken (before Unix epoch). Skip cleanup to avoid
            // misidentifying dirs as stale.
            warn!("System clock before Unix epoch, skipping spill cleanup");
            return;
        }

        // Unreadable root (missing, permissions) => nothing to clean.
        let entries = match std::fs::read_dir(root) {
            Ok(entries) => entries,
            Err(_) => return,
        };

        for entry in entries.flatten() {
            let name = entry.file_name();
            let name_str = name.to_string_lossy();

            // Only process dirs with our prefix.
            let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
                Some(s) => s,
                None => continue,
            };

            // Parse timestamp: "spill_<timestamp>_<random>"
            let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
                Some(ts) => ts,
                None => continue,
            };

            // saturating_sub guards against dirs stamped in the future.
            if now.saturating_sub(timestamp) <= SPILL_MAX_AGE_SECS {
                continue;
            }

            // Safety: only delete actual directories, not symlinks.
            let file_type = match entry.file_type() {
                Ok(ft) => ft,
                Err(_) => continue,
            };
            if !file_type.is_dir() {
                continue;
            }

            let path = entry.path();

            // Check lockfile: if locked, the dir is in active use -- skip it.
            // A dir with no lockfile at all falls through to deletion.
            let lock_path = path.join(SPILL_LOCK_NAME);
            if let Ok(lock_file) = std::fs::File::open(&lock_path) {
                use fs2::FileExt;
                if lock_file.try_lock_exclusive().is_err() {
                    // Lock held by another process -- dir is active.
                    debug!("Skipping active spill dir: {}", path.display());
                    continue;
                }
                // We acquired the lock, so no one else holds it.
                // Drop it before deleting.
                drop(lock_file);
            }

            info!("Cleaning up stale spill dir: {}", path.display());
            if let Err(e) = std::fs::remove_dir_all(&path) {
                warn!("Failed to clean up stale spill dir {}: {e}", path.display());
            }
        }
    }

    /// Run stale spill cleanup. Call at client startup or periodically.
    #[allow(dead_code)]
    pub(crate) fn run_cleanup() {
        if let Ok(root) = Self::spill_root() {
            Self::cleanup_stale(&root);
        }
    }

    /// Write one encrypted chunk to disk and record its address.
    ///
    /// Deduplicates by content address: if the same chunk was already
    /// spilled, the write and accounting are skipped. This prevents
    /// double-uploads and inflated quoting metrics.
    ///
    /// # Errors
    ///
    /// Returns an I/O error if the chunk file cannot be written.
    fn push(&mut self, content: &[u8]) -> Result<()> {
        let address = compute_address(content);
        // insert() returning false means the address was already present.
        if !self.seen.insert(address) {
            return Ok(());
        }
        let path = self.dir.join(hex::encode(address));
        std::fs::write(&path, content)?;
        self.total_bytes += content.len() as u64;
        self.addresses.push(address);
        Ok(())
    }

    /// Number of chunks stored.
    fn len(&self) -> usize {
        self.addresses.len()
    }

    /// Total bytes of all spilled chunks.
    fn total_bytes(&self) -> u64 {
        self.total_bytes
    }

    /// Average chunk size in bytes (for quoting metrics).
    /// Returns 0 for an empty spill (avoids divide-by-zero).
    fn avg_chunk_size(&self) -> u64 {
        if self.addresses.is_empty() {
            return 0;
        }
        self.total_bytes / self.addresses.len() as u64
    }

    /// Read a single chunk back from disk by address.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Io`] (with the address in the message) if the
    /// spilled file is missing or unreadable.
    fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
        let path = self.dir.join(hex::encode(address));
        let data = std::fs::read(&path).map_err(|e| {
            Error::Io(std::io::Error::new(
                e.kind(),
                format!("reading spilled chunk {}: {e}", hex::encode(address)),
            ))
        })?;
        Ok(Bytes::from(data))
    }

    /// Iterate over address slices in wave-sized groups.
    /// The final wave may contain fewer than `UPLOAD_WAVE_SIZE` addresses.
    fn waves(&self) -> std::slice::Chunks<'_, [u8; 32]> {
        self.addresses.chunks(UPLOAD_WAVE_SIZE)
    }

    /// Read a wave of chunks from disk.
    /// Fails fast on the first unreadable chunk.
    fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
        let mut out = Vec::with_capacity(wave_addrs.len());
        for addr in wave_addrs {
            let content = self.read_chunk(addr)?;
            out.push((content, *addr));
        }
        Ok(out)
    }

    /// Clean up the spill directory. Best-effort: failure is logged, not
    /// returned, because this runs from Drop.
    ///
    /// NOTE(review): when called from Drop, `_lock` is still an open handle
    /// to a file inside `dir`; on Windows that may cause `remove_dir_all`
    /// to fail — confirm behavior on that platform.
    fn cleanup(&self) {
        if let Err(e) = std::fs::remove_dir_all(&self.dir) {
            warn!(
                "Failed to clean up chunk spill dir {}: {e}",
                self.dir.display()
            );
        }
    }
}
366
impl Drop for ChunkSpill {
    fn drop(&mut self) {
        // Best-effort removal of the spill dir; errors are logged inside
        // cleanup() rather than propagated (Drop cannot return errors).
        self.cleanup();
    }
}
372
373/// Check that the spill directory has enough free space for the spilled chunks.
374///
375/// `file_size` is the source file's byte count. We require
376/// `file_size + 10%` free space to account for self-encryption overhead.
377fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
378    let spill_root = ChunkSpill::spill_root()?;
379
380    // Ensure the root exists so fs2 can query it.
381    std::fs::create_dir_all(&spill_root)?;
382
383    let available = fs2::available_space(&spill_root).map_err(|e| {
384        Error::Io(std::io::Error::new(
385            e.kind(),
386            format!(
387                "failed to query disk space on {}: {e}",
388                spill_root.display()
389            ),
390        ))
391    })?;
392
393    // Use integer arithmetic to avoid f64 precision loss on large file sizes.
394    let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
395    let required = file_size.saturating_add(headroom);
396
397    if available < required {
398        let avail_mb = available / (1024 * 1024);
399        let req_mb = required / (1024 * 1024);
400        return Err(Error::InsufficientDiskSpace(format!(
401            "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
402            spill_root.display()
403        )));
404    }
405
406    debug!(
407        "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
408        spill_root.display()
409    );
410    Ok(())
411}
412
/// Whether the data map is published to the network for address-based retrieval.
///
/// A private upload stores only the data chunks and returns the `DataMap` to
/// the caller — only someone holding that `DataMap` can reconstruct the file.
/// A public upload additionally stores the serialized `DataMap` as a chunk on
/// the network, yielding a single chunk address that anyone can use to
/// retrieve the `DataMap` (via [`Client::data_map_fetch`]) and then the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Visibility {
    /// Keep the data map local; only the holder can retrieve the file.
    /// This is the default: uploads are private unless explicitly published.
    #[default]
    Private,
    /// Publish the data map as a network chunk so anyone with the returned
    /// address can retrieve and decrypt the file.
    Public,
}
429
/// Estimated cost of uploading a file, returned by
/// [`Client::estimate_upload_cost`].
///
/// Serializable so it can be handed directly to a CLI/frontend.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct UploadCostEstimate {
    /// Original file size in bytes.
    pub file_size: u64,
    /// Number of chunks the file would be split into (data chunks only,
    /// does not include the DataMap chunk added during public uploads).
    pub chunk_count: usize,
    /// Estimated total storage cost in atto (token smallest unit).
    /// Stringified to avoid precision loss in JSON consumers.
    pub storage_cost_atto: String,
    /// Estimated gas cost in wei as a string. This is a rough heuristic
    /// based on chunk count and payment mode, NOT a live gas price query.
    pub estimated_gas_cost_wei: String,
    /// Payment mode that would be used.
    pub payment_mode: PaymentMode,
}
447
/// Result of a file upload: the `DataMap` needed to retrieve the file.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers that construct or pattern-match
/// on this struct.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FileUploadResult {
    /// The data map containing chunk metadata for reconstruction.
    pub data_map: DataMap,
    /// Number of chunks stored on the network.
    pub chunks_stored: usize,
    /// Number of chunks that failed to store. Always 0 for a successful
    /// upload — partial-failure information is conveyed via
    /// [`crate::data::Error::PartialUpload`] instead.
    pub chunks_failed: usize,
    /// Total number of chunks the upload attempted to store. On full
    /// success this equals `chunks_stored`.
    pub total_chunks: usize,
    /// Which payment mode was actually used (not just requested).
    pub payment_mode_used: PaymentMode,
    /// Total storage cost paid in token units (atto). "0" if all chunks already existed.
    /// Stringified to avoid precision loss in JSON consumers.
    pub storage_cost_atto: String,
    /// Total gas cost in wei. 0 if no on-chain transactions were made.
    pub gas_cost_wei: u128,
    /// Chunk address of the serialized `DataMap`, set only for
    /// [`Visibility::Public`] uploads. **`Some` means this address is
    /// retrievable from the network (via [`Client::data_map_fetch`])**, not
    /// necessarily that *this* upload paid to store it — if the serialized
    /// `DataMap` hashed to a chunk that was already on the network (same
    /// file uploaded before; deterministic via self-encryption), the address
    /// is still returned but no storage payment was made for it.
    pub data_map_address: Option<[u8; 32]>,
}
482
/// Payment information for external signing — either wave-batch or merkle.
///
/// Which variant is produced depends on the chunk count and requested
/// payment mode (see `PreparedUpload::payment_info`).
#[derive(Debug)]
pub enum ExternalPaymentInfo {
    /// Wave-batch: individual (quote_hash, rewards_address, amount) tuples.
    WaveBatch {
        /// Chunks ready for payment (needed for finalize).
        prepared_chunks: Vec<PreparedChunk>,
        /// Payment intent for external signing.
        payment_intent: PaymentIntent,
    },
    /// Merkle: single on-chain call with depth, pool commitments, timestamp.
    Merkle {
        /// The prepared merkle batch (public fields sent to frontend, private fields stay in Rust).
        prepared_batch: PreparedMerkleBatch,
        /// Raw chunk contents (needed for upload after payment).
        chunk_contents: Vec<Bytes>,
        /// Chunk addresses in order (needed for upload after payment).
        /// Indexes correspond 1:1 with `chunk_contents`.
        chunk_addresses: Vec<[u8; 32]>,
    },
}
503
/// Prepared upload ready for external payment.
///
/// Contains everything needed to construct the on-chain payment transaction
/// externally (e.g. via WalletConnect in a desktop app) and then finalize
/// the upload without a Rust-side wallet.
///
/// Note: This struct stays in Rust memory — only the public fields of
/// `payment_info` are sent to the frontend. `PreparedChunk` contains
/// non-serializable network types, so the full struct cannot derive `Serialize`.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers.
#[derive(Debug)]
#[non_exhaustive]
pub struct PreparedUpload {
    /// The data map for later retrieval.
    pub data_map: DataMap,
    /// Payment information — either wave-batch or merkle depending on chunk count.
    pub payment_info: ExternalPaymentInfo,
    /// Chunk address of the serialized `DataMap` when this upload was
    /// prepared with [`Visibility::Public`]. `Some` means the address is
    /// retrievable on the network after finalization — either because this
    /// upload paid to store the chunk in `payment_info`, or because the
    /// chunk was already on the network (deterministic self-encryption).
    /// Carried through to [`FileUploadResult::data_map_address`].
    pub data_map_address: Option<[u8; 32]>,
}
531
/// Return type for [`spawn_file_encryption`]: chunk receiver, `DataMap` oneshot, join handle.
///
/// The join handle must be awaited to surface read/encryption errors from
/// the blocking task.
type EncryptionChannels = (
    // Encrypted chunk contents, in production order (bounded channel).
    tokio::sync::mpsc::Receiver<Bytes>,
    // Fires once with the final DataMap after all chunks are sent.
    tokio::sync::oneshot::Receiver<DataMap>,
    // Result of the blocking encryption task.
    tokio::task::JoinHandle<Result<()>>,
);
538
/// Spawn a blocking task that streams file encryption through a channel.
///
/// Reads `path` in 8 KB pieces and feeds them to `stream_encrypt`,
/// sending each encrypted chunk through a bounded channel (capacity 2,
/// providing backpressure against a slow consumer). The final `DataMap`
/// is delivered via the oneshot once every chunk has been sent.
///
/// # Errors
///
/// Returns an error immediately if the file metadata cannot be read or the
/// file size does not fit in `usize` on this platform. Read and encryption
/// errors occurring later are reported through the returned join handle.
fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
    let metadata = std::fs::metadata(&path)?;
    // NOTE(review): if the file grows/shrinks between this metadata call
    // and the reads below, data_size will disagree with the bytes actually
    // streamed — confirm how stream_encrypt handles that mismatch.
    let data_size = usize::try_from(metadata.len())
        .map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;

    let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
    let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();

    let handle = tokio::task::spawn_blocking(move || {
        let file = std::fs::File::open(&path)?;
        let mut reader = std::io::BufReader::new(file);

        // The iterator handed to stream_encrypt cannot return errors, so a
        // failed read is stashed here and the iterator reports EOF instead;
        // the error is surfaced by the checks further down.
        let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
        let read_error_clone = Arc::clone(&read_error);

        let data_iter = std::iter::from_fn(move || {
            // A fresh buffer per read is required: ownership moves into
            // the yielded `Bytes`.
            let mut buffer = vec![0u8; 8192];
            match std::io::Read::read(&mut reader, &mut buffer) {
                Ok(0) => None,
                Ok(n) => {
                    buffer.truncate(n);
                    Some(Bytes::from(buffer))
                }
                Err(e) => {
                    // Recover from a poisoned mutex: the stored error is a
                    // plain value, safe to use regardless of poisoning.
                    let mut guard = read_error_clone
                        .lock()
                        .unwrap_or_else(|poisoned| poisoned.into_inner());
                    *guard = Some(e);
                    None
                }
            }
        });

        let mut stream = stream_encrypt(data_size, data_iter)
            .map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;

        for chunk_result in stream.chunks() {
            // Check for captured read errors immediately after each chunk.
            // stream_encrypt sees None (EOF) when a read fails, so it stops
            // producing chunks. We must detect this before sending the
            // partial results to avoid uploading a truncated DataMap.
            {
                let guard = read_error
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                if let Some(ref e) = *guard {
                    return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
                }
            }

            let (_hash, content) = chunk_result
                .map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
            // A closed channel means the consumer gave up; abort encryption.
            if chunk_tx.blocking_send(content).is_err() {
                return Err(Error::Encryption("upload receiver dropped".to_string()));
            }
        }

        // Final check: read error after last chunk (stream saw EOF).
        {
            let guard = read_error
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            if let Some(ref e) = *guard {
                return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
            }
        }

        let datamap = stream
            .into_datamap()
            .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
        // Not an error: the caller may legitimately have cancelled.
        if datamap_tx.send(datamap).is_err() {
            warn!("DataMap receiver dropped — upload may have been cancelled");
        }
        Ok(())
    });

    Ok((chunk_rx, datamap_rx, handle))
}
618
619impl Client {
620    /// Upload a file to the network using streaming self-encryption.
621    ///
622    /// Automatically selects merkle batch payment for files that produce
623    /// 64+ chunks (saves gas). Encrypted chunks are spilled to a temp
624    /// directory so peak memory stays at ~256 MB regardless of file size.
625    ///
626    /// # Errors
627    ///
628    /// Returns an error if the file cannot be read, encryption fails,
629    /// or any chunk cannot be stored.
630    pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
631        self.file_upload_with_mode(path, PaymentMode::Auto).await
632    }
633
634    /// Estimate the cost of uploading a file without actually uploading.
635    ///
636    /// Encrypts the file to determine chunk count and sizes, then requests
637    /// a single quote from the network for a representative chunk. The
638    /// per-chunk price is extrapolated to the total chunk count.
639    ///
640    /// The estimate is fast (~2-5s) and does not require a wallet. Spilled
641    /// chunks are cleaned up automatically when the function returns.
642    ///
643    /// Gas cost is an advisory heuristic, not a live gas-oracle query. It is
644    /// derived from realistic per-transaction budgets (`GAS_PER_WAVE_TX`,
645    /// `GAS_PER_MERKLE_TX`) priced at `ARBITRUM_GAS_PRICE_WEI`. Real gas
646    /// varies with network conditions.
647    ///
648    /// If the first sampled chunk is already stored on the network, the
649    /// function retries with subsequent chunk addresses (up to
650    /// `ESTIMATE_SAMPLE_CAP`). If every sampled address reports stored,
651    /// a [`Error::CostEstimationInconclusive`] is returned so callers can
652    /// decide how to react rather than trust a bogus "free" estimate. Only
653    /// when every address in the file is stored do we return a zero-cost
654    /// estimate.
655    ///
656    /// # Errors
657    ///
658    /// Returns an error if the file cannot be read, encryption fails,
659    /// the network cannot provide a quote, or every sampled chunk is
660    /// already stored ([`Error::CostEstimationInconclusive`]).
661    pub async fn estimate_upload_cost(
662        &self,
663        path: &Path,
664        mode: PaymentMode,
665        progress: Option<mpsc::Sender<UploadEvent>>,
666    ) -> Result<UploadCostEstimate> {
667        let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
668
669        if file_size < 3 {
670            return Err(Error::InvalidData(
671                "File too small: self-encryption requires at least 3 bytes".into(),
672            ));
673        }
674
675        check_disk_space_for_spill(file_size)?;
676
677        info!(
678            "Estimating upload cost for {} ({file_size} bytes)",
679            path.display()
680        );
681
682        let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
683        let chunk_count = spill.len();
684
685        if let Some(ref tx) = progress {
686            let _ = tx
687                .send(UploadEvent::Encrypted {
688                    total_chunks: chunk_count,
689                })
690                .await;
691        }
692
693        info!("Encrypted into {chunk_count} chunks, requesting quote");
694
695        // Sample up to ESTIMATE_SAMPLE_CAP distinct chunk addresses. A single
696        // AlreadyStored result says nothing about the rest of the file — the
697        // first chunk is often a DataMap-adjacent chunk that collides with
698        // prior uploads even when 99% of the file is new. Only treat the
699        // whole file as "fully stored" when every sample comes back stored.
700        let sample_limit = spill.addresses.len().min(ESTIMATE_SAMPLE_CAP);
701        let mut sampled = 0usize;
702        let mut all_already_stored = true;
703        let mut quotes_opt: Option<Vec<QuoteEntry>> = None;
704
705        for addr in spill.addresses.iter().take(sample_limit) {
706            sampled += 1;
707            let chunk_bytes = spill.read_chunk(addr)?;
708            let data_size = u64::try_from(chunk_bytes.len())
709                .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
710            match self
711                .get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
712                .await
713            {
714                Ok(q) => {
715                    quotes_opt = Some(q);
716                    all_already_stored = false;
717                    break;
718                }
719                Err(Error::AlreadyStored) => {
720                    debug!(
721                        "Sample chunk {} already stored; trying next address ({sampled}/{sample_limit})",
722                        hex::encode(addr)
723                    );
724                    continue;
725                }
726                Err(e) => return Err(e),
727            }
728        }
729
730        let uses_merkle = should_use_merkle(chunk_count, mode);
731
732        let quotes = match quotes_opt {
733            Some(q) => q,
734            None if all_already_stored && sampled == chunk_count => {
735                // Every address in the file was sampled and every one is
736                // already on the network — returning a zero-cost estimate is
737                // accurate in this case.
738                info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
739                return Ok(UploadCostEstimate {
740                    file_size,
741                    chunk_count,
742                    storage_cost_atto: "0".into(),
743                    estimated_gas_cost_wei: "0".into(),
744                    payment_mode: if uses_merkle {
745                        PaymentMode::Merkle
746                    } else {
747                        PaymentMode::Single
748                    },
749                });
750            }
751            None => {
752                return Err(Error::CostEstimationInconclusive(format!(
753                    "sampled {sampled} chunk addresses out of {chunk_count} and every \
754                     one reported AlreadyStored; cannot infer a representative price \
755                     for the remaining chunks"
756                )));
757            }
758        };
759
760        // Use the median price × 3 (matches SingleNodePayment::from_quotes
761        // which pays 3x the median to incentivize reliable storage).
762        let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
763        prices.sort();
764        let median_price = prices
765            .get(prices.len() / 2)
766            .copied()
767            .unwrap_or(Amount::ZERO);
768        let per_chunk_cost = median_price * Amount::from(3u64);
769
770        let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
771        let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);
772
773        // Estimate gas cost from realistic per-transaction budgets rather
774        // than a flat per-chunk or per-wave number.
775        //
776        // - Single mode: `batch_pay` packs up to UPLOAD_WAVE_SIZE chunks'
777        //   close-group quotes into one `pay_for_quotes` call on Arbitrum.
778        //   The dominant cost is one SSTORE per entry plus base tx overhead,
779        //   so we use GAS_PER_WAVE_TX (≈1.5M) as a conservative upper bound
780        //   on a full wave and multiply by the number of waves. The previous
781        //   per-wave figure of 150k was closer to a single-entry transfer
782        //   and understated cost by 5–10x for full waves.
783        // - Merkle mode: one tx per sub-batch that verifies a merkle tree
784        //   and posts a pool commitment (GAS_PER_MERKLE_TX ≈ 500k each).
785        //
786        // Gas is priced at ARBITRUM_GAS_PRICE_WEI (~0.1 gwei, a typical
787        // Arbitrum baseline). Treat the result as advisory, not a commitment.
788        let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
789        let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
790        let estimated_gas: u128 = if uses_merkle {
791            merkle_batches
792                .saturating_mul(GAS_PER_MERKLE_TX)
793                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
794        } else {
795            waves
796                .saturating_mul(GAS_PER_WAVE_TX)
797                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
798        };
799
800        info!(
801            "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
802        );
803
804        Ok(UploadCostEstimate {
805            file_size,
806            chunk_count,
807            storage_cost_atto: total_storage.to_string(),
808            estimated_gas_cost_wei: estimated_gas.to_string(),
809            payment_mode: if uses_merkle {
810                PaymentMode::Merkle
811            } else {
812                PaymentMode::Single
813            },
814        })
815    }
816
817    /// Phase 1 of external-signer upload: encrypt file and prepare chunks.
818    ///
819    /// Equivalent to [`Client::file_prepare_upload_with_visibility`] with
820    /// [`Visibility::Private`] — see that method for details.
821    pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
822        self.file_prepare_upload_with_visibility(path, Visibility::Private)
823            .await
824    }
825
    /// Phase 1 of external-signer upload with explicit [`Visibility`] control.
    ///
    /// Requires an EVM network (for contract price queries) but NOT a wallet.
    /// Returns a [`PreparedUpload`] containing the data map, prepared chunks,
    /// and a [`PaymentIntent`] that the external signer uses to construct
    /// and submit the on-chain payment transaction.
    ///
    /// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
    /// is bundled into the payment batch as an additional chunk and its
    /// address is recorded on the returned [`PreparedUpload`]. After
    /// [`Client::finalize_upload`] (or `_merkle`) succeeds, that address is
    /// surfaced via [`FileUploadResult::data_map_address`] so the uploader
    /// can share a single address from which anyone can retrieve the file.
    ///
    /// **Memory note:** Encryption uses disk spilling for bounded memory, but
    /// the returned [`PreparedUpload`] holds all chunk content in memory (each
    /// [`PreparedChunk`] contains a `Bytes` with the full chunk data). This is
    /// inherent to the two-phase external-signer protocol — the chunks must
    /// stay in memory until [`Client::finalize_upload`] stores them. For very
    /// large files, prefer [`Client::file_upload`] which streams directly.
    ///
    /// # Errors
    ///
    /// Returns an error if there is insufficient disk space, the file cannot
    /// be read, encryption fails, or quote collection fails.
    pub async fn file_prepare_upload_with_visibility(
        &self,
        path: &Path,
        visibility: Visibility,
    ) -> Result<PreparedUpload> {
        debug!(
            "Preparing file upload for external signing (visibility={visibility:?}): {}",
            path.display()
        );

        // Pre-flight disk-space check before any encryption work begins.
        let file_size = std::fs::metadata(path)?.len();
        check_disk_space_for_spill(file_size)?;

        // No progress channel here: external-signer preparation is a
        // non-interactive API surface.
        let (spill, data_map) = self.encrypt_file_to_spill(path, None).await?;

        info!(
            "Encrypted {} into {} chunks for external signing (spilled to disk)",
            path.display(),
            spill.len()
        );

        // Read each chunk from disk and collect quotes concurrently.
        // Note: all PreparedChunks accumulate in memory because the external-signer
        // protocol requires them for finalize_upload. NOT memory-bounded for large files.
        let mut chunk_data: Vec<Bytes> = spill
            .addresses
            .iter()
            .map(|addr| spill.read_chunk(addr))
            .collect::<std::result::Result<Vec<_>, _>>()?;

        // For public uploads, bundle the serialized DataMap as an extra chunk
        // in the same payment batch. This lets the external signer pay for
        // the data chunks and the DataMap chunk in one flow, and lets the
        // finalize step return the DataMap's chunk address as the shareable
        // retrieval address.
        let data_map_address = match visibility {
            Visibility::Private => None,
            Visibility::Public => {
                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
                })?;
                let bytes = Bytes::from(serialized);
                let address = compute_address(&bytes);
                info!(
                    "Public upload: bundling DataMap chunk ({} bytes) at address {}",
                    bytes.len(),
                    hex::encode(address)
                );
                chunk_data.push(bytes);
                Some(address)
            }
        };

        // For Public uploads this includes the extra DataMap chunk, so it can
        // be spill.len() + 1.
        let chunk_count = chunk_data.len();

        let payment_info = if should_use_merkle(chunk_count, PaymentMode::Auto) {
            // Merkle path: build tree, collect candidate pools, return for external payment.
            info!("Using merkle batch preparation for {chunk_count} file chunks");

            let addresses: Vec<[u8; 32]> = chunk_data.iter().map(|c| compute_address(c)).collect();

            // Average chunk size stands in as the representative data_size
            // for merkle batch pricing; `max(1)` guards the (theoretical)
            // empty-input division — presumably encryption always yields at
            // least one chunk, TODO confirm.
            let avg_size =
                chunk_data.iter().map(bytes::Bytes::len).sum::<usize>() / chunk_count.max(1);
            let avg_size_u64 = u64::try_from(avg_size).unwrap_or(0);

            let prepared_batch = self
                .prepare_merkle_batch_external(&addresses, DATA_TYPE_CHUNK, avg_size_u64)
                .await?;

            info!(
                "File prepared for external merkle signing: {} chunks, depth={} ({})",
                chunk_count,
                prepared_batch.depth,
                path.display()
            );

            ExternalPaymentInfo::Merkle {
                prepared_batch,
                chunk_contents: chunk_data,
                chunk_addresses: addresses,
            }
        } else {
            // Wave-batch path: collect quotes per chunk concurrently.
            let quote_concurrency = self.config().quote_concurrency;
            let results: Vec<Result<Option<PreparedChunk>>> = stream::iter(chunk_data)
                .map(|content| async move { self.prepare_chunk_payment(content).await })
                .buffer_unordered(quote_concurrency)
                .collect()
                .await;

            // NOTE(review): capacity hint is spill.len(), but for Public
            // uploads the input had one extra (DataMap) entry — harmless,
            // Vec will grow, but the hint can under-reserve by one.
            let mut prepared_chunks = Vec::with_capacity(spill.len());
            for result in results {
                // `None` means the chunk needs no payment (e.g. already
                // stored) and is therefore excluded from the intent.
                if let Some(prepared) = result? {
                    prepared_chunks.push(prepared);
                }
            }

            // Surface the "DataMap chunk was already on the network" case
            // so debugging "why is data_map_address set but no storage cost
            // appears for it?" doesn't require reading the source. See the
            // `data_map_address` doc comment for why this is still a valid
            // `Some(addr)` outcome.
            if let Some(addr) = data_map_address {
                if !prepared_chunks.iter().any(|c| c.address == addr) {
                    info!(
                        "Public upload: DataMap chunk {} was already stored \
                         on the network — address is retrievable without a \
                         new payment",
                        hex::encode(addr)
                    );
                }
            }

            let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);

            info!(
                "File prepared for external signing: {} chunks, total {} atto ({})",
                prepared_chunks.len(),
                payment_intent.total_amount,
                path.display()
            );

            ExternalPaymentInfo::WaveBatch {
                prepared_chunks,
                payment_intent,
            }
        };

        Ok(PreparedUpload {
            data_map,
            payment_info,
            data_map_address,
        })
    }
985
986    /// Phase 2 of external-signer upload (wave-batch): finalize with externally-signed tx hashes.
987    ///
988    /// Takes a [`PreparedUpload`] that used wave-batch payment and a map
989    /// of `quote_hash -> tx_hash` provided by the external signer after on-chain
990    /// payment. Builds payment proofs and stores chunks on the network.
991    ///
992    /// # Errors
993    ///
994    /// Returns an error if the prepared upload used merkle payment (use
995    /// [`Client::finalize_upload_merkle`] instead), proof construction fails,
996    /// or any chunk cannot be stored.
997    pub async fn finalize_upload(
998        &self,
999        prepared: PreparedUpload,
1000        tx_hash_map: &HashMap<QuoteHash, TxHash>,
1001    ) -> Result<FileUploadResult> {
1002        let data_map_address = prepared.data_map_address;
1003        match prepared.payment_info {
1004            ExternalPaymentInfo::WaveBatch {
1005                prepared_chunks,
1006                payment_intent: _,
1007            } => {
1008                let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
1009                let wave_result = self.store_paid_chunks(paid_chunks).await;
1010                if !wave_result.failed.is_empty() {
1011                    let failed_count = wave_result.failed.len();
1012                    let stored_count = wave_result.stored.len();
1013                    return Err(Error::PartialUpload {
1014                        stored: wave_result.stored.clone(),
1015                        stored_count,
1016                        failed: wave_result.failed,
1017                        failed_count,
1018                        total_chunks: stored_count + failed_count,
1019                        reason: "finalize_upload: chunk storage failed after retries".into(),
1020                    });
1021                }
1022                let chunks_stored = wave_result.stored.len();
1023
1024                info!("External-signer upload finalized: {chunks_stored} chunks stored");
1025
1026                Ok(FileUploadResult {
1027                    data_map: prepared.data_map,
1028                    chunks_stored,
1029                    chunks_failed: 0,
1030                    total_chunks: chunks_stored,
1031                    payment_mode_used: PaymentMode::Single,
1032                    storage_cost_atto: "0".into(),
1033                    gas_cost_wei: 0,
1034                    data_map_address,
1035                })
1036            }
1037            ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
1038                "Cannot finalize merkle upload with wave-batch tx hashes. \
1039                 Use finalize_upload_merkle() instead."
1040                    .to_string(),
1041            )),
1042        }
1043    }
1044
1045    /// Phase 2 of external-signer upload (merkle): finalize with winner pool hash.
1046    ///
1047    /// Takes a [`PreparedUpload`] that used merkle payment and the `winner_pool_hash`
1048    /// returned by the on-chain merkle payment transaction. Generates proofs and
1049    /// stores chunks on the network.
1050    ///
1051    /// # Errors
1052    ///
1053    /// Returns an error if the prepared upload used wave-batch payment (use
1054    /// [`Client::finalize_upload`] instead), proof generation fails,
1055    /// or any chunk cannot be stored.
1056    pub async fn finalize_upload_merkle(
1057        &self,
1058        prepared: PreparedUpload,
1059        winner_pool_hash: [u8; 32],
1060    ) -> Result<FileUploadResult> {
1061        let data_map_address = prepared.data_map_address;
1062        match prepared.payment_info {
1063            ExternalPaymentInfo::Merkle {
1064                prepared_batch,
1065                chunk_contents,
1066                chunk_addresses,
1067            } => {
1068                let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
1069                let chunks_stored = self
1070                    .merkle_upload_chunks(chunk_contents, chunk_addresses, &batch_result)
1071                    .await?;
1072
1073                info!("External-signer merkle upload finalized: {chunks_stored} chunks stored");
1074
1075                Ok(FileUploadResult {
1076                    data_map: prepared.data_map,
1077                    chunks_stored,
1078                    chunks_failed: 0,
1079                    total_chunks: chunks_stored,
1080                    payment_mode_used: PaymentMode::Merkle,
1081                    storage_cost_atto: "0".into(),
1082                    gas_cost_wei: 0,
1083                    data_map_address,
1084                })
1085            }
1086            ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
1087                "Cannot finalize wave-batch upload with merkle winner hash. \
1088                 Use finalize_upload() instead."
1089                    .to_string(),
1090            )),
1091        }
1092    }
1093
1094    /// Upload a file with a specific payment mode.
1095    ///
1096    /// Before encryption, checks that the temp directory has enough free
1097    /// disk space for the spilled chunks (~1.1× source file size).
1098    ///
1099    /// Encrypted chunks are spilled to a temp directory during encryption
1100    /// so that only their 32-byte addresses stay in memory. At upload time,
1101    /// chunks are read back one wave at a time (~64 × 4 MB ≈ 256 MB peak).
1102    ///
1103    /// # Errors
1104    ///
1105    /// Returns an error if there is insufficient disk space, the file cannot
1106    /// be read, encryption fails, or any chunk cannot be stored.
1107    #[allow(clippy::too_many_lines)]
1108    pub async fn file_upload_with_mode(
1109        &self,
1110        path: &Path,
1111        mode: PaymentMode,
1112    ) -> Result<FileUploadResult> {
1113        self.file_upload_with_progress(path, mode, None).await
1114    }
1115
    /// Upload a file with progress events sent to the given channel.
    ///
    /// Same as [`Client::file_upload_with_mode`] but sends [`UploadEvent`]s to the
    /// provided channel for UI progress feedback.
    #[allow(clippy::too_many_lines)]
    pub async fn file_upload_with_progress(
        &self,
        path: &Path,
        mode: PaymentMode,
        progress: Option<mpsc::Sender<UploadEvent>>,
    ) -> Result<FileUploadResult> {
        debug!(
            "Streaming file upload with mode {mode:?}: {}",
            path.display()
        );

        // Pre-flight: verify enough temp disk space for the chunk spill.
        let file_size = std::fs::metadata(path)?.len();
        check_disk_space_for_spill(file_size)?;

        // Phase 1: Encrypt file and spill chunks to temp directory.
        // Only 32-byte addresses stay in memory — chunk data lives on disk.
        let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;

        let chunk_count = spill.len();
        info!(
            "Encrypted {} into {chunk_count} chunks (spilled to disk)",
            path.display()
        );
        // Progress send failures (receiver dropped) are deliberately ignored.
        if let Some(ref tx) = progress {
            let _ = tx
                .send(UploadEvent::Encrypted {
                    total_chunks: chunk_count,
                })
                .await;
        }

        // Phase 2: Decide payment mode and upload in waves from disk.
        let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei) =
            if self.should_use_merkle(chunk_count, mode) {
                info!("Using merkle batch payment for {chunk_count} file chunks");

                let batch_result = match self
                    .pay_for_merkle_batch(&spill.addresses, DATA_TYPE_CHUNK, spill.avg_chunk_size())
                    .await
                {
                    Ok(result) => result,
                    // Fallback is limited to Auto mode; an explicit Merkle
                    // request propagates the InsufficientPeers error below.
                    Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
                        info!("Merkle needs more peers ({msg}), falling back to wave-batch");
                        let (stored, sc, gc) =
                            self.upload_waves_single(&spill, progress.as_ref()).await?;
                        // Early return: the fallback is reported as Single mode.
                        return Ok(FileUploadResult {
                            data_map,
                            chunks_stored: stored,
                            chunks_failed: 0,
                            total_chunks: chunk_count,
                            payment_mode_used: PaymentMode::Single,
                            storage_cost_atto: sc,
                            gas_cost_wei: gc,
                            data_map_address: None,
                        });
                    }
                    Err(e) => return Err(e),
                };

                let (stored, sc, gc) = self
                    .upload_waves_merkle(&spill, &batch_result, progress.as_ref())
                    .await?;
                (stored, PaymentMode::Merkle, sc, gc)
            } else {
                let (stored, sc, gc) = self.upload_waves_single(&spill, progress.as_ref()).await?;
                (stored, PaymentMode::Single, sc, gc)
            };

        info!(
            "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
            path.display()
        );

        Ok(FileUploadResult {
            data_map,
            chunks_stored,
            chunks_failed: 0,
            total_chunks: chunk_count,
            payment_mode_used: actual_mode,
            storage_cost_atto,
            gas_cost_wei,
            data_map_address: None,
        })
    }
1206
1207    /// Encrypt a file and spill chunks to a temp directory.
1208    ///
1209    /// Logs progress every 100 chunks so users get feedback during
1210    /// multi-GB encryptions.
1211    ///
1212    /// Returns the spill buffer (addresses on disk) and the `DataMap`.
1213    async fn encrypt_file_to_spill(
1214        &self,
1215        path: &Path,
1216        progress: Option<&mpsc::Sender<UploadEvent>>,
1217    ) -> Result<(ChunkSpill, DataMap)> {
1218        let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
1219
1220        let mut spill = ChunkSpill::new()?;
1221        while let Some(content) = chunk_rx.recv().await {
1222            spill.push(&content)?;
1223            let chunks_done = spill.len();
1224            if let Some(tx) = progress {
1225                if chunks_done.is_multiple_of(10) {
1226                    let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
1227                }
1228            }
1229            if chunks_done % 100 == 0 {
1230                let mb = spill.total_bytes() / (1024 * 1024);
1231                info!(
1232                    "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
1233                    path.display()
1234                );
1235            }
1236        }
1237
1238        // Await encryption completion to catch errors before paying.
1239        handle
1240            .await
1241            .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
1242            .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
1243
1244        let data_map = datamap_rx
1245            .await
1246            .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
1247
1248        Ok((spill, data_map))
1249    }
1250
    /// Upload chunks from a spill using wave-based per-chunk (single) payments.
    ///
    /// Reads one wave at a time from disk, prepares quotes, pays, and stores.
    /// Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE` (~256 MB).
    ///
    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
    async fn upload_waves_single(
        &self,
        spill: &ChunkSpill,
        progress: Option<&mpsc::Sender<UploadEvent>>,
    ) -> Result<(usize, String, u128)> {
        // Running totals across all waves.
        let mut total_stored = 0usize;
        let mut total_storage = Amount::ZERO;
        let mut total_gas: u128 = 0;
        let total_chunks = spill.len();
        let waves: Vec<&[[u8; 32]]> = spill.waves().collect();
        let wave_count = waves.len();

        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
            let wave_num = wave_idx + 1;
            // Read this wave's chunk data back from the spill directory;
            // only one wave of chunk bytes is in memory at a time.
            let wave_data: Vec<Bytes> = wave_addrs
                .iter()
                .map(|addr| spill.read_chunk(addr))
                .collect::<Result<Vec<_>>>()?;

            info!(
                "Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
                wave_data.len()
            );
            // Progress send failures (receiver dropped) are ignored.
            if let Some(tx) = progress {
                let _ = tx
                    .send(UploadEvent::QuotingChunks {
                        wave: wave_num,
                        total_waves: wave_count,
                        chunks_in_wave: wave_data.len(),
                    })
                    .await;
            }
            let (addresses, wave_storage, wave_gas) = self
                .batch_upload_chunks_with_events(wave_data, progress, total_stored, total_chunks)
                .await?;
            total_stored += addresses.len();
            // NOTE(review): a wave cost string that fails to parse is skipped
            // silently, so the reported total may undercount — looks like a
            // deliberate best-effort choice; confirm.
            if let Ok(cost) = wave_storage.parse::<Amount>() {
                total_storage += cost;
            }
            total_gas = total_gas.saturating_add(wave_gas);
            if let Some(tx) = progress {
                let _ = tx
                    .send(UploadEvent::WaveComplete {
                        wave: wave_num,
                        total_waves: wave_count,
                        stored_so_far: total_stored,
                        total: total_chunks,
                    })
                    .await;
            }
        }

        Ok((total_stored, total_storage.to_string(), total_gas))
    }
1311
    /// Upload chunks from a spill using pre-computed merkle proofs.
    ///
    /// Reads one wave at a time from disk, pairs each chunk with its proof,
    /// and uploads concurrently. Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE`.
    ///
    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
    /// Costs come from the `batch_result` which was populated during payment.
    async fn upload_waves_merkle(
        &self,
        spill: &ChunkSpill,
        batch_result: &MerkleBatchPaymentResult,
        progress: Option<&mpsc::Sender<UploadEvent>>,
    ) -> Result<(usize, String, u128)> {
        let mut total_stored = 0usize;
        let total_chunks = spill.len();
        let waves: Vec<&[[u8; 32]]> = spill.waves().collect();
        let wave_count = waves.len();
        // Addresses stored so far — reported in PartialUpload on failure.
        let mut stored_addresses: Vec<[u8; 32]> = Vec::new();

        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
            let wave_num = wave_idx + 1;
            let wave = spill.read_wave(wave_addrs)?;

            info!(
                "Wave {wave_num}/{wave_count}: storing {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
                wave.len()
            );

            // Each item resolves to Ok(addr) on success or Err((addr, e)) so
            // the failing chunk's address survives into the error path.
            // The proof is looked up (and cloned) before the async block so
            // the future doesn't borrow `batch_result`.
            let mut upload_stream = stream::iter(wave.into_iter().map(|(content, addr)| {
                let proof_bytes = batch_result.proofs.get(&addr).cloned();
                async move {
                    let proof = proof_bytes.ok_or_else(|| {
                        (
                            addr,
                            Error::Payment(format!(
                                "Missing merkle proof for chunk {}",
                                hex::encode(addr)
                            )),
                        )
                    })?;
                    let peers = self.close_group_peers(&addr).await.map_err(|e| (addr, e))?;
                    self.chunk_put_to_close_group(content, proof, &peers)
                        .await
                        .map(|_| addr)
                        .map_err(|e| (addr, e))
                }
            }))
            .buffer_unordered(self.config().store_concurrency);

            while let Some(result) = upload_stream.next().await {
                match result {
                    Ok(addr) => {
                        stored_addresses.push(addr);
                        total_stored += 1;
                        info!("Stored {total_stored}/{total_chunks}");
                        // Progress send failures (receiver dropped) are ignored.
                        if let Some(tx) = progress {
                            let _ = tx
                                .send(UploadEvent::ChunkStored {
                                    stored: total_stored,
                                    total: total_chunks,
                                })
                                .await;
                        }
                    }
                    // Fail fast: the first failed chunk aborts the upload;
                    // in-flight siblings are dropped with the stream, and the
                    // PartialUpload records only this one failure.
                    Err((addr, e)) => {
                        warn!("merkle upload failed for chunk {}: {e}", hex::encode(addr));
                        return Err(Error::PartialUpload {
                            stored: stored_addresses,
                            stored_count: total_stored,
                            failed: vec![(addr, e.to_string())],
                            failed_count: 1,
                            total_chunks,
                            reason: format!("merkle chunk upload failed: {e}"),
                        });
                    }
                }
            }

            if let Some(tx) = progress {
                let _ = tx
                    .send(UploadEvent::WaveComplete {
                        wave: wave_num,
                        total_waves: wave_count,
                        stored_so_far: total_stored,
                        total: total_chunks,
                    })
                    .await;
            }
        }

        Ok((
            total_stored,
            batch_result.storage_cost_atto.clone(),
            batch_result.gas_cost_wei,
        ))
    }
1408
1409    /// Download and decrypt a file from the network, writing it to disk.
1410    ///
1411    /// Uses `streaming_decrypt` so that only one batch of chunks lives in
1412    /// memory at a time, avoiding OOM on large files. Chunks are fetched
1413    /// concurrently within each batch, then decrypted data is written to
1414    /// disk incrementally.
1415    ///
1416    /// Returns the number of bytes written.
1417    ///
1418    /// # Panics
1419    ///
1420    /// Requires a multi-threaded Tokio runtime (`flavor = "multi_thread"`).
1421    /// Will panic if called from a `current_thread` runtime because
1422    /// `streaming_decrypt` takes a synchronous callback that must bridge
1423    /// back to async via `block_in_place`.
1424    ///
1425    /// # Errors
1426    ///
1427    /// Returns an error if any chunk cannot be retrieved, decryption fails,
1428    /// or the file cannot be written.
1429    #[allow(clippy::unused_async)]
1430    pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
1431        self.file_download_with_progress(data_map, output, None)
1432            .await
1433    }
1434
    /// Download and decrypt a file with progress events.
    ///
    /// Same as [`Client::file_download`] but sends [`DownloadEvent`]s for UI feedback.
    ///
    /// Progress reporting:
    /// 1. Resolves hierarchical DataMaps to the root level first (emits
    ///    `ResolvingDataMap`, then one `MapChunkFetched` per map chunk)
    /// 2. Once the root DataMap is known, emits `DataMapResolved` with the
    ///    accurate `total_chunks` count
    /// 3. Fetches data chunks, emitting `ChunksFetched` with accurate
    ///    `fetched/total` progress
    ///
    /// All events are delivered with `try_send` and the result is ignored, so
    /// events are silently dropped when the receiver's channel is full —
    /// progress reporting is best-effort and never blocks the download.
    ///
    /// # Panics
    ///
    /// Requires a multi-threaded Tokio runtime (`flavor = "multi_thread"`):
    /// the synchronous fetch callbacks bridge back to async via
    /// `block_in_place`, which panics on a `current_thread` runtime.
    ///
    /// # Errors
    ///
    /// Returns an error if any chunk cannot be retrieved, decryption fails,
    /// or the file cannot be written.
    // The fn never awaits directly — all async work runs through
    // `handle.block_on` inside `block_in_place` — hence the lint allow.
    #[allow(clippy::unused_async)]
    pub async fn file_download_with_progress(
        &self,
        data_map: &DataMap,
        output: &Path,
        progress: Option<mpsc::Sender<DownloadEvent>>,
    ) -> Result<u64> {
        debug!("Downloading file to {}", output.display());

        // Captured so the synchronous fetch callbacks below can re-enter the
        // async runtime via `block_on`.
        let handle = Handle::current();

        // Phase 1: Resolve hierarchical DataMap to root level.
        // This fetches child DataMap chunks (typically 3) to discover the real chunk count.
        let root_map = if data_map.is_child() {
            let dm_chunks = data_map.len();
            if let Some(ref tx) = progress {
                // Best-effort: drop the event rather than block if the UI lags.
                let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
                    total_map_chunks: dm_chunks,
                });
            }

            let resolve_progress = progress.clone();
            let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));

            // `get_root_data_map_parallel` takes a *synchronous* fetch
            // callback; `block_in_place` lets that callback call `block_on`
            // without deadlocking the runtime's worker thread.
            let resolved = tokio::task::block_in_place(|| {
                let counter_ref = resolve_counter.clone();
                let progress_ref = resolve_progress.clone();
                let fetch = |batch: &[(usize, XorName)]| {
                    let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
                    let counter = counter_ref.clone();
                    let prog = progress_ref.clone();
                    handle.block_on(async {
                        // Fetch the whole batch concurrently; completion order
                        // is arbitrary, so each result carries its index.
                        let mut futs = futures::stream::FuturesUnordered::new();
                        for (idx, hash) in batch_owned {
                            let addr = hash.0;
                            futs.push(async move {
                                let result = self.chunk_get(&addr).await;
                                (idx, hash, result)
                            });
                        }
                        let mut results = Vec::with_capacity(futs.len());
                        while let Some((idx, hash, result)) =
                            futures::StreamExt::next(&mut futs).await
                        {
                            // Map both transport errors and "chunk missing"
                            // into self_encryption errors so `?` works inside
                            // this callback.
                            let chunk = result
                                .map_err(|e| {
                                    self_encryption::Error::Generic(format!(
                                        "DataMap resolution failed: {e}"
                                    ))
                                })?
                                .ok_or_else(|| {
                                    self_encryption::Error::Generic(format!(
                                        "DataMap chunk not found: {}",
                                        hex::encode(hash.0)
                                    ))
                                })?;
                            results.push((idx, chunk.content));
                            let fetched =
                                counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
                            if let Some(ref tx) = prog {
                                let _ = tx.try_send(DownloadEvent::MapChunkFetched { fetched });
                            }
                        }
                        Ok(results)
                    })
                };
                get_root_data_map_parallel(data_map.clone(), &fetch)
            })
            .map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;

            info!(
                "Resolved hierarchical DataMap: {} data chunks",
                resolved.len()
            );
            resolved
        } else {
            // Already a root-level DataMap; no resolution needed.
            data_map.clone()
        };

        // Phase 2: Now we know the real chunk count.
        let total_chunks = root_map.len();
        if let Some(ref tx) = progress {
            let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
        }

        // Phase 3: Fetch and decrypt data chunks with accurate progress.
        let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let fetched_for_closure = fetched_counter.clone();
        let progress_for_closure = progress.clone();

        // Same sync-callback bridging as Phase 1, this time feeding the
        // decryption stream.
        let stream = streaming_decrypt(&root_map, |batch: &[(usize, XorName)]| {
            let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
            let fetched_ref = fetched_for_closure.clone();
            let progress_ref = progress_for_closure.clone();

            tokio::task::block_in_place(|| {
                handle.block_on(async {
                    let mut futs = futures::stream::FuturesUnordered::new();
                    for (idx, hash) in batch_owned {
                        let addr = hash.0;
                        futs.push(async move {
                            let result = self.chunk_get(&addr).await;
                            (idx, hash, result)
                        });
                    }

                    let mut results = Vec::with_capacity(futs.len());
                    while let Some((idx, hash, result)) = futures::StreamExt::next(&mut futs).await
                    {
                        let addr_hex = hex::encode(hash.0);
                        let chunk = result
                            .map_err(|e| {
                                self_encryption::Error::Generic(format!(
                                    "Network fetch failed for {addr_hex}: {e}"
                                ))
                            })?
                            .ok_or_else(|| {
                                self_encryption::Error::Generic(format!(
                                    "Chunk not found: {addr_hex}"
                                ))
                            })?;
                        results.push((idx, chunk.content));
                        let fetched =
                            fetched_ref.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
                        // NOTE(review): per-chunk info-level logging is noisy
                        // for large files — consider debug! if unintentional.
                        info!("Downloaded {fetched}/{total_chunks}");
                        if let Some(ref tx) = progress_ref {
                            let _ = tx.try_send(DownloadEvent::ChunksFetched {
                                fetched,
                                total: total_chunks,
                            });
                        }
                    }
                    Ok(results)
                })
            })
        })
        .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;

        // Write decrypted chunks to a temp file, then rename atomically.
        // The temp file lives in the output's own directory so the rename
        // stays on a single filesystem.
        let parent = output.parent().unwrap_or_else(|| Path::new("."));
        let unique: u64 = rand::random();
        let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));

        // Immediately-invoked closure so `?` can short-circuit while the
        // match below still gets a chance to clean up the temp file.
        let write_result = (|| -> Result<u64> {
            let mut file = std::fs::File::create(&tmp_path)?;
            let mut bytes_written = 0u64;
            for chunk_result in stream {
                let chunk_bytes = chunk_result
                    .map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
                file.write_all(&chunk_bytes)?;
                bytes_written += chunk_bytes.len() as u64;
            }
            file.flush()?;
            Ok(bytes_written)
        })();

        match write_result {
            Ok(bytes_written) => match std::fs::rename(&tmp_path, output) {
                Ok(()) => {
                    info!(
                        "File downloaded: {bytes_written} bytes written to {}",
                        output.display()
                    );
                    Ok(bytes_written)
                }
                Err(rename_err) => {
                    // Report the rename error, but try not to leave the temp
                    // file behind.
                    if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
                        warn!(
                            "Failed to remove temp download file {}: {cleanup_err}",
                            tmp_path.display()
                        );
                    }
                    Err(rename_err.into())
                }
            },
            Err(e) => {
                // NOTE(review): if `File::create` itself failed, this
                // remove_file warns about a file that never existed — benign
                // noise, but worth confirming.
                if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
                    warn!(
                        "Failed to remove temp download file {}: {cleanup_err}",
                        tmp_path.display()
                    );
                }
                Err(e)
            }
        }
    }
1630}
1631
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn disk_space_check_passes_for_small_file() {
        // Reserving room for a 1 KB spill must succeed on any machine.
        check_disk_space_for_spill(1024).unwrap();
    }

    #[test]
    fn disk_space_check_fails_for_absurd_size() {
        // u64::MAX / 2 is several exabytes — no real system has that free.
        let outcome = check_disk_space_for_spill(u64::MAX / 2);
        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            matches!(err, Error::InsufficientDiskSpace(_)),
            "expected InsufficientDiskSpace, got: {err}"
        );
    }

    #[test]
    fn chunk_spill_round_trip() {
        let mut spill = ChunkSpill::new().unwrap();
        let first = vec![0xAA; 1024];
        let second = vec![0xBB; 2048];

        spill.push(&first).unwrap();
        spill.push(&second).unwrap();

        // Bookkeeping reflects both unique chunks.
        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 1024 + 2048);
        assert_eq!(spill.avg_chunk_size(), (1024 + 2048) / 2);

        // Every spilled chunk must read back byte-for-byte.
        let mut addrs = spill.addresses.iter();
        let got_first = spill.read_chunk(addrs.next().unwrap()).unwrap();
        assert_eq!(&got_first[..], &first[..]);
        let got_second = spill.read_chunk(addrs.next().unwrap()).unwrap();
        assert_eq!(&got_second[..], &second[..]);

        // Splitting into 1-chunk waves yields one wave per chunk.
        assert_eq!(spill.addresses.chunks(1).count(), 2);
    }

    #[test]
    fn chunk_spill_cleanup_on_drop() {
        let spill = ChunkSpill::new().unwrap();
        let dir = spill.dir.clone();
        assert!(dir.exists());
        // Dropping the spill must delete its backing directory.
        drop(spill);
        assert!(!dir.exists(), "spill dir should be removed on drop");
    }

    #[test]
    fn chunk_spill_deduplicates_identical_content() {
        let mut spill = ChunkSpill::new().unwrap();
        let repeated = vec![0xCC; 512];

        // Identical content pushed three times must be stored exactly once.
        for _ in 0..3 {
            spill.push(&repeated).unwrap();
        }
        assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
        assert_eq!(
            spill.total_bytes(),
            512,
            "total_bytes should count unique only"
        );

        // Distinct content still gets its own entry.
        let distinct = vec![0xDD; 256];
        spill.push(&distinct).unwrap();
        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 512 + 256);
    }
}
1715
/// Compile-time assertions that Client file method futures are Send.
#[cfg(test)]
mod send_assertions {
    use super::*;

    // Fails to compile if the referenced value's type is not `Send`.
    fn _require_send<T: Send>(_: &T) {}

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_is_send(client: &Client) {
        _require_send(&client.file_upload(Path::new("/dev/null")));
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_with_mode_is_send(client: &Client) {
        _require_send(&client.file_upload_with_mode(Path::new("/dev/null"), PaymentMode::Auto));
    }

    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _file_download_is_send(client: &Client) {
        let dm: DataMap = todo!();
        _require_send(&client.file_download(&dm, Path::new("/dev/null")));
    }
}