ant_core/data/client/file.rs
1//! File operations using streaming self-encryption.
2//!
3//! Upload files directly from disk without loading them entirely into memory.
4//! Uses `stream_encrypt` to process files in 8KB chunks, encrypting and
5//! uploading each piece as it's produced.
6//!
7//! Encrypted chunks are spilled to a temporary directory during encryption
8//! so that peak memory usage is bounded to one wave (~256 MB for 64 × 4 MB
9//! chunks) regardless of file size.
10//!
11//! For in-memory data uploads, see the `data` module.
12
13use crate::data::client::adaptive::observe_op;
14use crate::data::client::batch::{finalize_batch_payment, PaymentIntent, PreparedChunk};
15use crate::data::client::classify_error;
16use crate::data::client::merkle::{
17 finalize_merkle_batch, should_use_merkle, MerkleBatchPaymentResult, PaymentMode,
18 PreparedMerkleBatch,
19};
20use crate::data::client::Client;
21use crate::data::error::{Error, Result};
22use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
23use ant_protocol::transport::{MultiAddr, PeerId};
24use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
25use bytes::Bytes;
26use fs2::FileExt;
27use futures::stream::{self, StreamExt};
28use self_encryption::{get_root_data_map_parallel, stream_encrypt, streaming_decrypt, DataMap};
29use std::collections::{HashMap, HashSet};
30use std::io::Write;
31use std::path::{Path, PathBuf};
32use std::sync::{Arc, Mutex};
33use tokio::runtime::Handle;
34use tokio::sync::mpsc;
35use tracing::{debug, info, warn};
36use xor_name::XorName;
37
/// Progress events emitted during file upload for UI feedback.
///
/// Encryption events (`Encrypting`/`Encrypted`) precede the per-wave
/// quoting/storing events emitted during the network phase.
#[derive(Debug, Clone)]
pub enum UploadEvent {
    /// A chunk has been encrypted and spilled to disk.
    /// `chunks_done` is the running count of chunks produced so far.
    Encrypting { chunks_done: usize },
    /// File encryption complete. `total_chunks` is the number of chunks
    /// the upload will attempt to store.
    Encrypted { total_chunks: usize },
    /// Starting quote collection for a wave.
    QuotingChunks {
        /// The wave currently being quoted.
        wave: usize,
        /// Total number of waves in this upload.
        total_waves: usize,
        /// Number of chunks contained in this wave.
        chunks_in_wave: usize,
    },
    /// A chunk has been quoted (peer discovery + price received).
    /// This is the slow phase — each quote involves network round-trips.
    ChunkQuoted { quoted: usize, total: usize },
    /// A chunk has been stored on the network.
    ChunkStored { stored: usize, total: usize },
    /// A wave has completed.
    WaveComplete {
        /// The wave that just finished.
        wave: usize,
        /// Total number of waves in this upload.
        total_waves: usize,
        /// Cumulative count of chunks stored across all completed waves.
        stored_so_far: usize,
        /// Total chunk count for the whole upload.
        total: usize,
    },
}
64
/// Progress events emitted during file download for UI feedback.
///
/// For hierarchical DataMaps, resolution events (`ResolvingDataMap` /
/// `MapChunkFetched` / `DataMapResolved`) arrive before data-chunk fetches.
#[derive(Debug, Clone)]
pub enum DownloadEvent {
    /// Resolving hierarchical DataMap to discover real chunk count.
    /// `total_map_chunks` counts the DataMap chunks to fetch.
    ResolvingDataMap { total_map_chunks: usize },
    /// A DataMap chunk has been fetched during resolution.
    MapChunkFetched { fetched: usize },
    /// DataMap resolved — total data chunk count now known.
    DataMapResolved { total_chunks: usize },
    /// Data chunks are being fetched from the network.
    ChunksFetched { fetched: usize, total: usize },
}
77
/// One entry in the per-chunk quote list returned by
/// [`Client::get_store_quotes`]: the responding peer, its addresses, the
/// signed quote it returned, and the payment amount it is demanding.
type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);

/// Number of chunks per upload wave (matches batch.rs PAYMENT_WAVE_SIZE).
///
/// NOTE(review): this must stay in sync with `batch.rs` manually — confirm
/// there is no shared constant that both could import.
const UPLOAD_WAVE_SIZE: usize = 64;

/// Maximum number of distinct chunk addresses to sample when probing for a
/// representative quote in [`Client::estimate_upload_cost`].
///
/// Bounded small so we never spend more than a couple of round-trips on the
/// `AlreadyStored` retry path, which only matters when many leading chunks
/// of a file already live on the network.
const ESTIMATE_SAMPLE_CAP: usize = 5;

/// Gas used by one `pay_for_quotes` transaction that packs up to
/// `UPLOAD_WAVE_SIZE` (quote_hash, rewards_address, amount) entries.
///
/// `batch_pay` in `batch.rs` flattens every chunk's close-group quotes into a
/// single EVM call, so the dominant cost is the SSTOREs for each entry plus
/// the base tx overhead. On Arbitrum that is roughly
/// `21_000 + 64 × (20_000 + small)` ≈ 1.3M; we round up to 1.5M as a
/// conservative per-wave upper bound.
const GAS_PER_WAVE_TX: u128 = 1_500_000;

/// Gas used by one merkle batch payment transaction.
///
/// One on-chain tx per merkle sub-batch, but each tx verifies a merkle tree
/// and posts a pool commitment, so budget higher than a plain transfer.
const GAS_PER_MERKLE_TX: u128 = 500_000;

/// Advisory gas price (wei/gas) used to turn the gas estimate into an ETH
/// figure when no live gas oracle is consulted.
///
/// Arbitrum One typically settles around 0.1 gwei on quiet blocks; we use
/// that as the default so the CLI prints a sensible order-of-magnitude
/// number. Users should treat the reported gas cost as an estimate, not a
/// commitment — real gas is bid at submission time.
const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;

/// Extra headroom percentage for disk space check.
///
/// Encrypted chunks are slightly larger than the source data due to padding
/// and self-encryption overhead. We require file_size + 10% free space in
/// the temp directory to account for this.
const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;
125
/// Maximum age (in seconds) for orphaned spill directories.
/// Dirs older than this are cleaned up if they have no active lockfile.
const SPILL_MAX_AGE_SECS: u64 = 24 * 60 * 60; // 24 hours

/// Prefix for spill directory names to distinguish from user files.
const SPILL_DIR_PREFIX: &str = "spill_";

/// Lockfile name inside each spill dir to signal active use.
const SPILL_LOCK_NAME: &str = ".lock";

/// Temporary on-disk buffer for encrypted chunks.
///
/// During file encryption, chunks are written to a temp directory so that
/// only their 32-byte addresses stay in memory. At upload time chunks are
/// read back one wave at a time, keeping peak RAM at ~`UPLOAD_WAVE_SIZE × 4 MB`.
struct ChunkSpill {
    /// Directory holding spilled chunk files (named by hex address).
    dir: PathBuf,
    /// Lockfile held for the lifetime of this spill (prevents stale cleanup).
    _lock: std::fs::File,
    /// Deduplicated list of chunk addresses.
    addresses: Vec<[u8; 32]>,
    /// Tracks seen addresses for deduplication.
    seen: HashSet<[u8; 32]>,
    /// Running total of unique chunk byte sizes (for average-size calculation).
    total_bytes: u64,
}
153
154impl ChunkSpill {
155 /// Return the parent directory for all spill dirs: `<data_dir>/spill/`.
156 fn spill_root() -> Result<PathBuf> {
157 use crate::config;
158 let root = config::data_dir()
159 .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
160 .join("spill");
161 Ok(root)
162 }
163
164 /// Create a new spill directory under `<data_dir>/spill/`.
165 ///
166 /// Directory name is `spill_<timestamp>_<random>` so orphans can be
167 /// identified by prefix and cleaned up by age. A lockfile inside the
168 /// dir prevents concurrent cleanup from deleting an active spill.
169 fn new() -> Result<Self> {
170 let root = Self::spill_root()?;
171 std::fs::create_dir_all(&root)?;
172
173 // Clean up stale spill dirs from previous crashed runs.
174 Self::cleanup_stale(&root);
175
176 let now = std::time::SystemTime::now()
177 .duration_since(std::time::UNIX_EPOCH)
178 .unwrap_or_default()
179 .as_secs();
180 let unique: u64 = rand::random();
181 let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
182 std::fs::create_dir(&dir)?;
183
184 // Create and hold a lockfile for the lifetime of this spill.
185 // cleanup_stale() will skip dirs with locked files.
186 let lock_path = dir.join(SPILL_LOCK_NAME);
187 let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
188 Error::Io(std::io::Error::new(
189 e.kind(),
190 format!("failed to create spill lockfile: {e}"),
191 ))
192 })?;
193 lock_file.try_lock_exclusive().map_err(|e| {
194 Error::Io(std::io::Error::new(
195 e.kind(),
196 format!("failed to lock spill lockfile: {e}"),
197 ))
198 })?;
199
200 Ok(Self {
201 dir,
202 _lock: lock_file,
203 addresses: Vec::new(),
204 seen: HashSet::new(),
205 total_bytes: 0,
206 })
207 }
208
209 /// Clean up stale spill directories. Best-effort, errors are logged.
210 ///
211 /// Only removes directories that:
212 /// 1. Start with `SPILL_DIR_PREFIX` (ignores unrelated files)
213 /// 2. Are actual directories (not symlinks -- prevents symlink attacks)
214 /// 3. Have a timestamp older than `SPILL_MAX_AGE_SECS`
215 /// 4. Do NOT have an active lockfile (prevents deleting in-progress uploads)
216 ///
217 /// Safe to call concurrently from multiple processes.
218 fn cleanup_stale(root: &Path) {
219 let now = std::time::SystemTime::now()
220 .duration_since(std::time::UNIX_EPOCH)
221 .unwrap_or_default()
222 .as_secs();
223
224 if now == 0 {
225 // Clock is broken (before Unix epoch). Skip cleanup to avoid
226 // misidentifying dirs as stale.
227 warn!("System clock before Unix epoch, skipping spill cleanup");
228 return;
229 }
230
231 let entries = match std::fs::read_dir(root) {
232 Ok(entries) => entries,
233 Err(_) => return,
234 };
235
236 for entry in entries.flatten() {
237 let name = entry.file_name();
238 let name_str = name.to_string_lossy();
239
240 // Only process dirs with our prefix.
241 let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
242 Some(s) => s,
243 None => continue,
244 };
245
246 // Parse timestamp: "spill_<timestamp>_<random>"
247 let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
248 Some(ts) => ts,
249 None => continue,
250 };
251
252 if now.saturating_sub(timestamp) <= SPILL_MAX_AGE_SECS {
253 continue;
254 }
255
256 // Safety: only delete actual directories, not symlinks.
257 let file_type = match entry.file_type() {
258 Ok(ft) => ft,
259 Err(_) => continue,
260 };
261 if !file_type.is_dir() {
262 continue;
263 }
264
265 let path = entry.path();
266
267 // Check lockfile: if locked, the dir is in active use -- skip it.
268 let lock_path = path.join(SPILL_LOCK_NAME);
269 if let Ok(lock_file) = std::fs::File::open(&lock_path) {
270 use fs2::FileExt;
271 if lock_file.try_lock_exclusive().is_err() {
272 // Lock held by another process -- dir is active.
273 debug!("Skipping active spill dir: {}", path.display());
274 continue;
275 }
276 // We acquired the lock, so no one else holds it.
277 // Drop it before deleting.
278 drop(lock_file);
279 }
280
281 info!("Cleaning up stale spill dir: {}", path.display());
282 if let Err(e) = std::fs::remove_dir_all(&path) {
283 warn!("Failed to clean up stale spill dir {}: {e}", path.display());
284 }
285 }
286 }
287
288 /// Run stale spill cleanup. Call at client startup or periodically.
289 #[allow(dead_code)]
290 pub(crate) fn run_cleanup() {
291 if let Ok(root) = Self::spill_root() {
292 Self::cleanup_stale(&root);
293 }
294 }
295
296 /// Write one encrypted chunk to disk and record its address.
297 ///
298 /// Deduplicates by content address: if the same chunk was already
299 /// spilled, the write and accounting are skipped. This prevents
300 /// double-uploads and inflated quoting metrics.
301 fn push(&mut self, content: &[u8]) -> Result<()> {
302 let address = compute_address(content);
303 if !self.seen.insert(address) {
304 return Ok(());
305 }
306 let path = self.dir.join(hex::encode(address));
307 std::fs::write(&path, content)?;
308 self.total_bytes += content.len() as u64;
309 self.addresses.push(address);
310 Ok(())
311 }
312
313 /// Number of chunks stored.
314 fn len(&self) -> usize {
315 self.addresses.len()
316 }
317
318 /// Total bytes of all spilled chunks.
319 fn total_bytes(&self) -> u64 {
320 self.total_bytes
321 }
322
323 /// Average chunk size in bytes (for quoting metrics).
324 fn avg_chunk_size(&self) -> u64 {
325 if self.addresses.is_empty() {
326 return 0;
327 }
328 self.total_bytes / self.addresses.len() as u64
329 }
330
331 /// Read a single chunk back from disk by address.
332 fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
333 let path = self.dir.join(hex::encode(address));
334 let data = std::fs::read(&path).map_err(|e| {
335 Error::Io(std::io::Error::new(
336 e.kind(),
337 format!("reading spilled chunk {}: {e}", hex::encode(address)),
338 ))
339 })?;
340 Ok(Bytes::from(data))
341 }
342
343 /// Iterate over address slices in wave-sized groups.
344 fn waves(&self) -> std::slice::Chunks<'_, [u8; 32]> {
345 self.addresses.chunks(UPLOAD_WAVE_SIZE)
346 }
347
348 /// Read a wave of chunks from disk.
349 fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
350 let mut out = Vec::with_capacity(wave_addrs.len());
351 for addr in wave_addrs {
352 let content = self.read_chunk(addr)?;
353 out.push((content, *addr));
354 }
355 Ok(out)
356 }
357
358 /// Clean up the spill directory.
359 fn cleanup(&self) {
360 if let Err(e) = std::fs::remove_dir_all(&self.dir) {
361 warn!(
362 "Failed to clean up chunk spill dir {}: {e}",
363 self.dir.display()
364 );
365 }
366 }
367}
368
impl Drop for ChunkSpill {
    fn drop(&mut self) {
        // Best-effort removal of the spill dir (and its lockfile) on every
        // exit path, including early returns; failures are only logged
        // inside cleanup(), never propagated from a destructor.
        self.cleanup();
    }
}
374
375/// Check that the spill directory has enough free space for the spilled chunks.
376///
377/// `file_size` is the source file's byte count. We require
378/// `file_size + 10%` free space to account for self-encryption overhead.
379fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
380 let spill_root = ChunkSpill::spill_root()?;
381
382 // Ensure the root exists so fs2 can query it.
383 std::fs::create_dir_all(&spill_root)?;
384
385 let available = fs2::available_space(&spill_root).map_err(|e| {
386 Error::Io(std::io::Error::new(
387 e.kind(),
388 format!(
389 "failed to query disk space on {}: {e}",
390 spill_root.display()
391 ),
392 ))
393 })?;
394
395 // Use integer arithmetic to avoid f64 precision loss on large file sizes.
396 let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
397 let required = file_size.saturating_add(headroom);
398
399 if available < required {
400 let avail_mb = available / (1024 * 1024);
401 let req_mb = required / (1024 * 1024);
402 return Err(Error::InsufficientDiskSpace(format!(
403 "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
404 spill_root.display()
405 )));
406 }
407
408 debug!(
409 "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
410 spill_root.display()
411 );
412 Ok(())
413}
414
/// Whether the data map is published to the network for address-based retrieval.
///
/// A private upload stores only the data chunks and returns the `DataMap` to
/// the caller — only someone holding that `DataMap` can reconstruct the file.
/// A public upload additionally stores the serialized `DataMap` as a chunk on
/// the network, yielding a single chunk address that anyone can use to
/// retrieve the `DataMap` (via [`Client::data_map_fetch`]) and then the file.
///
/// Defaults to [`Visibility::Private`], the safer choice.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Visibility {
    /// Keep the data map local; only the holder can retrieve the file.
    #[default]
    Private,
    /// Publish the data map as a network chunk so anyone with the returned
    /// address can retrieve and decrypt the file.
    Public,
}
431
/// Estimated cost of uploading a file, returned by
/// [`Client::estimate_upload_cost`].
///
/// Monetary fields are stringified integers so they survive JSON
/// serialization without precision loss.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct UploadCostEstimate {
    /// Original file size in bytes.
    pub file_size: u64,
    /// Number of chunks the file would be split into (data chunks only,
    /// does not include the DataMap chunk added during public uploads).
    pub chunk_count: usize,
    /// Estimated total storage cost in atto (token smallest unit),
    /// as a decimal string.
    pub storage_cost_atto: String,
    /// Estimated gas cost in wei as a string. This is a rough heuristic
    /// based on chunk count and payment mode, NOT a live gas price query.
    pub estimated_gas_cost_wei: String,
    /// Payment mode that would be used.
    pub payment_mode: PaymentMode,
}
449
/// Result of a file upload: the `DataMap` needed to retrieve the file.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers that construct or pattern-match
/// on this struct.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FileUploadResult {
    /// The data map containing chunk metadata for reconstruction.
    pub data_map: DataMap,
    /// Number of chunks stored on the network.
    pub chunks_stored: usize,
    /// Number of chunks that failed to store. Always 0 for a successful
    /// upload — partial-failure information is conveyed via
    /// [`crate::data::Error::PartialUpload`] instead.
    pub chunks_failed: usize,
    /// Total number of chunks the upload attempted to store. On full
    /// success this equals `chunks_stored`.
    pub total_chunks: usize,
    /// Which payment mode was actually used (not just requested — e.g.
    /// a requested `Auto` resolves to a concrete mode).
    pub payment_mode_used: PaymentMode,
    /// Total storage cost paid in token units (atto), as a decimal
    /// string. "0" if all chunks already existed.
    pub storage_cost_atto: String,
    /// Total gas cost in wei. 0 if no on-chain transactions were made.
    pub gas_cost_wei: u128,
    /// Chunk address of the serialized `DataMap`, set only for
    /// [`Visibility::Public`] uploads. **`Some` means this address is
    /// retrievable from the network (via [`Client::data_map_fetch`])**, not
    /// necessarily that *this* upload paid to store it — if the serialized
    /// `DataMap` hashed to a chunk that was already on the network (same
    /// file uploaded before; deterministic via self-encryption), the address
    /// is still returned but no storage payment was made for it.
    pub data_map_address: Option<[u8; 32]>,
}
484
/// Payment information for external signing — either wave-batch or merkle.
///
/// Which variant is produced depends on the upload's chunk count (see
/// `should_use_merkle`); the frontend only ever sees the serializable
/// public fields, never the raw chunk data.
#[derive(Debug)]
pub enum ExternalPaymentInfo {
    /// Wave-batch: individual (quote_hash, rewards_address, amount) tuples.
    WaveBatch {
        /// Chunks ready for payment (needed for finalize).
        prepared_chunks: Vec<PreparedChunk>,
        /// Payment intent for external signing.
        payment_intent: PaymentIntent,
    },
    /// Merkle: single on-chain call with depth, pool commitments, timestamp.
    Merkle {
        /// The prepared merkle batch (public fields sent to frontend, private fields stay in Rust).
        prepared_batch: PreparedMerkleBatch,
        /// Raw chunk contents (needed for upload after payment).
        chunk_contents: Vec<Bytes>,
        /// Chunk addresses in order (needed for upload after payment);
        /// parallel to `chunk_contents`.
        chunk_addresses: Vec<[u8; 32]>,
    },
}
505
/// Prepared upload ready for external payment.
///
/// Contains everything needed to construct the on-chain payment transaction
/// externally (e.g. via WalletConnect in a desktop app) and then finalize
/// the upload without a Rust-side wallet.
///
/// Note: This struct stays in Rust memory — only the public fields of
/// `payment_info` are sent to the frontend. `PreparedChunk` contains
/// non-serializable network types, so the full struct cannot derive `Serialize`.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers.
#[derive(Debug)]
#[non_exhaustive]
pub struct PreparedUpload {
    /// The data map for later retrieval.
    pub data_map: DataMap,
    /// Payment information — either wave-batch or merkle depending on chunk count.
    pub payment_info: ExternalPaymentInfo,
    /// Chunk address of the serialized `DataMap` when this upload was
    /// prepared with [`Visibility::Public`]. `Some` means the address is
    /// retrievable on the network after finalization — either because this
    /// upload paid to store the chunk in `payment_info`, or because the
    /// chunk was already on the network (deterministic self-encryption).
    /// Carried through to [`FileUploadResult::data_map_address`].
    pub data_map_address: Option<[u8; 32]>,
}
533
/// Return type for [`spawn_file_encryption`]: chunk receiver, `DataMap` oneshot, join handle.
type EncryptionChannels = (
    // Receiver yielding each encrypted chunk's content as it is produced.
    tokio::sync::mpsc::Receiver<Bytes>,
    // Resolves with the final DataMap once the whole file is encrypted.
    tokio::sync::oneshot::Receiver<DataMap>,
    // Join handle of the blocking encryption task; carries its terminal Result.
    tokio::task::JoinHandle<Result<()>>,
);
540
541/// Spawn a blocking task that streams file encryption through a channel.
542fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
543 let metadata = std::fs::metadata(&path)?;
544 let data_size = usize::try_from(metadata.len())
545 .map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;
546
547 let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
548 let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();
549
550 let handle = tokio::task::spawn_blocking(move || {
551 let file = std::fs::File::open(&path)?;
552 let mut reader = std::io::BufReader::new(file);
553
554 let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
555 let read_error_clone = Arc::clone(&read_error);
556
557 let data_iter = std::iter::from_fn(move || {
558 let mut buffer = vec![0u8; 8192];
559 match std::io::Read::read(&mut reader, &mut buffer) {
560 Ok(0) => None,
561 Ok(n) => {
562 buffer.truncate(n);
563 Some(Bytes::from(buffer))
564 }
565 Err(e) => {
566 let mut guard = read_error_clone
567 .lock()
568 .unwrap_or_else(|poisoned| poisoned.into_inner());
569 *guard = Some(e);
570 None
571 }
572 }
573 });
574
575 let mut stream = stream_encrypt(data_size, data_iter)
576 .map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;
577
578 for chunk_result in stream.chunks() {
579 // Check for captured read errors immediately after each chunk.
580 // stream_encrypt sees None (EOF) when a read fails, so it stops
581 // producing chunks. We must detect this before sending the
582 // partial results to avoid uploading a truncated DataMap.
583 {
584 let guard = read_error
585 .lock()
586 .unwrap_or_else(|poisoned| poisoned.into_inner());
587 if let Some(ref e) = *guard {
588 return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
589 }
590 }
591
592 let (_hash, content) = chunk_result
593 .map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
594 if chunk_tx.blocking_send(content).is_err() {
595 return Err(Error::Encryption("upload receiver dropped".to_string()));
596 }
597 }
598
599 // Final check: read error after last chunk (stream saw EOF).
600 {
601 let guard = read_error
602 .lock()
603 .unwrap_or_else(|poisoned| poisoned.into_inner());
604 if let Some(ref e) = *guard {
605 return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
606 }
607 }
608
609 let datamap = stream
610 .into_datamap()
611 .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
612 if datamap_tx.send(datamap).is_err() {
613 warn!("DataMap receiver dropped — upload may have been cancelled");
614 }
615 Ok(())
616 });
617
618 Ok((chunk_rx, datamap_rx, handle))
619}
620
621impl Client {
    /// Upload a file to the network using streaming self-encryption.
    ///
    /// Convenience wrapper around [`Client::file_upload_with_mode`] using
    /// [`PaymentMode::Auto`]: merkle batch payment is selected automatically
    /// for files that produce 64+ chunks (saves gas). Encrypted chunks are
    /// spilled to a temp directory so peak memory stays at ~256 MB
    /// regardless of file size.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be read, encryption fails,
    /// or any chunk cannot be stored.
    pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
        self.file_upload_with_mode(path, PaymentMode::Auto).await
    }
635
636 /// Estimate the cost of uploading a file without actually uploading.
637 ///
638 /// Encrypts the file to determine chunk count and sizes, then requests
639 /// a single quote from the network for a representative chunk. The
640 /// per-chunk price is extrapolated to the total chunk count.
641 ///
642 /// The estimate is fast (~2-5s) and does not require a wallet. Spilled
643 /// chunks are cleaned up automatically when the function returns.
644 ///
645 /// Gas cost is an advisory heuristic, not a live gas-oracle query. It is
646 /// derived from realistic per-transaction budgets (`GAS_PER_WAVE_TX`,
647 /// `GAS_PER_MERKLE_TX`) priced at `ARBITRUM_GAS_PRICE_WEI`. Real gas
648 /// varies with network conditions.
649 ///
650 /// If the first sampled chunk is already stored on the network, the
651 /// function retries with subsequent chunk addresses (up to
652 /// `ESTIMATE_SAMPLE_CAP`). If every sampled address reports stored,
653 /// a [`Error::CostEstimationInconclusive`] is returned so callers can
654 /// decide how to react rather than trust a bogus "free" estimate. Only
655 /// when every address in the file is stored do we return a zero-cost
656 /// estimate.
657 ///
658 /// # Errors
659 ///
660 /// Returns an error if the file cannot be read, encryption fails,
661 /// the network cannot provide a quote, or every sampled chunk is
662 /// already stored ([`Error::CostEstimationInconclusive`]).
663 pub async fn estimate_upload_cost(
664 &self,
665 path: &Path,
666 mode: PaymentMode,
667 progress: Option<mpsc::Sender<UploadEvent>>,
668 ) -> Result<UploadCostEstimate> {
669 let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
670
671 if file_size < 3 {
672 return Err(Error::InvalidData(
673 "File too small: self-encryption requires at least 3 bytes".into(),
674 ));
675 }
676
677 check_disk_space_for_spill(file_size)?;
678
679 info!(
680 "Estimating upload cost for {} ({file_size} bytes)",
681 path.display()
682 );
683
684 let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
685 let chunk_count = spill.len();
686
687 if let Some(ref tx) = progress {
688 let _ = tx
689 .send(UploadEvent::Encrypted {
690 total_chunks: chunk_count,
691 })
692 .await;
693 }
694
695 info!("Encrypted into {chunk_count} chunks, requesting quote");
696
697 // Sample up to ESTIMATE_SAMPLE_CAP distinct chunk addresses. A single
698 // AlreadyStored result says nothing about the rest of the file — the
699 // first chunk is often a DataMap-adjacent chunk that collides with
700 // prior uploads even when 99% of the file is new. Only treat the
701 // whole file as "fully stored" when every sample comes back stored.
702 let sample_limit = spill.addresses.len().min(ESTIMATE_SAMPLE_CAP);
703 let mut sampled = 0usize;
704 let mut all_already_stored = true;
705 let mut quotes_opt: Option<Vec<QuoteEntry>> = None;
706
707 for addr in spill.addresses.iter().take(sample_limit) {
708 sampled += 1;
709 let chunk_bytes = spill.read_chunk(addr)?;
710 let data_size = u64::try_from(chunk_bytes.len())
711 .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
712 match self
713 .get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
714 .await
715 {
716 Ok(q) => {
717 quotes_opt = Some(q);
718 all_already_stored = false;
719 break;
720 }
721 Err(Error::AlreadyStored) => {
722 debug!(
723 "Sample chunk {} already stored; trying next address ({sampled}/{sample_limit})",
724 hex::encode(addr)
725 );
726 continue;
727 }
728 Err(e) => return Err(e),
729 }
730 }
731
732 let uses_merkle = should_use_merkle(chunk_count, mode);
733
734 let quotes = match quotes_opt {
735 Some(q) => q,
736 None if all_already_stored && sampled == chunk_count => {
737 // Every address in the file was sampled and every one is
738 // already on the network — returning a zero-cost estimate is
739 // accurate in this case.
740 info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
741 return Ok(UploadCostEstimate {
742 file_size,
743 chunk_count,
744 storage_cost_atto: "0".into(),
745 estimated_gas_cost_wei: "0".into(),
746 payment_mode: if uses_merkle {
747 PaymentMode::Merkle
748 } else {
749 PaymentMode::Single
750 },
751 });
752 }
753 None => {
754 return Err(Error::CostEstimationInconclusive(format!(
755 "sampled {sampled} chunk addresses out of {chunk_count} and every \
756 one reported AlreadyStored; cannot infer a representative price \
757 for the remaining chunks"
758 )));
759 }
760 };
761
762 // Use the median price × 3 (matches SingleNodePayment::from_quotes
763 // which pays 3x the median to incentivize reliable storage).
764 let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
765 prices.sort();
766 let median_price = prices
767 .get(prices.len() / 2)
768 .copied()
769 .unwrap_or(Amount::ZERO);
770 let per_chunk_cost = median_price * Amount::from(3u64);
771
772 let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
773 let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);
774
775 // Estimate gas cost from realistic per-transaction budgets rather
776 // than a flat per-chunk or per-wave number.
777 //
778 // - Single mode: `batch_pay` packs up to UPLOAD_WAVE_SIZE chunks'
779 // close-group quotes into one `pay_for_quotes` call on Arbitrum.
780 // The dominant cost is one SSTORE per entry plus base tx overhead,
781 // so we use GAS_PER_WAVE_TX (≈1.5M) as a conservative upper bound
782 // on a full wave and multiply by the number of waves. The previous
783 // per-wave figure of 150k was closer to a single-entry transfer
784 // and understated cost by 5–10x for full waves.
785 // - Merkle mode: one tx per sub-batch that verifies a merkle tree
786 // and posts a pool commitment (GAS_PER_MERKLE_TX ≈ 500k each).
787 //
788 // Gas is priced at ARBITRUM_GAS_PRICE_WEI (~0.1 gwei, a typical
789 // Arbitrum baseline). Treat the result as advisory, not a commitment.
790 let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
791 let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
792 let estimated_gas: u128 = if uses_merkle {
793 merkle_batches
794 .saturating_mul(GAS_PER_MERKLE_TX)
795 .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
796 } else {
797 waves
798 .saturating_mul(GAS_PER_WAVE_TX)
799 .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
800 };
801
802 info!(
803 "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
804 );
805
806 Ok(UploadCostEstimate {
807 file_size,
808 chunk_count,
809 storage_cost_atto: total_storage.to_string(),
810 estimated_gas_cost_wei: estimated_gas.to_string(),
811 payment_mode: if uses_merkle {
812 PaymentMode::Merkle
813 } else {
814 PaymentMode::Single
815 },
816 })
817 }
818
    /// Phase 1 of external-signer upload: encrypt file and prepare chunks.
    ///
    /// Equivalent to [`Client::file_prepare_upload_with_visibility`] with
    /// [`Visibility::Private`] — see that method for details. No progress
    /// events are emitted (the progress channel is `None`).
    pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
        self.file_prepare_upload_with_progress(path, Visibility::Private, None)
            .await
    }
827
    /// Phase 1 of external-signer upload with explicit [`Visibility`] control.
    ///
    /// Equivalent to [`Client::file_prepare_upload_with_progress`] with
    /// `progress: None` — see that method for details. Use this overload
    /// when the caller does not need [`UploadEvent`] feedback.
    pub async fn file_prepare_upload_with_visibility(
        &self,
        path: &Path,
        visibility: Visibility,
    ) -> Result<PreparedUpload> {
        self.file_prepare_upload_with_progress(path, visibility, None)
            .await
    }
840
841 /// Phase 1 of external-signer upload with progress events.
842 ///
843 /// Requires an EVM network (for contract price queries) but NOT a wallet.
844 /// Returns a [`PreparedUpload`] containing the data map, prepared chunks,
845 /// and a [`PaymentIntent`] that the external signer uses to construct
846 /// and submit the on-chain payment transaction.
847 ///
848 /// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
849 /// is bundled into the payment batch as an additional chunk and its
850 /// address is recorded on the returned [`PreparedUpload`]. After
851 /// [`Client::finalize_upload`] (or `_merkle`) succeeds, that address is
852 /// surfaced via [`FileUploadResult::data_map_address`] so the uploader
853 /// can share a single address from which anyone can retrieve the file.
854 ///
855 /// When `progress` is `Some`, [`UploadEvent`]s are emitted on the channel
856 /// during encryption ([`UploadEvent::Encrypting`] / [`UploadEvent::Encrypted`])
857 /// and per-chunk quoting ([`UploadEvent::ChunkQuoted`]). Storage events are
858 /// emitted later by [`Client::finalize_upload_with_progress`] /
859 /// [`Client::finalize_upload_merkle_with_progress`].
860 ///
861 /// **Memory note:** Encryption uses disk spilling for bounded memory, but
862 /// the returned [`PreparedUpload`] holds all chunk content in memory (each
863 /// [`PreparedChunk`] contains a `Bytes` with the full chunk data). This is
864 /// inherent to the two-phase external-signer protocol — the chunks must
865 /// stay in memory until [`Client::finalize_upload`] stores them. For very
866 /// large files, prefer [`Client::file_upload`] which streams directly.
867 ///
868 /// # Errors
869 ///
870 /// Returns an error if there is insufficient disk space, the file cannot
871 /// be read, encryption fails, or quote collection fails.
872 pub async fn file_prepare_upload_with_progress(
873 &self,
874 path: &Path,
875 visibility: Visibility,
876 progress: Option<mpsc::Sender<UploadEvent>>,
877 ) -> Result<PreparedUpload> {
878 debug!(
879 "Preparing file upload for external signing (visibility={visibility:?}): {}",
880 path.display()
881 );
882
883 let file_size = std::fs::metadata(path)?.len();
884 check_disk_space_for_spill(file_size)?;
885
886 let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
887
888 info!(
889 "Encrypted {} into {} chunks for external signing (spilled to disk)",
890 path.display(),
891 spill.len()
892 );
893
894 // Read each chunk from disk and collect quotes concurrently.
895 // Note: all PreparedChunks accumulate in memory because the external-signer
896 // protocol requires them for finalize_upload. NOT memory-bounded for large files.
897 let mut chunk_data: Vec<Bytes> = spill
898 .addresses
899 .iter()
900 .map(|addr| spill.read_chunk(addr))
901 .collect::<std::result::Result<Vec<_>, _>>()?;
902
903 // For public uploads, bundle the serialized DataMap as an extra chunk
904 // in the same payment batch. This lets the external signer pay for
905 // the data chunks and the DataMap chunk in one flow, and lets the
906 // finalize step return the DataMap's chunk address as the shareable
907 // retrieval address.
908 let data_map_address = match visibility {
909 Visibility::Private => None,
910 Visibility::Public => {
911 let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
912 Error::Serialization(format!("Failed to serialize DataMap: {e}"))
913 })?;
914 let bytes = Bytes::from(serialized);
915 let address = compute_address(&bytes);
916 info!(
917 "Public upload: bundling DataMap chunk ({} bytes) at address {}",
918 bytes.len(),
919 hex::encode(address)
920 );
921 chunk_data.push(bytes);
922 Some(address)
923 }
924 };
925
926 let chunk_count = chunk_data.len();
927
928 if let Some(ref tx) = progress {
929 let _ = tx
930 .send(UploadEvent::Encrypted {
931 total_chunks: chunk_count,
932 })
933 .await;
934 }
935
936 let payment_info = if should_use_merkle(chunk_count, PaymentMode::Auto) {
937 // Merkle path: build tree, collect candidate pools, return for external payment.
938 info!("Using merkle batch preparation for {chunk_count} file chunks");
939
940 let addresses: Vec<[u8; 32]> = chunk_data.iter().map(|c| compute_address(c)).collect();
941
942 let avg_size =
943 chunk_data.iter().map(bytes::Bytes::len).sum::<usize>() / chunk_count.max(1);
944 let avg_size_u64 = u64::try_from(avg_size).unwrap_or(0);
945
946 let prepared_batch = self
947 .prepare_merkle_batch_external(&addresses, DATA_TYPE_CHUNK, avg_size_u64)
948 .await?;
949
950 info!(
951 "File prepared for external merkle signing: {} chunks, depth={} ({})",
952 chunk_count,
953 prepared_batch.depth,
954 path.display()
955 );
956
957 ExternalPaymentInfo::Merkle {
958 prepared_batch,
959 chunk_contents: chunk_data,
960 chunk_addresses: addresses,
961 }
962 } else {
963 // Wave-batch path: collect quotes per chunk concurrently, emitting
964 // a `ChunkQuoted` event after each completion so callers can drive
965 // a progress bar through the (slow) quoting phase.
966 // Clamp fan-out to chunk_count so a partial wave doesn't
967 // pay for slots it can't fill (see PERF-RESULTS.md).
968 let quote_limiter = self.controller().quote.clone();
969 let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
970 let mut quote_stream = stream::iter(chunk_data)
971 .map(|content| {
972 let limiter = quote_limiter.clone();
973 async move {
974 observe_op(
975 &limiter,
976 || async move { self.prepare_chunk_payment(content).await },
977 classify_error,
978 )
979 .await
980 }
981 })
982 .buffer_unordered(quote_concurrency);
983
984 let mut prepared_chunks = Vec::with_capacity(spill.len());
985 let mut quoted = 0usize;
986 while let Some(result) = quote_stream.next().await {
987 if let Some(prepared) = result? {
988 prepared_chunks.push(prepared);
989 }
990 quoted += 1;
991 if let Some(ref tx) = progress {
992 let _ = tx.try_send(UploadEvent::ChunkQuoted {
993 quoted,
994 total: chunk_count,
995 });
996 }
997 }
998
999 // Surface the "DataMap chunk was already on the network" case
1000 // so debugging "why is data_map_address set but no storage cost
1001 // appears for it?" doesn't require reading the source. See the
1002 // `data_map_address` doc comment for why this is still a valid
1003 // `Some(addr)` outcome.
1004 if let Some(addr) = data_map_address {
1005 if !prepared_chunks.iter().any(|c| c.address == addr) {
1006 info!(
1007 "Public upload: DataMap chunk {} was already stored \
1008 on the network — address is retrievable without a \
1009 new payment",
1010 hex::encode(addr)
1011 );
1012 }
1013 }
1014
1015 let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
1016
1017 info!(
1018 "File prepared for external signing: {} chunks, total {} atto ({})",
1019 prepared_chunks.len(),
1020 payment_intent.total_amount,
1021 path.display()
1022 );
1023
1024 ExternalPaymentInfo::WaveBatch {
1025 prepared_chunks,
1026 payment_intent,
1027 }
1028 };
1029
1030 Ok(PreparedUpload {
1031 data_map,
1032 payment_info,
1033 data_map_address,
1034 })
1035 }
1036
1037 /// Phase 2 of external-signer upload (wave-batch): finalize with externally-signed tx hashes.
1038 ///
1039 /// Takes a [`PreparedUpload`] that used wave-batch payment and a map
1040 /// of `quote_hash -> tx_hash` provided by the external signer after on-chain
1041 /// payment. Builds payment proofs and stores chunks on the network.
1042 ///
1043 /// # Errors
1044 ///
1045 /// Returns an error if the prepared upload used merkle payment (use
1046 /// [`Client::finalize_upload_merkle`] instead), proof construction fails,
1047 /// or any chunk cannot be stored.
1048 pub async fn finalize_upload(
1049 &self,
1050 prepared: PreparedUpload,
1051 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1052 ) -> Result<FileUploadResult> {
1053 self.finalize_upload_with_progress(prepared, tx_hash_map, None)
1054 .await
1055 }
1056
1057 /// Phase 2 of external-signer upload (wave-batch) with progress events.
1058 ///
1059 /// Same as [`Client::finalize_upload`] but emits [`UploadEvent::ChunkStored`]
1060 /// on the provided channel as each chunk is successfully stored.
1061 ///
1062 /// # Errors
1063 ///
1064 /// Same as [`Client::finalize_upload`].
1065 pub async fn finalize_upload_with_progress(
1066 &self,
1067 prepared: PreparedUpload,
1068 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1069 progress: Option<mpsc::Sender<UploadEvent>>,
1070 ) -> Result<FileUploadResult> {
1071 let data_map_address = prepared.data_map_address;
1072 match prepared.payment_info {
1073 ExternalPaymentInfo::WaveBatch {
1074 prepared_chunks,
1075 payment_intent: _,
1076 } => {
1077 let total_chunks = prepared_chunks.len();
1078 let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
1079 let wave_result = self
1080 .store_paid_chunks_with_events(paid_chunks, progress.as_ref(), 0, total_chunks)
1081 .await;
1082 if !wave_result.failed.is_empty() {
1083 let failed_count = wave_result.failed.len();
1084 let stored_count = wave_result.stored.len();
1085 return Err(Error::PartialUpload {
1086 stored: wave_result.stored.clone(),
1087 stored_count,
1088 failed: wave_result.failed,
1089 failed_count,
1090 total_chunks: stored_count + failed_count,
1091 reason: "finalize_upload: chunk storage failed after retries".into(),
1092 });
1093 }
1094 let chunks_stored = wave_result.stored.len();
1095
1096 info!("External-signer upload finalized: {chunks_stored} chunks stored");
1097
1098 Ok(FileUploadResult {
1099 data_map: prepared.data_map,
1100 chunks_stored,
1101 chunks_failed: 0,
1102 total_chunks: chunks_stored,
1103 payment_mode_used: PaymentMode::Single,
1104 storage_cost_atto: "0".into(),
1105 gas_cost_wei: 0,
1106 data_map_address,
1107 })
1108 }
1109 ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
1110 "Cannot finalize merkle upload with wave-batch tx hashes. \
1111 Use finalize_upload_merkle() instead."
1112 .to_string(),
1113 )),
1114 }
1115 }
1116
1117 /// Phase 2 of external-signer upload (merkle): finalize with winner pool hash.
1118 ///
1119 /// Takes a [`PreparedUpload`] that used merkle payment and the `winner_pool_hash`
1120 /// returned by the on-chain merkle payment transaction. Generates proofs and
1121 /// stores chunks on the network.
1122 ///
1123 /// # Errors
1124 ///
1125 /// Returns an error if the prepared upload used wave-batch payment (use
1126 /// [`Client::finalize_upload`] instead), proof generation fails,
1127 /// or any chunk cannot be stored.
1128 pub async fn finalize_upload_merkle(
1129 &self,
1130 prepared: PreparedUpload,
1131 winner_pool_hash: [u8; 32],
1132 ) -> Result<FileUploadResult> {
1133 self.finalize_upload_merkle_with_progress(prepared, winner_pool_hash, None)
1134 .await
1135 }
1136
1137 /// Phase 2 of external-signer upload (merkle) with progress events.
1138 ///
1139 /// Same as [`Client::finalize_upload_merkle`] but emits [`UploadEvent::ChunkStored`]
1140 /// on the provided channel as each chunk is successfully stored.
1141 ///
1142 /// # Errors
1143 ///
1144 /// Same as [`Client::finalize_upload_merkle`].
1145 pub async fn finalize_upload_merkle_with_progress(
1146 &self,
1147 prepared: PreparedUpload,
1148 winner_pool_hash: [u8; 32],
1149 progress: Option<mpsc::Sender<UploadEvent>>,
1150 ) -> Result<FileUploadResult> {
1151 let data_map_address = prepared.data_map_address;
1152 match prepared.payment_info {
1153 ExternalPaymentInfo::Merkle {
1154 prepared_batch,
1155 chunk_contents,
1156 chunk_addresses,
1157 } => {
1158 let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
1159 let chunks_stored = self
1160 .merkle_upload_chunks(
1161 chunk_contents,
1162 chunk_addresses,
1163 &batch_result,
1164 progress.as_ref(),
1165 )
1166 .await?;
1167
1168 info!("External-signer merkle upload finalized: {chunks_stored} chunks stored");
1169
1170 Ok(FileUploadResult {
1171 data_map: prepared.data_map,
1172 chunks_stored,
1173 chunks_failed: 0,
1174 total_chunks: chunks_stored,
1175 payment_mode_used: PaymentMode::Merkle,
1176 storage_cost_atto: "0".into(),
1177 gas_cost_wei: 0,
1178 data_map_address,
1179 })
1180 }
1181 ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
1182 "Cannot finalize wave-batch upload with merkle winner hash. \
1183 Use finalize_upload() instead."
1184 .to_string(),
1185 )),
1186 }
1187 }
1188
1189 /// Upload a file with a specific payment mode.
1190 ///
1191 /// Before encryption, checks that the temp directory has enough free
1192 /// disk space for the spilled chunks (~1.1× source file size).
1193 ///
1194 /// Encrypted chunks are spilled to a temp directory during encryption
1195 /// so that only their 32-byte addresses stay in memory. At upload time,
1196 /// chunks are read back one wave at a time (~64 × 4 MB ≈ 256 MB peak).
1197 ///
1198 /// # Errors
1199 ///
1200 /// Returns an error if there is insufficient disk space, the file cannot
1201 /// be read, encryption fails, or any chunk cannot be stored.
1202 #[allow(clippy::too_many_lines)]
1203 pub async fn file_upload_with_mode(
1204 &self,
1205 path: &Path,
1206 mode: PaymentMode,
1207 ) -> Result<FileUploadResult> {
1208 self.file_upload_with_progress(path, mode, None).await
1209 }
1210
    /// Upload a file with progress events sent to the given channel.
    ///
    /// Same as [`Client::file_upload_with_mode`] but sends [`UploadEvent`]s to the
    /// provided channel for UI progress feedback.
    ///
    /// # Errors
    ///
    /// Same as [`Client::file_upload_with_mode`].
    #[allow(clippy::too_many_lines)]
    pub async fn file_upload_with_progress(
        &self,
        path: &Path,
        mode: PaymentMode,
        progress: Option<mpsc::Sender<UploadEvent>>,
    ) -> Result<FileUploadResult> {
        debug!(
            "Streaming file upload with mode {mode:?}: {}",
            path.display()
        );

        // Pre-flight: verify enough temp disk space for the chunk spill.
        let file_size = std::fs::metadata(path)?.len();
        check_disk_space_for_spill(file_size)?;

        // Phase 1: Encrypt file and spill chunks to temp directory.
        // Only 32-byte addresses stay in memory — chunk data lives on disk.
        let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;

        let chunk_count = spill.len();
        info!(
            "Encrypted {} into {chunk_count} chunks (spilled to disk)",
            path.display()
        );
        if let Some(ref tx) = progress {
            let _ = tx
                .send(UploadEvent::Encrypted {
                    total_chunks: chunk_count,
                })
                .await;
        }

        // Phase 2: Decide payment mode and upload in waves from disk.
        let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei) =
            if self.should_use_merkle(chunk_count, mode) {
                info!("Using merkle batch payment for {chunk_count} file chunks");

                let batch_result = match self
                    .pay_for_merkle_batch(&spill.addresses, DATA_TYPE_CHUNK, spill.avg_chunk_size())
                    .await
                {
                    Ok(result) => result,
                    // Auto mode degrades gracefully: if the network cannot
                    // supply enough peers for merkle pools, fall back to
                    // per-chunk wave payments and return that result early.
                    Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
                        info!("Merkle needs more peers ({msg}), falling back to wave-batch");
                        let (stored, sc, gc) =
                            self.upload_waves_single(&spill, progress.as_ref()).await?;
                        return Ok(FileUploadResult {
                            data_map,
                            chunks_stored: stored,
                            chunks_failed: 0,
                            total_chunks: chunk_count,
                            payment_mode_used: PaymentMode::Single,
                            storage_cost_atto: sc,
                            gas_cost_wei: gc,
                            data_map_address: None,
                        });
                    }
                    // Any other payment failure (or non-Auto mode) is fatal.
                    Err(e) => return Err(e),
                };

                let (stored, sc, gc) = self
                    .upload_waves_merkle(&spill, &batch_result, progress.as_ref())
                    .await?;
                (stored, PaymentMode::Merkle, sc, gc)
            } else {
                let (stored, sc, gc) = self.upload_waves_single(&spill, progress.as_ref()).await?;
                (stored, PaymentMode::Single, sc, gc)
            };

        info!(
            "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
            path.display()
        );

        Ok(FileUploadResult {
            data_map,
            chunks_stored,
            chunks_failed: 0,
            total_chunks: chunk_count,
            payment_mode_used: actual_mode,
            storage_cost_atto,
            gas_cost_wei,
            data_map_address: None,
        })
    }
1301
1302 /// Encrypt a file and spill chunks to a temp directory.
1303 ///
1304 /// Logs progress every 100 chunks so users get feedback during
1305 /// multi-GB encryptions.
1306 ///
1307 /// Returns the spill buffer (addresses on disk) and the `DataMap`.
1308 async fn encrypt_file_to_spill(
1309 &self,
1310 path: &Path,
1311 progress: Option<&mpsc::Sender<UploadEvent>>,
1312 ) -> Result<(ChunkSpill, DataMap)> {
1313 let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
1314
1315 let mut spill = ChunkSpill::new()?;
1316 while let Some(content) = chunk_rx.recv().await {
1317 spill.push(&content)?;
1318 let chunks_done = spill.len();
1319 if let Some(tx) = progress {
1320 if chunks_done.is_multiple_of(10) {
1321 let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
1322 }
1323 }
1324 if chunks_done % 100 == 0 {
1325 let mb = spill.total_bytes() / (1024 * 1024);
1326 info!(
1327 "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
1328 path.display()
1329 );
1330 }
1331 }
1332
1333 // Await encryption completion to catch errors before paying.
1334 handle
1335 .await
1336 .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
1337 .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
1338
1339 let data_map = datamap_rx
1340 .await
1341 .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
1342
1343 Ok((spill, data_map))
1344 }
1345
    /// Upload chunks from a spill using wave-based per-chunk (single) payments.
    ///
    /// Reads one wave at a time from disk, prepares quotes, pays, and stores.
    /// Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE` (~256 MB).
    ///
    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
    async fn upload_waves_single(
        &self,
        spill: &ChunkSpill,
        progress: Option<&mpsc::Sender<UploadEvent>>,
    ) -> Result<(usize, String, u128)> {
        // Running totals accumulated across all waves.
        let mut total_stored = 0usize;
        let mut total_storage = Amount::ZERO;
        let mut total_gas: u128 = 0;
        let total_chunks = spill.len();
        let waves: Vec<&[[u8; 32]]> = spill.waves().collect();
        let wave_count = waves.len();

        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
            let wave_num = wave_idx + 1;
            // Read this wave's chunk contents back from the spill on disk.
            let wave_data: Vec<Bytes> = wave_addrs
                .iter()
                .map(|addr| spill.read_chunk(addr))
                .collect::<Result<Vec<_>>>()?;

            info!(
                "Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
                wave_data.len()
            );
            if let Some(tx) = progress {
                let _ = tx
                    .send(UploadEvent::QuotingChunks {
                        wave: wave_num,
                        total_waves: wave_count,
                        chunks_in_wave: wave_data.len(),
                    })
                    .await;
            }
            let (addresses, wave_storage, wave_gas) = self
                .batch_upload_chunks_with_events(wave_data, progress, total_stored, total_chunks)
                .await?;
            total_stored += addresses.len();
            // Best-effort cost accounting: an unparseable cost string is
            // skipped rather than failing an otherwise-successful wave.
            if let Ok(cost) = wave_storage.parse::<Amount>() {
                total_storage += cost;
            }
            total_gas = total_gas.saturating_add(wave_gas);
            if let Some(tx) = progress {
                let _ = tx
                    .send(UploadEvent::WaveComplete {
                        wave: wave_num,
                        total_waves: wave_count,
                        stored_so_far: total_stored,
                        total: total_chunks,
                    })
                    .await;
            }
        }

        Ok((total_stored, total_storage.to_string(), total_gas))
    }
1406
    /// Upload chunks from a spill using pre-computed merkle proofs.
    ///
    /// Reads one wave at a time from disk, pairs each chunk with its proof,
    /// and uploads concurrently. Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE`.
    ///
    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
    /// Costs come from the `batch_result` which was populated during payment.
    async fn upload_waves_merkle(
        &self,
        spill: &ChunkSpill,
        batch_result: &MerkleBatchPaymentResult,
        progress: Option<&mpsc::Sender<UploadEvent>>,
    ) -> Result<(usize, String, u128)> {
        let mut total_stored = 0usize;
        let total_chunks = spill.len();
        let waves: Vec<&[[u8; 32]]> = spill.waves().collect();
        let wave_count = waves.len();
        // Addresses stored so far — reported inside PartialUpload on failure.
        let mut stored_addresses: Vec<[u8; 32]> = Vec::new();

        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
            let wave_num = wave_idx + 1;
            let wave = spill.read_wave(wave_addrs)?;

            info!(
                "Wave {wave_num}/{wave_count}: storing {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
                wave.len()
            );

            let store_limiter = self.controller().store.clone();
            // Clamp fan-out to wave size — partial last wave should
            // not pay for extra slots (see PERF-RESULTS.md).
            let store_concurrency = store_limiter.current().min(wave.len().max(1));
            let mut upload_stream = stream::iter(wave.into_iter().map(|(content, addr)| {
                let proof_bytes = batch_result.proofs.get(&addr).cloned();
                let limiter = store_limiter.clone();
                async move {
                    // A chunk without a proof cannot be accepted by storage
                    // nodes — surface it as a payment-layer error.
                    let proof = proof_bytes.ok_or_else(|| {
                        (
                            addr,
                            Error::Payment(format!(
                                "Missing merkle proof for chunk {}",
                                hex::encode(addr)
                            )),
                        )
                    })?;
                    let peers = self.close_group_peers(&addr).await.map_err(|e| (addr, e))?;
                    observe_op(
                        &limiter,
                        || async move {
                            self.chunk_put_to_close_group(content, proof, &peers).await
                        },
                        classify_error,
                    )
                    .await
                    .map(|_| addr)
                    .map_err(|e| (addr, e))
                }
            }))
            .buffer_unordered(store_concurrency);

            // Drain completions as they arrive; fail fast on the first error.
            while let Some(result) = upload_stream.next().await {
                match result {
                    Ok(addr) => {
                        stored_addresses.push(addr);
                        total_stored += 1;
                        info!("Stored {total_stored}/{total_chunks}");
                        if let Some(tx) = progress {
                            let _ = tx
                                .send(UploadEvent::ChunkStored {
                                    stored: total_stored,
                                    total: total_chunks,
                                })
                                .await;
                        }
                    }
                    Err((addr, e)) => {
                        warn!("merkle upload failed for chunk {}: {e}", hex::encode(addr));
                        // Abort remaining waves; the error carries partial
                        // progress so callers can report or resume.
                        return Err(Error::PartialUpload {
                            stored: stored_addresses,
                            stored_count: total_stored,
                            failed: vec![(addr, e.to_string())],
                            failed_count: 1,
                            total_chunks,
                            reason: format!("merkle chunk upload failed: {e}"),
                        });
                    }
                }
            }

            if let Some(tx) = progress {
                let _ = tx
                    .send(UploadEvent::WaveComplete {
                        wave: wave_num,
                        total_waves: wave_count,
                        stored_so_far: total_stored,
                        total: total_chunks,
                    })
                    .await;
            }
        }

        Ok((
            total_stored,
            batch_result.storage_cost_atto.clone(),
            batch_result.gas_cost_wei,
        ))
    }
1514
1515 /// Download and decrypt a file from the network, writing it to disk.
1516 ///
1517 /// Uses `streaming_decrypt` so that only one batch of chunks lives in
1518 /// memory at a time, avoiding OOM on large files. Chunks are fetched
1519 /// concurrently within each batch, then decrypted data is written to
1520 /// disk incrementally.
1521 ///
1522 /// Returns the number of bytes written.
1523 ///
1524 /// # Panics
1525 ///
1526 /// Requires a multi-threaded Tokio runtime (`flavor = "multi_thread"`).
1527 /// Will panic if called from a `current_thread` runtime because
1528 /// `streaming_decrypt` takes a synchronous callback that must bridge
1529 /// back to async via `block_in_place`.
1530 ///
1531 /// # Errors
1532 ///
1533 /// Returns an error if any chunk cannot be retrieved, decryption fails,
1534 /// or the file cannot be written.
1535 #[allow(clippy::unused_async)]
1536 pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
1537 self.file_download_with_progress(data_map, output, None)
1538 .await
1539 }
1540
    /// Download and decrypt a file with progress events.
    ///
    /// Same as [`Client::file_download`] but sends [`DownloadEvent`]s for UI feedback.
    ///
    /// Progress reporting:
    /// 1. Resolves hierarchical DataMaps to the root level first (reports as
    ///    `ChunksFetched` with `total: 0` during resolution)
    /// 2. Once the root DataMap is known, sends `total_chunks` with accurate count
    /// 3. Fetches data chunks with accurate `fetched/total` progress
    // This allow IS needed: the outer async fn contains no direct `.await` —
    // all async work is bridged through `block_in_place` + `Handle::block_on`
    // inside the synchronous fetch callbacks below.
    #[allow(clippy::unused_async)]
    pub async fn file_download_with_progress(
        &self,
        data_map: &DataMap,
        output: &Path,
        progress: Option<mpsc::Sender<DownloadEvent>>,
    ) -> Result<u64> {
        debug!("Downloading file to {}", output.display());

        // Capture the runtime handle so the synchronous callbacks handed to
        // self_encryption can re-enter async code via block_on.
        let handle = Handle::current();

        // Phase 1: Resolve hierarchical DataMap to root level.
        // This fetches child DataMap chunks (typically 3) to discover the real chunk count.
        let root_map = if data_map.is_child() {
            let dm_chunks = data_map.len();
            if let Some(ref tx) = progress {
                let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
                    total_map_chunks: dm_chunks,
                });
            }

            let resolve_progress = progress.clone();
            let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));

            let resolved = tokio::task::block_in_place(|| {
                let counter_ref = resolve_counter.clone();
                let progress_ref = resolve_progress.clone();
                let fetch_limiter = self.controller().fetch.clone();
                // Synchronous fetch callback invoked by self_encryption for
                // each batch of DataMap chunk hashes.
                let fetch = |batch: &[(usize, XorName)]| {
                    let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
                    let counter = counter_ref.clone();
                    let prog = progress_ref.clone();
                    let limiter = fetch_limiter.clone();
                    handle.block_on(async {
                        // Build ONE peer pool for this batch via a
                        // single DHT walk; reuse for every chunk.
                        let addrs: Vec<[u8; 32]> = batch_owned.iter().map(|(_, h)| h.0).collect();
                        let pool = self.build_peer_pool_for(&addrs).await.map_err(|e| {
                            self_encryption::Error::Generic(format!(
                                "DataMap resolution: peer pool build failed: {e}"
                            ))
                        })?;
                        let pool_ref = &pool;
                        // Clamp fan-out to batch size — self_encryption
                        // requests small batches (default 10), so a
                        // higher cap from the controller would be slots
                        // we never fill (see PERF-RESULTS.md).
                        let cap = limiter.current().min(batch_owned.len().max(1));
                        let mut stream = futures::stream::iter(batch_owned)
                            .map(|(idx, hash)| {
                                let addr = hash.0;
                                let limiter = limiter.clone();
                                async move {
                                    let result = observe_op(
                                        &limiter,
                                        || async move {
                                            self.chunk_get_with_pool(&addr, pool_ref).await
                                        },
                                        classify_error,
                                    )
                                    .await;
                                    (idx, hash, result)
                                }
                            })
                            .buffer_unordered(cap);
                        let mut results = Vec::new();
                        while let Some((idx, hash, result)) =
                            futures::StreamExt::next(&mut stream).await
                        {
                            let chunk = result
                                .map_err(|e| {
                                    self_encryption::Error::Generic(format!(
                                        "DataMap resolution failed: {e}"
                                    ))
                                })?
                                .ok_or_else(|| {
                                    self_encryption::Error::Generic(format!(
                                        "DataMap chunk not found: {}",
                                        hex::encode(hash.0)
                                    ))
                                })?;
                            results.push((idx, chunk.content));
                            let fetched =
                                counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
                            if let Some(ref tx) = prog {
                                let _ = tx.try_send(DownloadEvent::MapChunkFetched { fetched });
                            }
                        }
                        // CRITICAL: self_encryption::get_root_data_map_parallel
                        // pairs the returned Vec POSITIONALLY with the input
                        // hashes via .zip() and discards our idx field. We
                        // used buffer_unordered, so completions arrive in
                        // arbitrary order. Sort by idx to restore the input
                        // order before returning, otherwise chunk bytes get
                        // paired with the wrong hashes and the root data
                        // map is corrupted.
                        results.sort_by_key(|(idx, _)| *idx);
                        Ok(results)
                    })
                };
                get_root_data_map_parallel(data_map.clone(), &fetch)
            })
            .map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;

            info!(
                "Resolved hierarchical DataMap: {} data chunks",
                resolved.len()
            );
            resolved
        } else {
            // Already a root DataMap — no resolution round-trips needed.
            data_map.clone()
        };

        // Phase 2: Now we know the real chunk count.
        let total_chunks = root_map.len();
        if let Some(ref tx) = progress {
            let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
        }

        // Phase 3: Fetch and decrypt data chunks with accurate progress.
        let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let fetched_for_closure = fetched_counter.clone();
        let progress_for_closure = progress.clone();

        let fetch_limiter_outer = self.controller().fetch.clone();
        let stream = streaming_decrypt(&root_map, |batch: &[(usize, XorName)]| {
            let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
            let fetched_ref = fetched_for_closure.clone();
            let progress_ref = progress_for_closure.clone();
            let fetch_limiter = fetch_limiter_outer.clone();

            tokio::task::block_in_place(|| {
                handle.block_on(async {
                    // Per-batch peer pool: one DHT walk shared across
                    // all the chunks in this self_encryption batch.
                    let addrs: Vec<[u8; 32]> = batch_owned.iter().map(|(_, h)| h.0).collect();
                    let pool = self.build_peer_pool_for(&addrs).await.map_err(|e| {
                        self_encryption::Error::Generic(format!(
                            "Streaming decrypt: peer pool build failed: {e}"
                        ))
                    })?;
                    let pool_ref = &pool;
                    // Clamp fan-out to batch size — see PERF-RESULTS.md.
                    let cap = fetch_limiter.current().min(batch_owned.len().max(1));
                    let mut stream = futures::stream::iter(batch_owned)
                        .map(|(idx, hash)| {
                            let addr = hash.0;
                            let limiter = fetch_limiter.clone();
                            async move {
                                let result =
                                    observe_op(
                                        &limiter,
                                        || async move {
                                            self.chunk_get_with_pool(&addr, pool_ref).await
                                        },
                                        classify_error,
                                    )
                                    .await;
                                (idx, hash, result)
                            }
                        })
                        .buffer_unordered(cap);

                    let mut results = Vec::new();
                    while let Some((idx, hash, result)) =
                        futures::StreamExt::next(&mut stream).await
                    {
                        let addr_hex = hex::encode(hash.0);
                        let chunk = result
                            .map_err(|e| {
                                self_encryption::Error::Generic(format!(
                                    "Network fetch failed for {addr_hex}: {e}"
                                ))
                            })?
                            .ok_or_else(|| {
                                self_encryption::Error::Generic(format!(
                                    "Chunk not found: {addr_hex}"
                                ))
                            })?;
                        results.push((idx, chunk.content));
                        let fetched =
                            fetched_ref.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
                        info!("Downloaded {fetched}/{total_chunks}");
                        if let Some(ref tx) = progress_ref {
                            let _ = tx.try_send(DownloadEvent::ChunksFetched {
                                fetched,
                                total: total_chunks,
                            });
                        }
                    }
                    // streaming_decrypt itself sort_by_keys before
                    // zipping, but the same closure is also passed
                    // through get_root_data_map_parallel internally
                    // (see self_encryption::stream_decrypt.rs::new), and
                    // THAT path zips positionally without sorting. Sort
                    // here so both consumers see input order.
                    results.sort_by_key(|(idx, _)| *idx);
                    Ok(results)
                })
            })
        })
        .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;

        // Write decrypted chunks to a temp file, then rename atomically.
        // The pid + random suffix keeps concurrent downloads into the same
        // directory from clobbering each other's temp files.
        let parent = output.parent().unwrap_or_else(|| Path::new("."));
        let unique: u64 = rand::random();
        let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));

        let write_result = (|| -> Result<u64> {
            let mut file = std::fs::File::create(&tmp_path)?;
            let mut bytes_written = 0u64;
            for chunk_result in stream {
                let chunk_bytes = chunk_result
                    .map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
                file.write_all(&chunk_bytes)?;
                bytes_written += chunk_bytes.len() as u64;
            }
            file.flush()?;
            Ok(bytes_written)
        })();

        // On any failure after the temp file was created, best-effort clean
        // it up so aborted downloads don't leave .tmp litter behind.
        match write_result {
            Ok(bytes_written) => match std::fs::rename(&tmp_path, output) {
                Ok(()) => {
                    info!(
                        "File downloaded: {bytes_written} bytes written to {}",
                        output.display()
                    );
                    Ok(bytes_written)
                }
                Err(rename_err) => {
                    if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
                        warn!(
                            "Failed to remove temp download file {}: {cleanup_err}",
                            tmp_path.display()
                        );
                    }
                    Err(rename_err.into())
                }
            },
            Err(e) => {
                if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
                    warn!(
                        "Failed to remove temp download file {}: {cleanup_err}",
                        tmp_path.display()
                    );
                }
                Err(e)
            }
        }
    }
1801}
1802
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// A tiny (1 KB) spill request must never trip the free-space guard.
    #[test]
    fn disk_space_check_passes_for_small_file() {
        check_disk_space_for_spill(1024).unwrap();
    }

    /// Asking for an absurd amount of spill space (u64::MAX / 2, several
    /// exabytes) must fail on any real machine, and must surface as
    /// `InsufficientDiskSpace` rather than some other error variant.
    #[test]
    fn disk_space_check_fails_for_absurd_size() {
        let result = check_disk_space_for_spill(u64::MAX / 2);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            matches!(err, Error::InsufficientDiskSpace(_)),
            "expected InsufficientDiskSpace, got: {err}"
        );
    }

    /// Push two distinct chunks, then read each back by its recorded address
    /// and verify the bookkeeping: count, total bytes, average size, and
    /// wave partitioning.
    #[test]
    fn chunk_spill_round_trip() {
        let payloads = [vec![0xAA; 1024], vec![0xBB; 2048]];

        let mut spill = ChunkSpill::new().unwrap();
        for payload in &payloads {
            spill.push(payload).unwrap();
        }

        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 1024 + 2048);
        assert_eq!(spill.avg_chunk_size(), (1024 + 2048) / 2);

        // Each stored chunk must read back byte-identical, in push order.
        for (addr, expected) in spill.addresses.iter().zip(&payloads) {
            let read_back = spill.read_chunk(addr).unwrap();
            assert_eq!(&read_back[..], &expected[..]);
        }

        // Partitioning into 1-chunk waves yields one wave per chunk.
        assert_eq!(spill.addresses.chunks(1).count(), 2);
    }

    /// Dropping a `ChunkSpill` must remove its temporary directory from disk.
    #[test]
    fn chunk_spill_cleanup_on_drop() {
        let spill = ChunkSpill::new().unwrap();
        let dir = spill.dir.clone();
        assert!(dir.exists());

        drop(spill);
        assert!(!dir.exists(), "spill dir should be removed on drop");
    }

    /// Identical content pushed repeatedly is stored exactly once; distinct
    /// content still gets its own entry and counts toward the byte total.
    #[test]
    fn chunk_spill_deduplicates_identical_content() {
        let mut spill = ChunkSpill::new().unwrap();
        let repeated = vec![0xCC; 512];

        // Same bytes three times: only the first push should land.
        for _ in 0..3 {
            spill.push(&repeated).unwrap();
        }
        assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
        assert_eq!(
            spill.total_bytes(),
            512,
            "total_bytes should count unique only"
        );

        // Different bytes are still accepted as a new chunk.
        let distinct = vec![0xDD; 256];
        spill.push(&distinct).unwrap();
        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 512 + 256);
    }
}
1886
/// Compile-time assertions that Client file method futures are Send.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Compile-only probe: accepting `&T where T: Send` forces the compiler
    /// to prove the future type is `Send` without ever polling it.
    fn require_send<T: Send>(_: &T) {}

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_is_send(client: &Client) {
        require_send(&client.file_upload(Path::new("/dev/null")));
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_with_mode_is_send(client: &Client) {
        require_send(&client.file_upload_with_mode(Path::new("/dev/null"), PaymentMode::Auto));
    }

    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _file_download_is_send(client: &Client) {
        // `todo!()` stands in for a DataMap we never need at runtime —
        // only the future's type matters for the Send check.
        let data_map: DataMap = todo!();
        require_send(&client.file_download(&data_map, Path::new("/dev/null")));
    }
}