ant_core/data/client/file.rs
1//! File operations using streaming self-encryption.
2//!
//! Upload files directly from disk without loading them entirely into memory.
//! The file is read in 8 KB blocks and fed to `stream_encrypt`, which emits
//! encrypted chunks incrementally as the input is consumed.
//!
//! Encrypted chunks are spilled to an on-disk spill directory
//! (`<data_dir>/spill/`) during encryption so that peak memory usage is
//! bounded to one upload wave (~256 MB for 64 × 4 MB chunks) regardless of
//! file size.
10//!
11//! For in-memory data uploads, see the `data` module.
12
13use crate::data::client::adaptive::observe_op;
14use crate::data::client::batch::{
15 finalize_batch_payment, PaymentIntent, PreparedChunk, WaveAggregateStats,
16};
17use crate::data::client::classify_error;
18use crate::data::client::merkle::{
19 finalize_merkle_batch, should_use_merkle, MerkleBatchPaymentResult, PaymentMode,
20 PreparedMerkleBatch,
21};
22use crate::data::client::Client;
23use crate::data::error::{Error, Result};
24use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
25use ant_protocol::transport::{MultiAddr, PeerId};
26use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
27use bytes::Bytes;
28use fs2::FileExt;
29use futures::stream::{self, StreamExt};
30use self_encryption::{get_root_data_map_parallel, stream_encrypt, streaming_decrypt, DataMap};
31use std::collections::{HashMap, HashSet};
32use std::io::Write;
33use std::path::{Path, PathBuf};
34use std::sync::{Arc, Mutex};
35use tokio::runtime::Handle;
36use tokio::sync::mpsc;
37use tracing::{debug, info, warn};
38use xor_name::XorName;
39
/// Progress events emitted during file upload for UI feedback.
///
/// Sent over an optional `mpsc::Sender<UploadEvent>` supplied by the caller.
/// Delivery is best-effort. For example, [`Client::estimate_upload_cost`]
/// discards the result of `send`, so a closed receiver never aborts the
/// operation.
#[derive(Debug, Clone)]
pub enum UploadEvent {
    /// A chunk has been encrypted and spilled to disk.
    Encrypting { chunks_done: usize },
    /// File encryption complete. In [`Client::estimate_upload_cost`],
    /// `total_chunks` is the number of distinct (deduplicated) spilled chunks.
    Encrypted { total_chunks: usize },
    /// Starting quote collection for a wave.
    ///
    /// NOTE(review): confirm against the emitter whether `wave` is 0- or
    /// 1-based.
    QuotingChunks {
        wave: usize,
        total_waves: usize,
        chunks_in_wave: usize,
    },
    /// A chunk has been quoted (peer discovery + price received).
    /// This is the slow phase — each quote involves network round-trips.
    ChunkQuoted { quoted: usize, total: usize },
    /// A chunk has been stored on the network.
    ChunkStored { stored: usize, total: usize },
    /// A wave has completed.
    WaveComplete {
        wave: usize,
        total_waves: usize,
        stored_so_far: usize,
        total: usize,
    },
}
66
/// Progress events emitted during file download for UI feedback.
///
/// NOTE(review): the code that emits these events is not part of this
/// section. The variant docs describe intent. Confirm against the download
/// path whether the `fetched` counters are cumulative.
#[derive(Debug, Clone)]
pub enum DownloadEvent {
    /// Resolving hierarchical DataMap to discover real chunk count.
    ResolvingDataMap { total_map_chunks: usize },
    /// A DataMap chunk has been fetched during resolution.
    MapChunkFetched { fetched: usize },
    /// DataMap resolved — total data chunk count now known.
    DataMapResolved { total_chunks: usize },
    /// Data chunks are being fetched from the network.
    ChunksFetched { fetched: usize, total: usize },
}
79
/// One entry in the per-chunk quote list returned by
/// [`Client::get_store_quotes`]: the responding peer, its addresses, the
/// signed quote it returned, and the payment amount it is demanding.
///
/// Tuple order is `(peer, addrs, quote, price)`. Cost estimation reads only
/// the price (the last element).
type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);
84
/// Number of chunks per upload wave (matches batch.rs PAYMENT_WAVE_SIZE).
///
/// Also the group size used when reading spilled chunks back from disk, and
/// the divisor for the wave count in the single-mode gas estimate.
const UPLOAD_WAVE_SIZE: usize = 64;

/// Maximum number of distinct chunk addresses to sample when probing for a
/// representative quote in [`Client::estimate_upload_cost`].
///
/// Bounded small so we never spend more than a couple of round-trips on the
/// `AlreadyStored` retry path, which only matters when many leading chunks
/// of a file already live on the network.
const ESTIMATE_SAMPLE_CAP: usize = 5;

/// Gas used by one `pay_for_quotes` transaction that packs up to
/// `UPLOAD_WAVE_SIZE` (quote_hash, rewards_address, amount) entries.
///
/// `batch_pay` in `batch.rs` flattens every chunk's close-group quotes into a
/// single EVM call, so the dominant cost is the SSTOREs for each entry plus
/// the base tx overhead. On Arbitrum that is roughly
/// `21_000 + 64 × (20_000 + small)` ≈ 1.3M; we round up to 1.5M as a
/// conservative per-wave upper bound.
const GAS_PER_WAVE_TX: u128 = 1_500_000;

/// Gas used by one merkle batch payment transaction.
///
/// One on-chain tx per merkle sub-batch, but each tx verifies a merkle tree
/// and posts a pool commitment, so budget higher than a plain transfer.
const GAS_PER_MERKLE_TX: u128 = 500_000;

/// Advisory gas price (wei/gas) used to turn the gas estimate into an ETH
/// figure when no live gas oracle is consulted.
///
/// Arbitrum One typically settles around 0.1 gwei on quiet blocks
/// (`100_000_000` wei = 0.1 gwei). We use that as the default so the CLI
/// prints a sensible order-of-magnitude number. Users should treat the
/// reported gas cost as an estimate, not a commitment — real gas is bid at
/// submission time.
const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;

/// Extra headroom percentage for disk space check.
///
/// Encrypted chunks are slightly larger than the source data due to padding
/// and self-encryption overhead. We require file_size + 10% free space in
/// the spill directory to account for this.
const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;
127
/// Grace period (in seconds) before a spill dir is eligible for stale cleanup.
///
/// This is a small TOCTOU guard covering the sub-millisecond window inside
/// [`ChunkSpill::new`] between `create_dir` and `try_lock_exclusive`. Once a
/// dir is older than this and its lockfile is releasable, the owning process
/// is gone and the dir is safe to reap — regardless of how old it is.
///
/// The previous policy waited 24 h before reaping any orphan. That meant
/// any non-graceful exit (SIGKILL, kernel OOM, panic abort) leaked its
/// spill dir until the next day's upload. On a host being restart-looped
/// by systemd, orphans could fill the disk well within that window.
const SPILL_STALE_GRACE_SECS: u64 = 30;

/// Prefix for spill directory names to distinguish from user files.
const SPILL_DIR_PREFIX: &str = "spill_";

/// Lockfile name inside each spill dir to signal active use.
const SPILL_LOCK_NAME: &str = ".lock";

/// Temporary on-disk buffer for encrypted chunks.
///
/// During file encryption, chunks are written to a spill directory under
/// `<data_dir>/spill/` so that only their 32-byte addresses stay in memory.
/// At upload time chunks are read back one wave at a time, keeping peak RAM
/// at ~`UPLOAD_WAVE_SIZE × 4 MB`.
///
/// The directory is removed when the value is dropped. An exclusive lock on
/// `SPILL_LOCK_NAME` is held for the value's lifetime so concurrent stale
/// cleanup leaves it alone.
struct ChunkSpill {
    /// Directory holding spilled chunk files (named by hex address).
    dir: PathBuf,
    /// Lockfile held for the lifetime of this spill (prevents stale cleanup).
    _lock: std::fs::File,
    /// Deduplicated list of chunk addresses, in first-seen order.
    addresses: Vec<[u8; 32]>,
    /// Tracks seen addresses for deduplication.
    seen: HashSet<[u8; 32]>,
    /// Running total of unique chunk byte sizes (for average-size calculation).
    total_bytes: u64,
}
164
165impl ChunkSpill {
166 /// Return the parent directory for all spill dirs: `<data_dir>/spill/`.
167 fn spill_root() -> Result<PathBuf> {
168 use crate::config;
169 let root = config::data_dir()
170 .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
171 .join("spill");
172 Ok(root)
173 }
174
175 /// Create a new spill directory under `<data_dir>/spill/`.
176 ///
177 /// Directory name is `spill_<timestamp>_<random>` so orphans can be
178 /// identified by prefix and cleaned up by age. A lockfile inside the
179 /// dir prevents concurrent cleanup from deleting an active spill.
180 fn new() -> Result<Self> {
181 let root = Self::spill_root()?;
182 std::fs::create_dir_all(&root)?;
183
184 // Clean up stale spill dirs from previous crashed runs.
185 Self::cleanup_stale(&root);
186
187 let now = std::time::SystemTime::now()
188 .duration_since(std::time::UNIX_EPOCH)
189 .unwrap_or_default()
190 .as_secs();
191 let unique: u64 = rand::random();
192 let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
193 std::fs::create_dir(&dir)?;
194
195 // Create and hold a lockfile for the lifetime of this spill.
196 // cleanup_stale() will skip dirs with locked files.
197 let lock_path = dir.join(SPILL_LOCK_NAME);
198 let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
199 Error::Io(std::io::Error::new(
200 e.kind(),
201 format!("failed to create spill lockfile: {e}"),
202 ))
203 })?;
204 lock_file.try_lock_exclusive().map_err(|e| {
205 Error::Io(std::io::Error::new(
206 e.kind(),
207 format!("failed to lock spill lockfile: {e}"),
208 ))
209 })?;
210
211 Ok(Self {
212 dir,
213 _lock: lock_file,
214 addresses: Vec::new(),
215 seen: HashSet::new(),
216 total_bytes: 0,
217 })
218 }
219
220 /// Clean up stale spill directories. Best-effort, errors are logged.
221 ///
222 /// A spill dir is reaped when:
223 /// 1. Its name starts with `SPILL_DIR_PREFIX` (ignores unrelated files)
224 /// 2. It is an actual directory, not a symlink (prevents symlink attacks)
225 /// 3. Its timestamp is older than `SPILL_STALE_GRACE_SECS` (TOCTOU guard)
226 /// 4. Its lockfile is releasable — i.e. no live process holds it
227 ///
228 /// The lockfile is the primary correctness gate: a releasable lock means
229 /// the owning `ChunkSpill` has been dropped or the process is gone, so
230 /// the dir is fair game. The grace period covers only the brief window
231 /// inside [`Self::new`] between `create_dir` and `try_lock_exclusive`.
232 ///
233 /// Safe to call concurrently from multiple processes.
234 fn cleanup_stale(root: &Path) {
235 let now = std::time::SystemTime::now()
236 .duration_since(std::time::UNIX_EPOCH)
237 .unwrap_or_default()
238 .as_secs();
239
240 if now == 0 {
241 // Clock is broken (before Unix epoch). Skip cleanup to avoid
242 // misidentifying dirs as stale.
243 warn!("System clock before Unix epoch, skipping spill cleanup");
244 return;
245 }
246
247 let entries = match std::fs::read_dir(root) {
248 Ok(entries) => entries,
249 Err(_) => return,
250 };
251
252 for entry in entries.flatten() {
253 let name = entry.file_name();
254 let name_str = name.to_string_lossy();
255
256 // Only process dirs with our prefix.
257 let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
258 Some(s) => s,
259 None => continue,
260 };
261
262 // Parse timestamp: "spill_<timestamp>_<random>"
263 let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
264 Some(ts) => ts,
265 None => continue,
266 };
267
268 if now.saturating_sub(timestamp) < SPILL_STALE_GRACE_SECS {
269 continue;
270 }
271
272 // Safety: only delete actual directories, not symlinks.
273 let file_type = match entry.file_type() {
274 Ok(ft) => ft,
275 Err(_) => continue,
276 };
277 if !file_type.is_dir() {
278 continue;
279 }
280
281 let path = entry.path();
282
283 // Check lockfile: if locked, the dir is in active use -- skip it.
284 let lock_path = path.join(SPILL_LOCK_NAME);
285 if let Ok(lock_file) = std::fs::File::open(&lock_path) {
286 use fs2::FileExt;
287 if lock_file.try_lock_exclusive().is_err() {
288 // Lock held by another process -- dir is active.
289 debug!("Skipping active spill dir: {}", path.display());
290 continue;
291 }
292 // We acquired the lock, so no one else holds it.
293 // Drop it before deleting.
294 drop(lock_file);
295 }
296
297 info!("Cleaning up stale spill dir: {}", path.display());
298 if let Err(e) = std::fs::remove_dir_all(&path) {
299 warn!("Failed to clean up stale spill dir {}: {e}", path.display());
300 }
301 }
302 }
303
304 /// Run stale spill cleanup. Call at client startup or periodically.
305 #[allow(dead_code)]
306 pub(crate) fn run_cleanup() {
307 if let Ok(root) = Self::spill_root() {
308 Self::cleanup_stale(&root);
309 }
310 }
311
312 /// Write one encrypted chunk to disk and record its address.
313 ///
314 /// Deduplicates by content address: if the same chunk was already
315 /// spilled, the write and accounting are skipped. This prevents
316 /// double-uploads and inflated quoting metrics.
317 fn push(&mut self, content: &[u8]) -> Result<()> {
318 let address = compute_address(content);
319 if !self.seen.insert(address) {
320 return Ok(());
321 }
322 let path = self.dir.join(hex::encode(address));
323 std::fs::write(&path, content)?;
324 self.total_bytes += content.len() as u64;
325 self.addresses.push(address);
326 Ok(())
327 }
328
329 /// Number of chunks stored.
330 fn len(&self) -> usize {
331 self.addresses.len()
332 }
333
334 /// Total bytes of all spilled chunks.
335 fn total_bytes(&self) -> u64 {
336 self.total_bytes
337 }
338
339 /// Average chunk size in bytes (for quoting metrics).
340 fn avg_chunk_size(&self) -> u64 {
341 if self.addresses.is_empty() {
342 return 0;
343 }
344 self.total_bytes / self.addresses.len() as u64
345 }
346
347 /// Read a single chunk back from disk by address.
348 fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
349 let path = self.dir.join(hex::encode(address));
350 let data = std::fs::read(&path).map_err(|e| {
351 Error::Io(std::io::Error::new(
352 e.kind(),
353 format!("reading spilled chunk {}: {e}", hex::encode(address)),
354 ))
355 })?;
356 Ok(Bytes::from(data))
357 }
358
359 /// Iterate over address slices in wave-sized groups.
360 fn waves(&self) -> std::slice::Chunks<'_, [u8; 32]> {
361 self.addresses.chunks(UPLOAD_WAVE_SIZE)
362 }
363
364 /// Read a wave of chunks from disk.
365 fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
366 let mut out = Vec::with_capacity(wave_addrs.len());
367 for addr in wave_addrs {
368 let content = self.read_chunk(addr)?;
369 out.push((content, *addr));
370 }
371 Ok(out)
372 }
373
374 /// Clean up the spill directory.
375 fn cleanup(&self) {
376 if let Err(e) = std::fs::remove_dir_all(&self.dir) {
377 warn!(
378 "Failed to clean up chunk spill dir {}: {e}",
379 self.dir.display()
380 );
381 }
382 }
383}
384
385impl Drop for ChunkSpill {
386 fn drop(&mut self) {
387 self.cleanup();
388 }
389}
390
391/// Check that the spill directory has enough free space for the spilled chunks.
392///
393/// `file_size` is the source file's byte count. We require
394/// `file_size + 10%` free space to account for self-encryption overhead.
395fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
396 let spill_root = ChunkSpill::spill_root()?;
397
398 // Ensure the root exists so fs2 can query it.
399 std::fs::create_dir_all(&spill_root)?;
400
401 let available = fs2::available_space(&spill_root).map_err(|e| {
402 Error::Io(std::io::Error::new(
403 e.kind(),
404 format!(
405 "failed to query disk space on {}: {e}",
406 spill_root.display()
407 ),
408 ))
409 })?;
410
411 // Use integer arithmetic to avoid f64 precision loss on large file sizes.
412 let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
413 let required = file_size.saturating_add(headroom);
414
415 if available < required {
416 let avail_mb = available / (1024 * 1024);
417 let req_mb = required / (1024 * 1024);
418 return Err(Error::InsufficientDiskSpace(format!(
419 "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
420 spill_root.display()
421 )));
422 }
423
424 debug!(
425 "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
426 spill_root.display()
427 );
428 Ok(())
429}
430
/// Whether the data map is published to the network for address-based retrieval.
///
/// A private upload stores only the data chunks and returns the `DataMap` to
/// the caller — only someone holding that `DataMap` can reconstruct the file.
/// A public upload additionally stores the serialized `DataMap` as a chunk on
/// the network, yielding a single chunk address that anyone can use to
/// retrieve the `DataMap` (via [`Client::data_map_fetch`]) and then the file.
///
/// Defaults to [`Visibility::Private`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Visibility {
    /// Keep the data map local; only the holder can retrieve the file.
    #[default]
    Private,
    /// Publish the data map as a network chunk so anyone with the returned
    /// address can retrieve and decrypt the file.
    Public,
}
447
/// Estimated cost of uploading a file, returned by
/// [`Client::estimate_upload_cost`].
///
/// Derives serde `Serialize`/`Deserialize` so it can be handed to callers in
/// serialized form. Amounts are carried as strings (the `to_string()` form of
/// the underlying values).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct UploadCostEstimate {
    /// Original file size in bytes.
    pub file_size: u64,
    /// Number of chunks the file would be split into (data chunks only,
    /// does not include the DataMap chunk added during public uploads).
    pub chunk_count: usize,
    /// Estimated total storage cost in atto (token smallest unit).
    pub storage_cost_atto: String,
    /// Estimated gas cost in wei as a string. This is a rough heuristic
    /// based on chunk count and payment mode, NOT a live gas price query.
    pub estimated_gas_cost_wei: String,
    /// Payment mode that would be used (`Merkle` or `Single`, never `Auto`).
    pub payment_mode: PaymentMode,
}
465
/// Result of a file upload: the `DataMap` needed to retrieve the file.
///
/// Returned by [`Client::file_upload`] and related upload methods.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers that construct or pattern-match
/// on this struct.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FileUploadResult {
    /// The data map containing chunk metadata for reconstruction.
    pub data_map: DataMap,
    /// Number of chunks stored on the network.
    pub chunks_stored: usize,
    /// Number of chunks that failed to store. Always 0 for a successful
    /// upload — partial-failure information is conveyed via
    /// [`crate::data::Error::PartialUpload`] instead.
    pub chunks_failed: usize,
    /// Total number of chunks the upload attempted to store. On full
    /// success this equals `chunks_stored`.
    pub total_chunks: usize,
    /// Which payment mode was actually used (not just requested).
    pub payment_mode_used: PaymentMode,
    /// Total storage cost paid in token units (atto), as a string.
    /// "0" if all chunks already existed.
    pub storage_cost_atto: String,
    /// Total gas cost in wei. 0 if no on-chain transactions were made.
    pub gas_cost_wei: u128,
    /// Chunk address of the serialized `DataMap`, set only for
    /// [`Visibility::Public`] uploads. **`Some` means this address is
    /// retrievable from the network (via [`Client::data_map_fetch`])**, not
    /// necessarily that *this* upload paid to store it. If the serialized
    /// `DataMap` hashed to a chunk that was already on the network (same
    /// file uploaded before; deterministic via self-encryption), the address
    /// is still returned but no storage payment was made for it.
    pub data_map_address: Option<[u8; 32]>,
    /// Sum of chunk-store RPC attempts across the upload
    /// (`>= chunks_stored` on full success; more if any chunk retried).
    /// `0` for paths that don't run the wave store loop.
    pub chunk_attempts_total: usize,
    /// Per-chunk store wall-clock in ms (length == `chunks_stored` on full
    /// success, empty for paths that don't run the wave store loop).
    pub store_durations_ms: Vec<u64>,
    /// Count of stored chunks that succeeded on each retry round
    /// (index 0 = first attempt, 1 = first retry, etc.). All zeros for
    /// paths that don't run the wave store loop.
    pub retries_histogram: [usize; 4],
}
511
/// Payment information for external signing — either wave-batch or merkle.
///
/// Carried inside [`PreparedUpload::payment_info`] between the prepare and
/// finalize phases of an externally-signed upload.
#[derive(Debug)]
pub enum ExternalPaymentInfo {
    /// Wave-batch: individual (quote_hash, rewards_address, amount) tuples.
    WaveBatch {
        /// Chunks ready for payment (needed for finalize).
        prepared_chunks: Vec<PreparedChunk>,
        /// Payment intent for external signing.
        payment_intent: PaymentIntent,
    },
    /// Merkle: single on-chain call with depth, pool commitments, timestamp.
    Merkle {
        /// The prepared merkle batch (public fields sent to frontend, private fields stay in Rust).
        prepared_batch: PreparedMerkleBatch,
        /// Raw chunk contents (needed for upload after payment).
        chunk_contents: Vec<Bytes>,
        /// Chunk addresses in order (needed for upload after payment).
        /// NOTE(review): presumably index-aligned with `chunk_contents` —
        /// confirm against the code that builds this variant.
        chunk_addresses: Vec<[u8; 32]>,
    },
}
532
/// Prepared upload ready for external payment.
///
/// Contains everything needed to construct the on-chain payment transaction
/// externally (e.g. via WalletConnect in a desktop app) and then finalize
/// the upload without a Rust-side wallet.
///
/// Note: This struct stays in Rust memory — only the public fields of
/// `payment_info` are sent to the frontend. `PreparedChunk` contains
/// non-serializable network types, so the full struct cannot derive `Serialize`.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers.
#[derive(Debug)]
#[non_exhaustive]
pub struct PreparedUpload {
    /// The data map for later retrieval.
    pub data_map: DataMap,
    /// Payment information — either wave-batch or merkle depending on chunk count.
    pub payment_info: ExternalPaymentInfo,
    /// Chunk address of the serialized `DataMap` when this upload was
    /// prepared with [`Visibility::Public`]; `None` for private uploads.
    /// `Some` means the address is retrievable on the network after
    /// finalization — either because this upload paid to store the chunk in
    /// `payment_info`, or because the chunk was already on the network
    /// (deterministic self-encryption).
    /// Carried through to [`FileUploadResult::data_map_address`].
    pub data_map_address: Option<[u8; 32]>,
}
560
/// Return type for [`spawn_file_encryption`]: chunk receiver, `DataMap` oneshot, join handle.
///
/// - `.0`: encrypted chunk contents in production order (bounded channel,
///   so the encryption task blocks when the consumer falls behind).
/// - `.1`: the final `DataMap`, sent once after every chunk has been sent.
/// - `.2`: the blocking task's handle, resolving to the encryption result.
type EncryptionChannels = (
    tokio::sync::mpsc::Receiver<Bytes>,
    tokio::sync::oneshot::Receiver<DataMap>,
    tokio::task::JoinHandle<Result<()>>,
);
567
568/// Spawn a blocking task that streams file encryption through a channel.
569fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
570 let metadata = std::fs::metadata(&path)?;
571 let data_size = usize::try_from(metadata.len())
572 .map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;
573
574 let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
575 let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();
576
577 let handle = tokio::task::spawn_blocking(move || {
578 let file = std::fs::File::open(&path)?;
579 let mut reader = std::io::BufReader::new(file);
580
581 let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
582 let read_error_clone = Arc::clone(&read_error);
583
584 let data_iter = std::iter::from_fn(move || {
585 let mut buffer = vec![0u8; 8192];
586 match std::io::Read::read(&mut reader, &mut buffer) {
587 Ok(0) => None,
588 Ok(n) => {
589 buffer.truncate(n);
590 Some(Bytes::from(buffer))
591 }
592 Err(e) => {
593 let mut guard = read_error_clone
594 .lock()
595 .unwrap_or_else(|poisoned| poisoned.into_inner());
596 *guard = Some(e);
597 None
598 }
599 }
600 });
601
602 let mut stream = stream_encrypt(data_size, data_iter)
603 .map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;
604
605 for chunk_result in stream.chunks() {
606 // Check for captured read errors immediately after each chunk.
607 // stream_encrypt sees None (EOF) when a read fails, so it stops
608 // producing chunks. We must detect this before sending the
609 // partial results to avoid uploading a truncated DataMap.
610 {
611 let guard = read_error
612 .lock()
613 .unwrap_or_else(|poisoned| poisoned.into_inner());
614 if let Some(ref e) = *guard {
615 return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
616 }
617 }
618
619 let (_hash, content) = chunk_result
620 .map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
621 if chunk_tx.blocking_send(content).is_err() {
622 return Err(Error::Encryption("upload receiver dropped".to_string()));
623 }
624 }
625
626 // Final check: read error after last chunk (stream saw EOF).
627 {
628 let guard = read_error
629 .lock()
630 .unwrap_or_else(|poisoned| poisoned.into_inner());
631 if let Some(ref e) = *guard {
632 return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
633 }
634 }
635
636 let datamap = stream
637 .into_datamap()
638 .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
639 if datamap_tx.send(datamap).is_err() {
640 warn!("DataMap receiver dropped — upload may have been cancelled");
641 }
642 Ok(())
643 });
644
645 Ok((chunk_rx, datamap_rx, handle))
646}
647
648impl Client {
649 /// Upload a file to the network using streaming self-encryption.
650 ///
651 /// Automatically selects merkle batch payment for files that produce
652 /// 64+ chunks (saves gas). Encrypted chunks are spilled to a temp
653 /// directory so peak memory stays at ~256 MB regardless of file size.
654 ///
655 /// # Errors
656 ///
657 /// Returns an error if the file cannot be read, encryption fails,
658 /// or any chunk cannot be stored.
659 pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
660 self.file_upload_with_mode(path, PaymentMode::Auto).await
661 }
662
663 /// Estimate the cost of uploading a file without actually uploading.
664 ///
665 /// Encrypts the file to determine chunk count and sizes, then requests
666 /// a single quote from the network for a representative chunk. The
667 /// per-chunk price is extrapolated to the total chunk count.
668 ///
669 /// The estimate is fast (~2-5s) and does not require a wallet. Spilled
670 /// chunks are cleaned up automatically when the function returns.
671 ///
672 /// Gas cost is an advisory heuristic, not a live gas-oracle query. It is
673 /// derived from realistic per-transaction budgets (`GAS_PER_WAVE_TX`,
674 /// `GAS_PER_MERKLE_TX`) priced at `ARBITRUM_GAS_PRICE_WEI`. Real gas
675 /// varies with network conditions.
676 ///
677 /// If the first sampled chunk is already stored on the network, the
678 /// function retries with subsequent chunk addresses (up to
679 /// `ESTIMATE_SAMPLE_CAP`). If every sampled address reports stored,
680 /// a [`Error::CostEstimationInconclusive`] is returned so callers can
681 /// decide how to react rather than trust a bogus "free" estimate. Only
682 /// when every address in the file is stored do we return a zero-cost
683 /// estimate.
684 ///
685 /// # Errors
686 ///
687 /// Returns an error if the file cannot be read, encryption fails,
688 /// the network cannot provide a quote, or every sampled chunk is
689 /// already stored ([`Error::CostEstimationInconclusive`]).
690 pub async fn estimate_upload_cost(
691 &self,
692 path: &Path,
693 mode: PaymentMode,
694 progress: Option<mpsc::Sender<UploadEvent>>,
695 ) -> Result<UploadCostEstimate> {
696 let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
697
698 if file_size < 3 {
699 return Err(Error::InvalidData(
700 "File too small: self-encryption requires at least 3 bytes".into(),
701 ));
702 }
703
704 check_disk_space_for_spill(file_size)?;
705
706 info!(
707 "Estimating upload cost for {} ({file_size} bytes)",
708 path.display()
709 );
710
711 let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
712 let chunk_count = spill.len();
713
714 if let Some(ref tx) = progress {
715 let _ = tx
716 .send(UploadEvent::Encrypted {
717 total_chunks: chunk_count,
718 })
719 .await;
720 }
721
722 info!("Encrypted into {chunk_count} chunks, requesting quote");
723
724 // Sample up to ESTIMATE_SAMPLE_CAP distinct chunk addresses. A single
725 // AlreadyStored result says nothing about the rest of the file — the
726 // first chunk is often a DataMap-adjacent chunk that collides with
727 // prior uploads even when 99% of the file is new. Only treat the
728 // whole file as "fully stored" when every sample comes back stored.
729 let sample_limit = spill.addresses.len().min(ESTIMATE_SAMPLE_CAP);
730 let mut sampled = 0usize;
731 let mut all_already_stored = true;
732 let mut quotes_opt: Option<Vec<QuoteEntry>> = None;
733
734 for addr in spill.addresses.iter().take(sample_limit) {
735 sampled += 1;
736 let chunk_bytes = spill.read_chunk(addr)?;
737 let data_size = u64::try_from(chunk_bytes.len())
738 .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
739 match self
740 .get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
741 .await
742 {
743 Ok(q) => {
744 quotes_opt = Some(q);
745 all_already_stored = false;
746 break;
747 }
748 Err(Error::AlreadyStored) => {
749 debug!(
750 "Sample chunk {} already stored; trying next address ({sampled}/{sample_limit})",
751 hex::encode(addr)
752 );
753 continue;
754 }
755 Err(e) => return Err(e),
756 }
757 }
758
759 let uses_merkle = should_use_merkle(chunk_count, mode);
760
761 let quotes = match quotes_opt {
762 Some(q) => q,
763 None if all_already_stored && sampled == chunk_count => {
764 // Every address in the file was sampled and every one is
765 // already on the network — returning a zero-cost estimate is
766 // accurate in this case.
767 info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
768 return Ok(UploadCostEstimate {
769 file_size,
770 chunk_count,
771 storage_cost_atto: "0".into(),
772 estimated_gas_cost_wei: "0".into(),
773 payment_mode: if uses_merkle {
774 PaymentMode::Merkle
775 } else {
776 PaymentMode::Single
777 },
778 });
779 }
780 None => {
781 return Err(Error::CostEstimationInconclusive(format!(
782 "sampled {sampled} chunk addresses out of {chunk_count} and every \
783 one reported AlreadyStored; cannot infer a representative price \
784 for the remaining chunks"
785 )));
786 }
787 };
788
789 // Use the median price × 3 (matches SingleNodePayment::from_quotes
790 // which pays 3x the median to incentivize reliable storage).
791 let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
792 prices.sort();
793 let median_price = prices
794 .get(prices.len() / 2)
795 .copied()
796 .unwrap_or(Amount::ZERO);
797 let per_chunk_cost = median_price * Amount::from(3u64);
798
799 let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
800 let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);
801
802 // Estimate gas cost from realistic per-transaction budgets rather
803 // than a flat per-chunk or per-wave number.
804 //
805 // - Single mode: `batch_pay` packs up to UPLOAD_WAVE_SIZE chunks'
806 // close-group quotes into one `pay_for_quotes` call on Arbitrum.
807 // The dominant cost is one SSTORE per entry plus base tx overhead,
808 // so we use GAS_PER_WAVE_TX (≈1.5M) as a conservative upper bound
809 // on a full wave and multiply by the number of waves. The previous
810 // per-wave figure of 150k was closer to a single-entry transfer
811 // and understated cost by 5–10x for full waves.
812 // - Merkle mode: one tx per sub-batch that verifies a merkle tree
813 // and posts a pool commitment (GAS_PER_MERKLE_TX ≈ 500k each).
814 //
815 // Gas is priced at ARBITRUM_GAS_PRICE_WEI (~0.1 gwei, a typical
816 // Arbitrum baseline). Treat the result as advisory, not a commitment.
817 let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
818 let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
819 let estimated_gas: u128 = if uses_merkle {
820 merkle_batches
821 .saturating_mul(GAS_PER_MERKLE_TX)
822 .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
823 } else {
824 waves
825 .saturating_mul(GAS_PER_WAVE_TX)
826 .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
827 };
828
829 info!(
830 "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
831 );
832
833 Ok(UploadCostEstimate {
834 file_size,
835 chunk_count,
836 storage_cost_atto: total_storage.to_string(),
837 estimated_gas_cost_wei: estimated_gas.to_string(),
838 payment_mode: if uses_merkle {
839 PaymentMode::Merkle
840 } else {
841 PaymentMode::Single
842 },
843 })
844 }
845
846 /// Phase 1 of external-signer upload: encrypt file and prepare chunks.
847 ///
848 /// Equivalent to [`Client::file_prepare_upload_with_visibility`] with
849 /// [`Visibility::Private`] — see that method for details.
850 pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
851 self.file_prepare_upload_with_progress(path, Visibility::Private, None)
852 .await
853 }
854
855 /// Phase 1 of external-signer upload with explicit [`Visibility`] control.
856 ///
857 /// Equivalent to [`Client::file_prepare_upload_with_progress`] with
858 /// `progress: None` — see that method for details.
859 pub async fn file_prepare_upload_with_visibility(
860 &self,
861 path: &Path,
862 visibility: Visibility,
863 ) -> Result<PreparedUpload> {
864 self.file_prepare_upload_with_progress(path, visibility, None)
865 .await
866 }
867
868 /// Phase 1 of external-signer upload with progress events.
869 ///
870 /// Requires an EVM network (for contract price queries) but NOT a wallet.
871 /// Returns a [`PreparedUpload`] containing the data map, prepared chunks,
872 /// and a [`PaymentIntent`] that the external signer uses to construct
873 /// and submit the on-chain payment transaction.
874 ///
875 /// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
876 /// is bundled into the payment batch as an additional chunk and its
877 /// address is recorded on the returned [`PreparedUpload`]. After
878 /// [`Client::finalize_upload`] (or `_merkle`) succeeds, that address is
879 /// surfaced via [`FileUploadResult::data_map_address`] so the uploader
880 /// can share a single address from which anyone can retrieve the file.
881 ///
882 /// When `progress` is `Some`, [`UploadEvent`]s are emitted on the channel
883 /// during encryption ([`UploadEvent::Encrypting`] / [`UploadEvent::Encrypted`])
884 /// and per-chunk quoting ([`UploadEvent::ChunkQuoted`]). Storage events are
885 /// emitted later by [`Client::finalize_upload_with_progress`] /
886 /// [`Client::finalize_upload_merkle_with_progress`].
887 ///
888 /// **Memory note:** Encryption uses disk spilling for bounded memory, but
889 /// the returned [`PreparedUpload`] holds all chunk content in memory (each
890 /// [`PreparedChunk`] contains a `Bytes` with the full chunk data). This is
891 /// inherent to the two-phase external-signer protocol — the chunks must
892 /// stay in memory until [`Client::finalize_upload`] stores them. For very
893 /// large files, prefer [`Client::file_upload`] which streams directly.
894 ///
895 /// # Errors
896 ///
897 /// Returns an error if there is insufficient disk space, the file cannot
898 /// be read, encryption fails, or quote collection fails.
899 pub async fn file_prepare_upload_with_progress(
900 &self,
901 path: &Path,
902 visibility: Visibility,
903 progress: Option<mpsc::Sender<UploadEvent>>,
904 ) -> Result<PreparedUpload> {
905 debug!(
906 "Preparing file upload for external signing (visibility={visibility:?}): {}",
907 path.display()
908 );
909
910 let file_size = std::fs::metadata(path)?.len();
911 check_disk_space_for_spill(file_size)?;
912
913 let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
914
915 info!(
916 "Encrypted {} into {} chunks for external signing (spilled to disk)",
917 path.display(),
918 spill.len()
919 );
920
921 // Read each chunk from disk and collect quotes concurrently.
922 // Note: all PreparedChunks accumulate in memory because the external-signer
923 // protocol requires them for finalize_upload. NOT memory-bounded for large files.
924 let mut chunk_data: Vec<Bytes> = spill
925 .addresses
926 .iter()
927 .map(|addr| spill.read_chunk(addr))
928 .collect::<std::result::Result<Vec<_>, _>>()?;
929
930 // For public uploads, bundle the serialized DataMap as an extra chunk
931 // in the same payment batch. This lets the external signer pay for
932 // the data chunks and the DataMap chunk in one flow, and lets the
933 // finalize step return the DataMap's chunk address as the shareable
934 // retrieval address.
935 let data_map_address = match visibility {
936 Visibility::Private => None,
937 Visibility::Public => {
938 let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
939 Error::Serialization(format!("Failed to serialize DataMap: {e}"))
940 })?;
941 let bytes = Bytes::from(serialized);
942 let address = compute_address(&bytes);
943 info!(
944 "Public upload: bundling DataMap chunk ({} bytes) at address {}",
945 bytes.len(),
946 hex::encode(address)
947 );
948 chunk_data.push(bytes);
949 Some(address)
950 }
951 };
952
953 let chunk_count = chunk_data.len();
954
955 if let Some(ref tx) = progress {
956 let _ = tx
957 .send(UploadEvent::Encrypted {
958 total_chunks: chunk_count,
959 })
960 .await;
961 }
962
963 let payment_info = if should_use_merkle(chunk_count, PaymentMode::Auto) {
964 // Merkle path: build tree, collect candidate pools, return for external payment.
965 info!("Using merkle batch preparation for {chunk_count} file chunks");
966
967 let addresses: Vec<[u8; 32]> = chunk_data.iter().map(|c| compute_address(c)).collect();
968
969 let avg_size =
970 chunk_data.iter().map(bytes::Bytes::len).sum::<usize>() / chunk_count.max(1);
971 let avg_size_u64 = u64::try_from(avg_size).unwrap_or(0);
972
973 let prepared_batch = self
974 .prepare_merkle_batch_external(&addresses, DATA_TYPE_CHUNK, avg_size_u64)
975 .await?;
976
977 info!(
978 "File prepared for external merkle signing: {} chunks, depth={} ({})",
979 chunk_count,
980 prepared_batch.depth,
981 path.display()
982 );
983
984 ExternalPaymentInfo::Merkle {
985 prepared_batch,
986 chunk_contents: chunk_data,
987 chunk_addresses: addresses,
988 }
989 } else {
990 // Wave-batch path: collect quotes per chunk concurrently, emitting
991 // a `ChunkQuoted` event after each completion so callers can drive
992 // a progress bar through the (slow) quoting phase.
993 // Clamp fan-out to chunk_count so a partial wave doesn't
994 // pay for slots it can't fill (see PERF-RESULTS.md).
995 let quote_limiter = self.controller().quote.clone();
996 let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
997 let mut quote_stream = stream::iter(chunk_data)
998 .map(|content| {
999 let limiter = quote_limiter.clone();
1000 async move {
1001 observe_op(
1002 &limiter,
1003 || async move { self.prepare_chunk_payment(content).await },
1004 classify_error,
1005 )
1006 .await
1007 }
1008 })
1009 .buffer_unordered(quote_concurrency);
1010
1011 let mut prepared_chunks = Vec::with_capacity(spill.len());
1012 let mut quoted = 0usize;
1013 while let Some(result) = quote_stream.next().await {
1014 if let Some(prepared) = result? {
1015 prepared_chunks.push(prepared);
1016 }
1017 quoted += 1;
1018 if let Some(ref tx) = progress {
1019 let _ = tx.try_send(UploadEvent::ChunkQuoted {
1020 quoted,
1021 total: chunk_count,
1022 });
1023 }
1024 }
1025
1026 // Surface the "DataMap chunk was already on the network" case
1027 // so debugging "why is data_map_address set but no storage cost
1028 // appears for it?" doesn't require reading the source. See the
1029 // `data_map_address` doc comment for why this is still a valid
1030 // `Some(addr)` outcome.
1031 if let Some(addr) = data_map_address {
1032 if !prepared_chunks.iter().any(|c| c.address == addr) {
1033 info!(
1034 "Public upload: DataMap chunk {} was already stored \
1035 on the network — address is retrievable without a \
1036 new payment",
1037 hex::encode(addr)
1038 );
1039 }
1040 }
1041
1042 let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
1043
1044 info!(
1045 "File prepared for external signing: {} chunks, total {} atto ({})",
1046 prepared_chunks.len(),
1047 payment_intent.total_amount,
1048 path.display()
1049 );
1050
1051 ExternalPaymentInfo::WaveBatch {
1052 prepared_chunks,
1053 payment_intent,
1054 }
1055 };
1056
1057 Ok(PreparedUpload {
1058 data_map,
1059 payment_info,
1060 data_map_address,
1061 })
1062 }
1063
1064 /// Phase 2 of external-signer upload (wave-batch): finalize with externally-signed tx hashes.
1065 ///
1066 /// Takes a [`PreparedUpload`] that used wave-batch payment and a map
1067 /// of `quote_hash -> tx_hash` provided by the external signer after on-chain
1068 /// payment. Builds payment proofs and stores chunks on the network.
1069 ///
1070 /// # Errors
1071 ///
1072 /// Returns an error if the prepared upload used merkle payment (use
1073 /// [`Client::finalize_upload_merkle`] instead), proof construction fails,
1074 /// or any chunk cannot be stored.
1075 pub async fn finalize_upload(
1076 &self,
1077 prepared: PreparedUpload,
1078 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1079 ) -> Result<FileUploadResult> {
1080 self.finalize_upload_with_progress(prepared, tx_hash_map, None)
1081 .await
1082 }
1083
1084 /// Phase 2 of external-signer upload (wave-batch) with progress events.
1085 ///
1086 /// Same as [`Client::finalize_upload`] but emits [`UploadEvent::ChunkStored`]
1087 /// on the provided channel as each chunk is successfully stored.
1088 ///
1089 /// # Errors
1090 ///
1091 /// Same as [`Client::finalize_upload`].
1092 pub async fn finalize_upload_with_progress(
1093 &self,
1094 prepared: PreparedUpload,
1095 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1096 progress: Option<mpsc::Sender<UploadEvent>>,
1097 ) -> Result<FileUploadResult> {
1098 let data_map_address = prepared.data_map_address;
1099 match prepared.payment_info {
1100 ExternalPaymentInfo::WaveBatch {
1101 prepared_chunks,
1102 payment_intent: _,
1103 } => {
1104 let total_chunks = prepared_chunks.len();
1105 let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
1106 let wave_result = self
1107 .store_paid_chunks_with_events(paid_chunks, progress.as_ref(), 0, total_chunks)
1108 .await;
1109 if !wave_result.failed.is_empty() {
1110 let failed_count = wave_result.failed.len();
1111 let stored_count = wave_result.stored.len();
1112 return Err(Error::PartialUpload {
1113 stored: wave_result.stored.clone(),
1114 stored_count,
1115 failed: wave_result.failed,
1116 failed_count,
1117 total_chunks: stored_count + failed_count,
1118 reason: "finalize_upload: chunk storage failed after retries".into(),
1119 });
1120 }
1121 let chunks_stored = wave_result.stored.len();
1122
1123 info!("External-signer upload finalized: {chunks_stored} chunks stored");
1124
1125 let mut stats = WaveAggregateStats::default();
1126 stats.absorb(&wave_result);
1127
1128 Ok(FileUploadResult {
1129 data_map: prepared.data_map,
1130 chunks_stored,
1131 chunks_failed: 0,
1132 total_chunks: chunks_stored,
1133 payment_mode_used: PaymentMode::Single,
1134 storage_cost_atto: "0".into(),
1135 gas_cost_wei: 0,
1136 data_map_address,
1137 chunk_attempts_total: stats.chunk_attempts_total,
1138 store_durations_ms: stats.store_durations_ms,
1139 retries_histogram: stats.retries_histogram,
1140 })
1141 }
1142 ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
1143 "Cannot finalize merkle upload with wave-batch tx hashes. \
1144 Use finalize_upload_merkle() instead."
1145 .to_string(),
1146 )),
1147 }
1148 }
1149
1150 /// Phase 2 of external-signer upload (merkle): finalize with winner pool hash.
1151 ///
1152 /// Takes a [`PreparedUpload`] that used merkle payment and the `winner_pool_hash`
1153 /// returned by the on-chain merkle payment transaction. Generates proofs and
1154 /// stores chunks on the network.
1155 ///
1156 /// # Errors
1157 ///
1158 /// Returns an error if the prepared upload used wave-batch payment (use
1159 /// [`Client::finalize_upload`] instead), proof generation fails,
1160 /// or any chunk cannot be stored.
1161 pub async fn finalize_upload_merkle(
1162 &self,
1163 prepared: PreparedUpload,
1164 winner_pool_hash: [u8; 32],
1165 ) -> Result<FileUploadResult> {
1166 self.finalize_upload_merkle_with_progress(prepared, winner_pool_hash, None)
1167 .await
1168 }
1169
1170 /// Phase 2 of external-signer upload (merkle) with progress events.
1171 ///
1172 /// Same as [`Client::finalize_upload_merkle`] but emits [`UploadEvent::ChunkStored`]
1173 /// on the provided channel as each chunk is successfully stored.
1174 ///
1175 /// # Errors
1176 ///
1177 /// Same as [`Client::finalize_upload_merkle`].
1178 pub async fn finalize_upload_merkle_with_progress(
1179 &self,
1180 prepared: PreparedUpload,
1181 winner_pool_hash: [u8; 32],
1182 progress: Option<mpsc::Sender<UploadEvent>>,
1183 ) -> Result<FileUploadResult> {
1184 let data_map_address = prepared.data_map_address;
1185 match prepared.payment_info {
1186 ExternalPaymentInfo::Merkle {
1187 prepared_batch,
1188 chunk_contents,
1189 chunk_addresses,
1190 } => {
1191 let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
1192 let (chunks_stored, stats) = self
1193 .merkle_upload_chunks(
1194 chunk_contents,
1195 chunk_addresses,
1196 &batch_result,
1197 progress.as_ref(),
1198 )
1199 .await?;
1200
1201 info!("External-signer merkle upload finalized: {chunks_stored} chunks stored");
1202
1203 Ok(FileUploadResult {
1204 data_map: prepared.data_map,
1205 chunks_stored,
1206 chunks_failed: 0,
1207 total_chunks: chunks_stored,
1208 payment_mode_used: PaymentMode::Merkle,
1209 storage_cost_atto: "0".into(),
1210 gas_cost_wei: 0,
1211 data_map_address,
1212 chunk_attempts_total: stats.chunk_attempts_total,
1213 store_durations_ms: stats.store_durations_ms,
1214 retries_histogram: stats.retries_histogram,
1215 })
1216 }
1217 ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
1218 "Cannot finalize wave-batch upload with merkle winner hash. \
1219 Use finalize_upload() instead."
1220 .to_string(),
1221 )),
1222 }
1223 }
1224
1225 /// Upload a file with a specific payment mode.
1226 ///
1227 /// Before encryption, checks that the temp directory has enough free
1228 /// disk space for the spilled chunks (~1.1× source file size).
1229 ///
1230 /// Encrypted chunks are spilled to a temp directory during encryption
1231 /// so that only their 32-byte addresses stay in memory. At upload time,
1232 /// chunks are read back one wave at a time (~64 × 4 MB ≈ 256 MB peak).
1233 ///
1234 /// # Errors
1235 ///
1236 /// Returns an error if there is insufficient disk space, the file cannot
1237 /// be read, encryption fails, or any chunk cannot be stored.
1238 #[allow(clippy::too_many_lines)]
1239 pub async fn file_upload_with_mode(
1240 &self,
1241 path: &Path,
1242 mode: PaymentMode,
1243 ) -> Result<FileUploadResult> {
1244 self.file_upload_with_progress(path, mode, None).await
1245 }
1246
1247 /// Upload a file with progress events sent to the given channel.
1248 ///
1249 /// Same as [`Client::file_upload_with_mode`] but sends [`UploadEvent`]s to the
1250 /// provided channel for UI progress feedback.
1251 #[allow(clippy::too_many_lines)]
1252 pub async fn file_upload_with_progress(
1253 &self,
1254 path: &Path,
1255 mode: PaymentMode,
1256 progress: Option<mpsc::Sender<UploadEvent>>,
1257 ) -> Result<FileUploadResult> {
1258 debug!(
1259 "Streaming file upload with mode {mode:?}: {}",
1260 path.display()
1261 );
1262
1263 // Pre-flight: verify enough temp disk space for the chunk spill.
1264 let file_size = std::fs::metadata(path)?.len();
1265 check_disk_space_for_spill(file_size)?;
1266
1267 // Phase 1: Encrypt file and spill chunks to temp directory.
1268 // Only 32-byte addresses stay in memory — chunk data lives on disk.
1269 let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1270
1271 let chunk_count = spill.len();
1272 info!(
1273 "Encrypted {} into {chunk_count} chunks (spilled to disk)",
1274 path.display()
1275 );
1276 if let Some(ref tx) = progress {
1277 let _ = tx
1278 .send(UploadEvent::Encrypted {
1279 total_chunks: chunk_count,
1280 })
1281 .await;
1282 }
1283
1284 // Phase 2: Decide payment mode and upload in waves from disk.
1285 //
1286 // For the merkle path, attempt to resume from a cached
1287 // receipt before paying again. The cache is keyed by the
1288 // source file path; a successful upload deletes the cache so
1289 // a subsequent re-upload of the same path will pay anew.
1290 let file_path_key = path.display().to_string();
1291 let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self
1292 .should_use_merkle(chunk_count, mode)
1293 {
1294 info!("Using merkle batch payment for {chunk_count} file chunks");
1295
1296 let batch_result = if let Some((_cache_path, cached)) =
1297 crate::data::client::cached_merkle::try_load_for_file(&file_path_key)
1298 {
1299 // Validate the cache matches this upload. If the
1300 // file was edited between attempts the cached
1301 // proofs would no longer be valid for the new
1302 // chunk addresses; in that case drop the cache
1303 // and pay fresh.
1304 let addresses_match = spill
1305 .addresses
1306 .iter()
1307 .all(|addr| cached.proofs.contains_key(addr));
1308 if addresses_match && cached.proofs.len() == chunk_count {
1309 info!(
1310 "Skipping merkle payment phase; resuming with \
1311 cached proofs ({} chunks)",
1312 cached.proofs.len()
1313 );
1314 Ok(cached)
1315 } else {
1316 info!(
1317 "Cached merkle receipt does not match current file \
1318 content (cached={}, file={chunk_count}). \
1319 Discarding cache and paying fresh.",
1320 cached.proofs.len()
1321 );
1322 crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
1323 self.pay_for_merkle_batch(
1324 &spill.addresses,
1325 DATA_TYPE_CHUNK,
1326 spill.avg_chunk_size(),
1327 )
1328 .await
1329 .inspect(|result| {
1330 crate::data::client::cached_merkle::try_save(&file_path_key, result);
1331 })
1332 }
1333 } else {
1334 self.pay_for_merkle_batch(&spill.addresses, DATA_TYPE_CHUNK, spill.avg_chunk_size())
1335 .await
1336 .inspect(|result| {
1337 // Save BEFORE the store phase so a crash
1338 // mid-upload leaves a resumable receipt.
1339 crate::data::client::cached_merkle::try_save(&file_path_key, result);
1340 })
1341 };
1342
1343 let batch_result = match batch_result {
1344 Ok(result) => result,
1345 Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
1346 info!("Merkle needs more peers ({msg}), falling back to wave-batch");
1347 let (stored, sc, gc, fb_stats) =
1348 self.upload_waves_single(&spill, progress.as_ref()).await?;
1349 return Ok(FileUploadResult {
1350 data_map,
1351 chunks_stored: stored,
1352 chunks_failed: 0,
1353 total_chunks: chunk_count,
1354 payment_mode_used: PaymentMode::Single,
1355 storage_cost_atto: sc,
1356 gas_cost_wei: gc,
1357 data_map_address: None,
1358 chunk_attempts_total: fb_stats.chunk_attempts_total,
1359 store_durations_ms: fb_stats.store_durations_ms,
1360 retries_histogram: fb_stats.retries_histogram,
1361 });
1362 }
1363 Err(e) => return Err(e),
1364 };
1365
1366 let (stored, sc, gc, stats) = self
1367 .upload_waves_merkle(&spill, &batch_result, progress.as_ref())
1368 .await?;
1369 // Upload succeeded end-to-end; the cached receipt is
1370 // no longer needed.
1371 crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
1372 (stored, PaymentMode::Merkle, sc, gc, stats)
1373 } else {
1374 let (stored, sc, gc, stats) =
1375 self.upload_waves_single(&spill, progress.as_ref()).await?;
1376 (stored, PaymentMode::Single, sc, gc, stats)
1377 };
1378
1379 info!(
1380 "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
1381 path.display()
1382 );
1383
1384 Ok(FileUploadResult {
1385 data_map,
1386 chunks_stored,
1387 chunks_failed: 0,
1388 total_chunks: chunk_count,
1389 payment_mode_used: actual_mode,
1390 storage_cost_atto,
1391 gas_cost_wei,
1392 data_map_address: None,
1393 chunk_attempts_total: stats.chunk_attempts_total,
1394 store_durations_ms: stats.store_durations_ms,
1395 retries_histogram: stats.retries_histogram,
1396 })
1397 }
1398
1399 /// Encrypt a file and spill chunks to a temp directory.
1400 ///
1401 /// Logs progress every 100 chunks so users get feedback during
1402 /// multi-GB encryptions.
1403 ///
1404 /// Returns the spill buffer (addresses on disk) and the `DataMap`.
1405 async fn encrypt_file_to_spill(
1406 &self,
1407 path: &Path,
1408 progress: Option<&mpsc::Sender<UploadEvent>>,
1409 ) -> Result<(ChunkSpill, DataMap)> {
1410 let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
1411
1412 let mut spill = ChunkSpill::new()?;
1413 while let Some(content) = chunk_rx.recv().await {
1414 spill.push(&content)?;
1415 let chunks_done = spill.len();
1416 if let Some(tx) = progress {
1417 if chunks_done.is_multiple_of(10) {
1418 let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
1419 }
1420 }
1421 if chunks_done % 100 == 0 {
1422 let mb = spill.total_bytes() / (1024 * 1024);
1423 info!(
1424 "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
1425 path.display()
1426 );
1427 }
1428 }
1429
1430 // Await encryption completion to catch errors before paying.
1431 handle
1432 .await
1433 .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
1434 .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
1435
1436 let data_map = datamap_rx
1437 .await
1438 .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
1439
1440 Ok((spill, data_map))
1441 }
1442
1443 /// Upload chunks from a spill using wave-based per-chunk (single) payments.
1444 ///
1445 /// Reads one wave at a time from disk, prepares quotes, pays, and stores.
1446 /// Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE` (~256 MB).
1447 ///
1448 /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
1449 async fn upload_waves_single(
1450 &self,
1451 spill: &ChunkSpill,
1452 progress: Option<&mpsc::Sender<UploadEvent>>,
1453 ) -> Result<(usize, String, u128, WaveAggregateStats)> {
1454 let mut total_stored = 0usize;
1455 let mut total_storage = Amount::ZERO;
1456 let mut total_gas: u128 = 0;
1457 let mut agg_stats = WaveAggregateStats::default();
1458 let total_chunks = spill.len();
1459 let waves: Vec<&[[u8; 32]]> = spill.waves().collect();
1460 let wave_count = waves.len();
1461
1462 for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
1463 let wave_num = wave_idx + 1;
1464 let wave_data: Vec<Bytes> = wave_addrs
1465 .iter()
1466 .map(|addr| spill.read_chunk(addr))
1467 .collect::<Result<Vec<_>>>()?;
1468
1469 info!(
1470 "Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
1471 wave_data.len()
1472 );
1473 if let Some(tx) = progress {
1474 let _ = tx
1475 .send(UploadEvent::QuotingChunks {
1476 wave: wave_num,
1477 total_waves: wave_count,
1478 chunks_in_wave: wave_data.len(),
1479 })
1480 .await;
1481 }
1482 let (addresses, wave_storage, wave_gas, wave_stats) = self
1483 .batch_upload_chunks_with_events(wave_data, progress, total_stored, total_chunks)
1484 .await?;
1485 total_stored += addresses.len();
1486 if let Ok(cost) = wave_storage.parse::<Amount>() {
1487 total_storage += cost;
1488 }
1489 total_gas = total_gas.saturating_add(wave_gas);
1490 // Merge per-call stats (each call already aggregates across the
1491 // waves it ran internally, so a simple sum/extend is correct).
1492 agg_stats.chunk_attempts_total = agg_stats
1493 .chunk_attempts_total
1494 .saturating_add(wave_stats.chunk_attempts_total);
1495 agg_stats
1496 .store_durations_ms
1497 .extend(wave_stats.store_durations_ms);
1498 for (slot, count) in agg_stats
1499 .retries_histogram
1500 .iter_mut()
1501 .zip(wave_stats.retries_histogram.iter())
1502 {
1503 *slot = slot.saturating_add(*count);
1504 }
1505 if let Some(tx) = progress {
1506 let _ = tx
1507 .send(UploadEvent::WaveComplete {
1508 wave: wave_num,
1509 total_waves: wave_count,
1510 stored_so_far: total_stored,
1511 total: total_chunks,
1512 })
1513 .await;
1514 }
1515 }
1516
1517 Ok((
1518 total_stored,
1519 total_storage.to_string(),
1520 total_gas,
1521 agg_stats,
1522 ))
1523 }
1524
1525 /// Upload chunks from a spill using pre-computed merkle proofs.
1526 ///
1527 /// Reads one wave at a time from disk, pairs each chunk with its proof,
1528 /// and uploads concurrently. Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE`.
1529 ///
1530 /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
1531 /// Costs come from the `batch_result` which was populated during payment.
1532 async fn upload_waves_merkle(
1533 &self,
1534 spill: &ChunkSpill,
1535 batch_result: &MerkleBatchPaymentResult,
1536 progress: Option<&mpsc::Sender<UploadEvent>>,
1537 ) -> Result<(usize, String, u128, WaveAggregateStats)> {
1538 let mut total_stored = 0usize;
1539 let total_chunks = spill.len();
1540 let waves: Vec<&[[u8; 32]]> = spill.waves().collect();
1541 let wave_count = waves.len();
1542 let mut stored_addresses: Vec<[u8; 32]> = Vec::new();
1543 let mut agg_stats = WaveAggregateStats::default();
1544
1545 for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
1546 let wave_num = wave_idx + 1;
1547 let wave = spill.read_wave(wave_addrs)?;
1548
1549 info!(
1550 "Wave {wave_num}/{wave_count}: storing {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
1551 wave.len()
1552 );
1553
1554 let store_limiter = self.controller().store.clone();
1555 // Clamp fan-out to wave size — partial last wave should
1556 // not pay for extra slots (see PERF-RESULTS.md).
1557 let store_concurrency = store_limiter.current().min(wave.len().max(1));
1558 let mut upload_stream = stream::iter(wave.into_iter().map(|(content, addr)| {
1559 let proof_bytes = batch_result.proofs.get(&addr).cloned();
1560 let limiter = store_limiter.clone();
1561 async move {
1562 let started = std::time::Instant::now();
1563 let proof = proof_bytes.ok_or_else(|| {
1564 (
1565 addr,
1566 Error::Payment(format!(
1567 "Missing merkle proof for chunk {}",
1568 hex::encode(addr)
1569 )),
1570 started,
1571 )
1572 })?;
1573 let peers = self
1574 .close_group_peers(&addr)
1575 .await
1576 .map_err(|e| (addr, e, started))?;
1577 observe_op(
1578 &limiter,
1579 || async move {
1580 self.chunk_put_to_close_group(content, proof, &peers).await
1581 },
1582 classify_error,
1583 )
1584 .await
1585 .map(|_| (addr, started))
1586 .map_err(|e| (addr, e, started))
1587 }
1588 }))
1589 .buffer_unordered(store_concurrency);
1590
1591 while let Some(result) = upload_stream.next().await {
1592 match result {
1593 Ok((addr, started)) => {
1594 let duration_ms =
1595 u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
1596 agg_stats.store_durations_ms.push(duration_ms);
1597 agg_stats.chunk_attempts_total =
1598 agg_stats.chunk_attempts_total.saturating_add(1);
1599 agg_stats.retries_histogram[0] =
1600 agg_stats.retries_histogram[0].saturating_add(1);
1601 stored_addresses.push(addr);
1602 total_stored += 1;
1603 info!("Stored {total_stored}/{total_chunks}");
1604 if let Some(tx) = progress {
1605 let _ = tx
1606 .send(UploadEvent::ChunkStored {
1607 stored: total_stored,
1608 total: total_chunks,
1609 })
1610 .await;
1611 }
1612 }
1613 Err((addr, e, _started)) => {
1614 warn!("merkle upload failed for chunk {}: {e}", hex::encode(addr));
1615 return Err(Error::PartialUpload {
1616 stored: stored_addresses,
1617 stored_count: total_stored,
1618 failed: vec![(addr, e.to_string())],
1619 failed_count: 1,
1620 total_chunks,
1621 reason: format!("merkle chunk upload failed: {e}"),
1622 });
1623 }
1624 }
1625 }
1626
1627 if let Some(tx) = progress {
1628 let _ = tx
1629 .send(UploadEvent::WaveComplete {
1630 wave: wave_num,
1631 total_waves: wave_count,
1632 stored_so_far: total_stored,
1633 total: total_chunks,
1634 })
1635 .await;
1636 }
1637 }
1638
1639 Ok((
1640 total_stored,
1641 batch_result.storage_cost_atto.clone(),
1642 batch_result.gas_cost_wei,
1643 agg_stats,
1644 ))
1645 }
1646
1647 /// Download and decrypt a file from the network, writing it to disk.
1648 ///
1649 /// Uses `streaming_decrypt` so that only one batch of chunks lives in
1650 /// memory at a time, avoiding OOM on large files. Chunks are fetched
1651 /// concurrently within each batch, then decrypted data is written to
1652 /// disk incrementally.
1653 ///
1654 /// Returns the number of bytes written.
1655 ///
1656 /// # Panics
1657 ///
1658 /// Requires a multi-threaded Tokio runtime (`flavor = "multi_thread"`).
1659 /// Will panic if called from a `current_thread` runtime because
1660 /// `streaming_decrypt` takes a synchronous callback that must bridge
1661 /// back to async via `block_in_place`.
1662 ///
1663 /// # Errors
1664 ///
1665 /// Returns an error if any chunk cannot be retrieved, decryption fails,
1666 /// or the file cannot be written.
1667 #[allow(clippy::unused_async)]
1668 pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
1669 self.file_download_with_progress(data_map, output, None)
1670 .await
1671 }
1672
1673 /// Download and decrypt a file with progress events.
1674 ///
1675 /// Same as [`Client::file_download`] but sends [`DownloadEvent`]s for UI feedback.
1676 ///
1677 /// Progress reporting:
1678 /// 1. Resolves hierarchical DataMaps to the root level first (reports as
1679 /// `ChunksFetched` with `total: 0` during resolution)
1680 /// 2. Once the root DataMap is known, sends `total_chunks` with accurate count
1681 /// 3. Fetches data chunks with accurate `fetched/total` progress
1682 #[allow(clippy::unused_async)]
1683 pub async fn file_download_with_progress(
1684 &self,
1685 data_map: &DataMap,
1686 output: &Path,
1687 progress: Option<mpsc::Sender<DownloadEvent>>,
1688 ) -> Result<u64> {
1689 debug!("Downloading file to {}", output.display());
1690
1691 let handle = Handle::current();
1692
1693 // Phase 1: Resolve hierarchical DataMap to root level.
1694 // This fetches child DataMap chunks (typically 3) to discover the real chunk count.
1695 let root_map = if data_map.is_child() {
1696 let dm_chunks = data_map.len();
1697 if let Some(ref tx) = progress {
1698 let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
1699 total_map_chunks: dm_chunks,
1700 });
1701 }
1702
1703 let resolve_progress = progress.clone();
1704 let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
1705
1706 let resolved = tokio::task::block_in_place(|| {
1707 let counter_ref = resolve_counter.clone();
1708 let progress_ref = resolve_progress.clone();
1709 let fetch_limiter = self.controller().fetch.clone();
1710 let fetch = |batch: &[(usize, XorName)]| {
1711 let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
1712 let counter = counter_ref.clone();
1713 let prog = progress_ref.clone();
1714 let limiter = fetch_limiter.clone();
1715 handle.block_on(async {
1716 // Clamp fan-out to batch size — self_encryption
1717 // requests small batches (default 10), so a
1718 // higher cap from the controller would be slots
1719 // we never fill (see PERF-RESULTS.md).
1720 let cap = limiter.current().min(batch_owned.len().max(1));
1721 let mut stream = futures::stream::iter(batch_owned)
1722 .map(|(idx, hash)| {
1723 let addr = hash.0;
1724 let limiter = limiter.clone();
1725 async move {
1726 let result = observe_op(
1727 &limiter,
1728 || async move { self.chunk_get(&addr).await },
1729 classify_error,
1730 )
1731 .await;
1732 (idx, hash, result)
1733 }
1734 })
1735 .buffer_unordered(cap);
1736 let mut results = Vec::new();
1737 while let Some((idx, hash, result)) =
1738 futures::StreamExt::next(&mut stream).await
1739 {
1740 let chunk = result
1741 .map_err(|e| {
1742 self_encryption::Error::Generic(format!(
1743 "DataMap resolution failed: {e}"
1744 ))
1745 })?
1746 .ok_or_else(|| {
1747 self_encryption::Error::Generic(format!(
1748 "DataMap chunk not found: {}",
1749 hex::encode(hash.0)
1750 ))
1751 })?;
1752 results.push((idx, chunk.content));
1753 let fetched =
1754 counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
1755 if let Some(ref tx) = prog {
1756 let _ = tx.try_send(DownloadEvent::MapChunkFetched { fetched });
1757 }
1758 }
1759 // CRITICAL: self_encryption::get_root_data_map_parallel
1760 // pairs the returned Vec POSITIONALLY with the input
1761 // hashes via .zip() and discards our idx field. We
1762 // used buffer_unordered, so completions arrive in
1763 // arbitrary order. Sort by idx to restore the input
1764 // order before returning, otherwise chunk bytes get
1765 // paired with the wrong hashes and the root data
1766 // map is corrupted.
1767 results.sort_by_key(|(idx, _)| *idx);
1768 Ok(results)
1769 })
1770 };
1771 get_root_data_map_parallel(data_map.clone(), &fetch)
1772 })
1773 .map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;
1774
1775 info!(
1776 "Resolved hierarchical DataMap: {} data chunks",
1777 resolved.len()
1778 );
1779 resolved
1780 } else {
1781 data_map.clone()
1782 };
1783
1784 // Phase 2: Now we know the real chunk count.
1785 let total_chunks = root_map.len();
1786 if let Some(ref tx) = progress {
1787 let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
1788 }
1789
1790 // Phase 3: Fetch and decrypt data chunks with accurate progress.
1791 let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
1792 let fetched_for_closure = fetched_counter.clone();
1793 let progress_for_closure = progress.clone();
1794
1795 let fetch_limiter_outer = self.controller().fetch.clone();
1796 let stream = streaming_decrypt(&root_map, |batch: &[(usize, XorName)]| {
1797 let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
1798 let fetched_ref = fetched_for_closure.clone();
1799 let progress_ref = progress_for_closure.clone();
1800 let fetch_limiter = fetch_limiter_outer.clone();
1801
1802 tokio::task::block_in_place(|| {
1803 handle.block_on(async {
1804 // Clamp fan-out to batch size — see PERF-RESULTS.md.
1805 let cap = fetch_limiter.current().min(batch_owned.len().max(1));
1806 let mut stream = futures::stream::iter(batch_owned)
1807 .map(|(idx, hash)| {
1808 let addr = hash.0;
1809 let limiter = fetch_limiter.clone();
1810 async move {
1811 let result = observe_op(
1812 &limiter,
1813 || async move { self.chunk_get(&addr).await },
1814 classify_error,
1815 )
1816 .await;
1817 (idx, hash, result)
1818 }
1819 })
1820 .buffer_unordered(cap);
1821
1822 let mut results = Vec::new();
1823 while let Some((idx, hash, result)) =
1824 futures::StreamExt::next(&mut stream).await
1825 {
1826 let addr_hex = hex::encode(hash.0);
1827 let chunk = result
1828 .map_err(|e| {
1829 self_encryption::Error::Generic(format!(
1830 "Network fetch failed for {addr_hex}: {e}"
1831 ))
1832 })?
1833 .ok_or_else(|| {
1834 self_encryption::Error::Generic(format!(
1835 "Chunk not found: {addr_hex}"
1836 ))
1837 })?;
1838 results.push((idx, chunk.content));
1839 let fetched =
1840 fetched_ref.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
1841 info!("Downloaded {fetched}/{total_chunks}");
1842 if let Some(ref tx) = progress_ref {
1843 let _ = tx.try_send(DownloadEvent::ChunksFetched {
1844 fetched,
1845 total: total_chunks,
1846 });
1847 }
1848 }
1849 // streaming_decrypt itself sort_by_keys before
1850 // zipping, but the same closure is also passed
1851 // through get_root_data_map_parallel internally
1852 // (see self_encryption::stream_decrypt.rs::new), and
1853 // THAT path zips positionally without sorting. Sort
1854 // here so both consumers see input order.
1855 results.sort_by_key(|(idx, _)| *idx);
1856 Ok(results)
1857 })
1858 })
1859 })
1860 .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;
1861
1862 // Write decrypted chunks to a temp file, then rename atomically.
1863 let parent = output.parent().unwrap_or_else(|| Path::new("."));
1864 let unique: u64 = rand::random();
1865 let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));
1866
1867 let write_result = (|| -> Result<u64> {
1868 let mut file = std::fs::File::create(&tmp_path)?;
1869 let mut bytes_written = 0u64;
1870 for chunk_result in stream {
1871 let chunk_bytes = chunk_result
1872 .map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
1873 file.write_all(&chunk_bytes)?;
1874 bytes_written += chunk_bytes.len() as u64;
1875 }
1876 file.flush()?;
1877 Ok(bytes_written)
1878 })();
1879
1880 match write_result {
1881 Ok(bytes_written) => match std::fs::rename(&tmp_path, output) {
1882 Ok(()) => {
1883 info!(
1884 "File downloaded: {bytes_written} bytes written to {}",
1885 output.display()
1886 );
1887 Ok(bytes_written)
1888 }
1889 Err(rename_err) => {
1890 if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
1891 warn!(
1892 "Failed to remove temp download file {}: {cleanup_err}",
1893 tmp_path.display()
1894 );
1895 }
1896 Err(rename_err.into())
1897 }
1898 },
1899 Err(e) => {
1900 if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
1901 warn!(
1902 "Failed to remove temp download file {}: {cleanup_err}",
1903 tmp_path.display()
1904 );
1905 }
1906 Err(e)
1907 }
1908 }
1909 }
1910}
1911
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// A 1 KiB spill request is tiny enough that the disk space check
    /// should pass on any machine capable of running the test suite.
    #[test]
    fn disk_space_check_passes_for_small_file() {
        check_disk_space_for_spill(1024).unwrap();
    }

    /// `u64::MAX / 2` bytes (~8 EiB, roughly 9.2 exabytes) cannot be
    /// available on any real filesystem, so the check must reject it with
    /// the dedicated `InsufficientDiskSpace` variant (not a generic I/O error).
    #[test]
    fn disk_space_check_fails_for_absurd_size() {
        // `unwrap_err` already fails the test if the check unexpectedly passes.
        let err = check_disk_space_for_spill(u64::MAX / 2).unwrap_err();
        assert!(
            matches!(err, Error::InsufficientDiskSpace(_)),
            "expected InsufficientDiskSpace, got: {err}"
        );
    }

    /// Chunks pushed into a spill are counted, sized, and can be read back
    /// byte-for-byte through their recorded addresses.
    #[test]
    fn chunk_spill_round_trip() {
        let mut spill = ChunkSpill::new().unwrap();
        let data1 = vec![0xAA; 1024];
        let data2 = vec![0xBB; 2048];

        spill.push(&data1).unwrap();
        spill.push(&data2).unwrap();

        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 1024 + 2048);
        assert_eq!(spill.avg_chunk_size(), (1024 + 2048) / 2);

        // Read back in push order and verify the contents survived the spill.
        let chunk1 = spill.read_chunk(spill.addresses.first().unwrap()).unwrap();
        assert_eq!(&chunk1[..], &data1[..]);

        let chunk2 = spill.read_chunk(spill.addresses.get(1).unwrap()).unwrap();
        assert_eq!(&chunk2[..], &data2[..]);

        // Splitting the address list into 1-chunk waves yields one wave per chunk.
        let waves: Vec<_> = spill.addresses.chunks(1).collect();
        assert_eq!(waves.len(), 2);
    }

    /// The spill directory exists while the spill is alive and is removed
    /// once the spill goes out of scope.
    #[test]
    fn chunk_spill_cleanup_on_drop() {
        let dir;
        {
            let spill = ChunkSpill::new().unwrap();
            dir = spill.dir.clone();
            assert!(dir.exists());
        }
        // `spill` has been dropped at the end of the inner scope.
        assert!(!dir.exists(), "spill dir should be removed on drop");
    }

    /// Pushing identical content more than once stores it only once; distinct
    /// content is still added.
    #[test]
    fn chunk_spill_deduplicates_identical_content() {
        let mut spill = ChunkSpill::new().unwrap();
        let data = vec![0xCC; 512];

        spill.push(&data).unwrap();
        spill.push(&data).unwrap(); // same content, should be skipped
        spill.push(&data).unwrap(); // again

        assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
        assert_eq!(
            spill.total_bytes(),
            512,
            "total_bytes should count unique only"
        );

        // Different content should still be added.
        let data2 = vec![0xDD; 256];
        spill.push(&data2).unwrap();
        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 512 + 256);
    }
}
1995
/// Compile-time assertions that Client file method futures are Send.
///
/// None of these functions is ever called. They exist only so that the
/// compiler type-checks each method's future against the `Send` bound. If one
/// of these futures ever stops being `Send` (e.g. a non-`Send` value held
/// across an `.await`), the test target fails to build instead of breaking
/// callers that need to move the future across threads.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Compiles only when `T: Send`. The value is borrowed and never used.
    fn _assert_send<T: Send>(_: &T) {}

    // `dead_code`: this probe is type-checked but never invoked.
    #[allow(dead_code)]
    async fn _file_upload_is_send(client: &Client) {
        let fut = client.file_upload(Path::new("/dev/null"));
        _assert_send(&fut);
    }

    // `dead_code`: this probe is type-checked but never invoked.
    #[allow(dead_code)]
    async fn _file_upload_with_mode_is_send(client: &Client) {
        let fut = client.file_upload_with_mode(Path::new("/dev/null"), PaymentMode::Auto);
        _assert_send(&fut);
    }

    // `todo!()` stands in for a `DataMap` that is never actually built. The
    // code after it is unreachable at runtime but is still type-checked, and
    // type-checking is all this probe needs. That is why the unreachable,
    // unused, and diverging-expression lints are silenced here.
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _file_download_is_send(client: &Client) {
        let dm: DataMap = todo!();
        let fut = client.file_download(&dm, Path::new("/dev/null"));
        _assert_send(&fut);
    }
}