ant_core/data/client/file.rs
1//! File operations using streaming self-encryption.
2//!
3//! Upload files directly from disk without loading them entirely into memory.
//! Uses `stream_encrypt` to read files in 8 KB pieces and encrypt them
//! incrementally.
//!
//! Encrypted chunks are spilled to a spill directory under the data dir
//! during encryption and read back one wave at a time for upload, so that
//! peak memory usage is bounded to one wave (~256 MB for 64 × 4 MB chunks)
//! regardless of file size.
10//!
11//! For in-memory data uploads, see the `data` module.
12
13use crate::data::client::adaptive::{observe_op, rebucketed_unordered};
14use crate::data::client::batch::{
15 finalize_batch_payment, PaymentIntent, PreparedChunk, WaveAggregateStats,
16};
17use crate::data::client::classify_error;
18use crate::data::client::merkle::{
19 chunk_contents_for_upload_addresses, finalize_merkle_batch, should_use_merkle,
20 MerkleBatchPaymentResult, PaymentMode, PreparedMerkleBatch,
21};
22use crate::data::client::Client;
23use crate::data::error::{Error, Result};
24use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
25use ant_protocol::transport::{MultiAddr, PeerId};
26use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
27use bytes::Bytes;
28use fs2::FileExt;
29use futures::stream::{self, StreamExt};
30use self_encryption::{
31 get_root_data_map_parallel, stream_decrypt_batch_size, stream_encrypt,
32 streaming_decrypt_with_batch_size, DataMap,
33};
34use std::collections::{HashMap, HashSet};
35use std::io::Write;
36use std::path::{Path, PathBuf};
37use std::sync::{Arc, Mutex};
38use tokio::runtime::Handle;
39use tokio::sync::mpsc;
40use tracing::{debug, info, warn};
41use xor_name::XorName;
42
/// Progress events emitted during file upload for UI feedback.
///
/// Events are delivered through an optional `tokio::sync::mpsc::Sender`
/// supplied by the caller (see the `progress` parameter of
/// [`Client::estimate_upload_cost`]).
#[derive(Debug, Clone)]
pub enum UploadEvent {
    /// A chunk has been encrypted and spilled to disk.
    // NOTE(review): `chunks_done` is presumably a running count — confirm
    // against the emitter, which is not in this part of the file.
    Encrypting { chunks_done: usize },
    /// File encryption complete.
    ///
    /// In [`Client::estimate_upload_cost`] `total_chunks` is the number of
    /// deduplicated chunks held by the spill.
    Encrypted { total_chunks: usize },
    /// Starting quote collection for a wave.
    QuotingChunks {
        wave: usize,
        total_waves: usize,
        chunks_in_wave: usize,
    },
    /// A chunk has been quoted (peer discovery + price received).
    /// This is the slow phase — each quote involves network round-trips.
    ChunkQuoted { quoted: usize, total: usize },
    /// A chunk has been stored on the network.
    ChunkStored { stored: usize, total: usize },
    /// A wave has completed.
    WaveComplete {
        wave: usize,
        total_waves: usize,
        stored_so_far: usize,
        total: usize,
    },
}
69
/// Progress events emitted during file download for UI feedback.
// NOTE(review): the code that emits these events is not in this part of the
// file; the counter fields are presumably running totals — confirm there.
#[derive(Debug, Clone)]
pub enum DownloadEvent {
    /// Resolving hierarchical DataMap to discover real chunk count.
    ResolvingDataMap { total_map_chunks: usize },
    /// A DataMap chunk has been fetched during resolution.
    MapChunkFetched { fetched: usize },
    /// DataMap resolved — total data chunk count now known.
    DataMapResolved { total_chunks: usize },
    /// Data chunks are being fetched from the network.
    ChunksFetched { fetched: usize, total: usize },
}
82
/// One entry in the per-chunk quote list returned by
/// [`Client::get_store_quotes`]: the responding peer, its addresses, the
/// signed quote it returned, and the payment amount it is demanding.
type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);

/// Number of chunks per upload wave (matches batch.rs PAYMENT_WAVE_SIZE).
const UPLOAD_WAVE_SIZE: usize = 64;

/// Stream decrypt batches should be larger than fetch fan-out so
/// the rolling fetch scheduler can keep launching new chunk GETs as earlier
/// ones complete, instead of stopping at each self-encryption batch boundary.
///
/// `adaptive_stream_decrypt_batch_size` multiplies the fetch cap by this
/// value to obtain its target batch size.
const DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER: usize = 4;

/// Use at most `1 / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR` (one quarter)
/// of currently usable RAM for one decrypt batch.
///
/// This is a divisor, not a fraction: `stream_decrypt_batch_memory_cap`
/// divides the usable byte count by it.
const DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR: u64 = 4;

/// A decrypt batch briefly holds encrypted chunk bytes, decrypted chunk bytes,
/// and Vec/Bytes overhead. Use a conservative multiplier rather than assuming
/// payload bytes alone.
///
/// Applied to `self_encryption::MAX_CHUNK_SIZE` in
/// `stream_decrypt_batch_memory_cap`.
const DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER: u64 = 3;

/// Maximum number of distinct chunk addresses to sample when probing for a
/// representative quote in [`Client::estimate_upload_cost`].
///
/// Bounded small so we never spend more than a couple of round-trips on the
/// `AlreadyStored` retry path, which only matters when many leading chunks
/// of a file already live on the network.
const ESTIMATE_SAMPLE_CAP: usize = 5;

/// Gas used by one `pay_for_quotes` transaction that packs up to
/// `UPLOAD_WAVE_SIZE` (quote_hash, rewards_address, amount) entries.
///
/// `batch_pay` in `batch.rs` flattens every chunk's close-group quotes into a
/// single EVM call, so the dominant cost is the SSTOREs for each entry plus
/// the base tx overhead. On Arbitrum that is roughly
/// `21_000 + 64 × (20_000 + small)` ≈ 1.3M; we round up to 1.5M as a
/// conservative per-wave upper bound.
const GAS_PER_WAVE_TX: u128 = 1_500_000;

/// Gas used by one merkle batch payment transaction.
///
/// One on-chain tx per merkle sub-batch, but each tx verifies a merkle tree
/// and posts a pool commitment, so budget higher than a plain transfer.
const GAS_PER_MERKLE_TX: u128 = 500_000;

/// Advisory gas price (wei/gas) used to turn the gas estimate into an ETH
/// figure when no live gas oracle is consulted.
///
/// Arbitrum One typically settles around 0.1 gwei on quiet blocks; we use
/// that as the default (0.1 gwei = 100_000_000 wei) so the CLI prints a
/// sensible order-of-magnitude number. Users should treat the reported gas
/// cost as an estimate, not a commitment — real gas is bid at submission time.
const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;

/// Extra headroom percentage for disk space check.
///
/// Encrypted chunks are slightly larger than the source data due to padding
/// and self-encryption overhead. We require file_size + 10% free space in
/// the temp directory to account for this.
const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;
143
/// Grace period (in seconds) before a spill dir is eligible for stale cleanup.
///
/// This is a small TOCTOU guard covering the sub-millisecond window inside
/// [`ChunkSpill::new`] between `create_dir` and `try_lock_exclusive`. Once a
/// dir is older than this and its lockfile is releasable, the owning process
/// is gone and the dir is safe to reap — regardless of how old it is.
///
/// The previous policy waited 24 h before reaping any orphan, which meant
/// that any non-graceful exit (SIGKILL, kernel OOM, panic abort) leaked its
/// spill dir until the next day's upload — and on a host being restart-looped
/// by systemd, orphans could fill the disk well within that window.
const SPILL_STALE_GRACE_SECS: u64 = 30;

/// Prefix for spill directory names to distinguish from user files.
const SPILL_DIR_PREFIX: &str = "spill_";

/// Lockfile name inside each spill dir to signal active use.
const SPILL_LOCK_NAME: &str = ".lock";

/// Temporary on-disk buffer for encrypted chunks.
///
/// During file encryption, chunks are written to a spill directory under
/// `<data_dir>/spill/` so that only their 32-byte addresses (plus a size per
/// address) stay in memory. At upload time chunks are read back one wave at a
/// time, keeping peak RAM at ~`UPLOAD_WAVE_SIZE × 4 MB`.
///
/// The spill directory is removed when the value is dropped.
struct ChunkSpill {
    /// Directory holding spilled chunk files (named by hex address).
    dir: PathBuf,
    /// Lockfile held for the lifetime of this spill (prevents stale cleanup).
    _lock: std::fs::File,
    /// Deduplicated list of chunk addresses.
    addresses: Vec<[u8; 32]>,
    /// Tracks seen addresses for deduplication.
    seen: HashSet<[u8; 32]>,
    /// Byte size per spilled chunk address.
    sizes: HashMap<[u8; 32], u64>,
    /// Running total of unique chunk byte sizes (for average-size calculation).
    total_bytes: u64,
}
182
183impl ChunkSpill {
184 /// Return the parent directory for all spill dirs: `<data_dir>/spill/`.
185 fn spill_root() -> Result<PathBuf> {
186 use crate::config;
187 let root = config::data_dir()
188 .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
189 .join("spill");
190 Ok(root)
191 }
192
193 /// Create a new spill directory under `<data_dir>/spill/`.
194 ///
195 /// Directory name is `spill_<timestamp>_<random>` so orphans can be
196 /// identified by prefix and cleaned up by age. A lockfile inside the
197 /// dir prevents concurrent cleanup from deleting an active spill.
198 fn new() -> Result<Self> {
199 let root = Self::spill_root()?;
200 std::fs::create_dir_all(&root)?;
201
202 // Clean up stale spill dirs from previous crashed runs.
203 Self::cleanup_stale(&root);
204
205 let now = std::time::SystemTime::now()
206 .duration_since(std::time::UNIX_EPOCH)
207 .unwrap_or_default()
208 .as_secs();
209 let unique: u64 = rand::random();
210 let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
211 std::fs::create_dir(&dir)?;
212
213 // Create and hold a lockfile for the lifetime of this spill.
214 // cleanup_stale() will skip dirs with locked files.
215 let lock_path = dir.join(SPILL_LOCK_NAME);
216 let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
217 Error::Io(std::io::Error::new(
218 e.kind(),
219 format!("failed to create spill lockfile: {e}"),
220 ))
221 })?;
222 lock_file.try_lock_exclusive().map_err(|e| {
223 Error::Io(std::io::Error::new(
224 e.kind(),
225 format!("failed to lock spill lockfile: {e}"),
226 ))
227 })?;
228
229 Ok(Self {
230 dir,
231 _lock: lock_file,
232 addresses: Vec::new(),
233 seen: HashSet::new(),
234 sizes: HashMap::new(),
235 total_bytes: 0,
236 })
237 }
238
239 /// Clean up stale spill directories. Best-effort, errors are logged.
240 ///
241 /// A spill dir is reaped when:
242 /// 1. Its name starts with `SPILL_DIR_PREFIX` (ignores unrelated files)
243 /// 2. It is an actual directory, not a symlink (prevents symlink attacks)
244 /// 3. Its timestamp is older than `SPILL_STALE_GRACE_SECS` (TOCTOU guard)
245 /// 4. Its lockfile is releasable — i.e. no live process holds it
246 ///
247 /// The lockfile is the primary correctness gate: a releasable lock means
248 /// the owning `ChunkSpill` has been dropped or the process is gone, so
249 /// the dir is fair game. The grace period covers only the brief window
250 /// inside [`Self::new`] between `create_dir` and `try_lock_exclusive`.
251 ///
252 /// Safe to call concurrently from multiple processes.
253 fn cleanup_stale(root: &Path) {
254 let now = std::time::SystemTime::now()
255 .duration_since(std::time::UNIX_EPOCH)
256 .unwrap_or_default()
257 .as_secs();
258
259 if now == 0 {
260 // Clock is broken (before Unix epoch). Skip cleanup to avoid
261 // misidentifying dirs as stale.
262 warn!("System clock before Unix epoch, skipping spill cleanup");
263 return;
264 }
265
266 let entries = match std::fs::read_dir(root) {
267 Ok(entries) => entries,
268 Err(_) => return,
269 };
270
271 for entry in entries.flatten() {
272 let name = entry.file_name();
273 let name_str = name.to_string_lossy();
274
275 // Only process dirs with our prefix.
276 let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
277 Some(s) => s,
278 None => continue,
279 };
280
281 // Parse timestamp: "spill_<timestamp>_<random>"
282 let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
283 Some(ts) => ts,
284 None => continue,
285 };
286
287 if now.saturating_sub(timestamp) < SPILL_STALE_GRACE_SECS {
288 continue;
289 }
290
291 // Safety: only delete actual directories, not symlinks.
292 let file_type = match entry.file_type() {
293 Ok(ft) => ft,
294 Err(_) => continue,
295 };
296 if !file_type.is_dir() {
297 continue;
298 }
299
300 let path = entry.path();
301
302 // Check lockfile: if locked, the dir is in active use -- skip it.
303 let lock_path = path.join(SPILL_LOCK_NAME);
304 if let Ok(lock_file) = std::fs::File::open(&lock_path) {
305 use fs2::FileExt;
306 if lock_file.try_lock_exclusive().is_err() {
307 // Lock held by another process -- dir is active.
308 debug!("Skipping active spill dir: {}", path.display());
309 continue;
310 }
311 // We acquired the lock, so no one else holds it.
312 // Drop it before deleting.
313 drop(lock_file);
314 }
315
316 info!("Cleaning up stale spill dir: {}", path.display());
317 if let Err(e) = std::fs::remove_dir_all(&path) {
318 warn!("Failed to clean up stale spill dir {}: {e}", path.display());
319 }
320 }
321 }
322
323 /// Run stale spill cleanup. Call at client startup or periodically.
324 #[allow(dead_code)]
325 pub(crate) fn run_cleanup() {
326 if let Ok(root) = Self::spill_root() {
327 Self::cleanup_stale(&root);
328 }
329 }
330
331 /// Write one encrypted chunk to disk and record its address.
332 ///
333 /// Deduplicates by content address: if the same chunk was already
334 /// spilled, the write and accounting are skipped. This prevents
335 /// double-uploads and inflated quoting metrics.
336 fn push(&mut self, content: &[u8]) -> Result<()> {
337 let address = compute_address(content);
338 if !self.seen.insert(address) {
339 return Ok(());
340 }
341 let path = self.dir.join(hex::encode(address));
342 std::fs::write(&path, content)?;
343 let content_len = content.len() as u64;
344 self.sizes.insert(address, content_len);
345 self.total_bytes += content_len;
346 self.addresses.push(address);
347 Ok(())
348 }
349
350 /// Number of chunks stored.
351 fn len(&self) -> usize {
352 self.addresses.len()
353 }
354
355 /// Total bytes of all spilled chunks.
356 fn total_bytes(&self) -> u64 {
357 self.total_bytes
358 }
359
360 /// Address and byte-size pairs for all spilled chunks.
361 fn chunk_entries(&self) -> Result<Vec<([u8; 32], u64)>> {
362 self.addresses
363 .iter()
364 .map(|address| {
365 self.sizes
366 .get(address)
367 .copied()
368 .map(|size| (*address, size))
369 .ok_or_else(|| {
370 Error::Storage(format!(
371 "missing size for spilled chunk {}",
372 hex::encode(address)
373 ))
374 })
375 })
376 .collect()
377 }
378
379 /// Read a single chunk back from disk by address.
380 fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
381 let path = self.dir.join(hex::encode(address));
382 let data = std::fs::read(&path).map_err(|e| {
383 Error::Io(std::io::Error::new(
384 e.kind(),
385 format!("reading spilled chunk {}: {e}", hex::encode(address)),
386 ))
387 })?;
388 Ok(Bytes::from(data))
389 }
390
391 /// Read a wave of chunks from disk.
392 fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
393 let mut out = Vec::with_capacity(wave_addrs.len());
394 for addr in wave_addrs {
395 let content = self.read_chunk(addr)?;
396 out.push((content, *addr));
397 }
398 Ok(out)
399 }
400
401 /// Clean up the spill directory.
402 fn cleanup(&self) {
403 if let Err(e) = std::fs::remove_dir_all(&self.dir) {
404 warn!(
405 "Failed to clean up chunk spill dir {}: {e}",
406 self.dir.display()
407 );
408 }
409 }
410}
411
412impl Drop for ChunkSpill {
413 fn drop(&mut self) {
414 self.cleanup();
415 }
416}
417
418fn cached_merkle_covers_addresses(
419 cached: &MerkleBatchPaymentResult,
420 addresses: &[[u8; 32]],
421) -> bool {
422 addresses
423 .iter()
424 .all(|addr| cached.proofs.contains_key(addr))
425}
426
427/// Check that the spill directory has enough free space for the spilled chunks.
428///
429/// `file_size` is the source file's byte count. We require
430/// `file_size + 10%` free space to account for self-encryption overhead.
431fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
432 let spill_root = ChunkSpill::spill_root()?;
433
434 // Ensure the root exists so fs2 can query it.
435 std::fs::create_dir_all(&spill_root)?;
436
437 let available = fs2::available_space(&spill_root).map_err(|e| {
438 Error::Io(std::io::Error::new(
439 e.kind(),
440 format!(
441 "failed to query disk space on {}: {e}",
442 spill_root.display()
443 ),
444 ))
445 })?;
446
447 // Use integer arithmetic to avoid f64 precision loss on large file sizes.
448 let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
449 let required = file_size.saturating_add(headroom);
450
451 if available < required {
452 let avail_mb = available / (1024 * 1024);
453 let req_mb = required / (1024 * 1024);
454 return Err(Error::InsufficientDiskSpace(format!(
455 "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
456 spill_root.display()
457 )));
458 }
459
460 debug!(
461 "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
462 spill_root.display()
463 );
464 Ok(())
465}
466
467fn usable_memory_bytes() -> Option<u64> {
468 let mut system = sysinfo::System::new();
469 system.refresh_memory();
470
471 let available_memory = system.available_memory();
472 let free_memory = system.free_memory();
473 let used_memory = system.used_memory();
474 let total_memory = system.total_memory();
475 let unused_memory = total_memory.saturating_sub(used_memory);
476
477 let mut usable = [available_memory, free_memory, unused_memory]
478 .into_iter()
479 .filter(|bytes| *bytes > 0)
480 .max();
481
482 let cgroup_free_memory = system
483 .cgroup_limits()
484 .filter(|limits| limits.total_memory > 0)
485 .map(|limits| limits.free_memory);
486 if let Some(cgroup_free_memory) = cgroup_free_memory {
487 usable = Some(usable.unwrap_or(u64::MAX).min(cgroup_free_memory));
488 }
489
490 debug!(
491 available_memory,
492 free_memory,
493 used_memory,
494 total_memory,
495 cgroup_free_memory,
496 usable_memory = ?usable,
497 "Detected usable memory for stream decrypt batch sizing"
498 );
499
500 usable
501}
502
503fn stream_decrypt_batch_memory_cap(usable_memory_bytes: u64) -> usize {
504 let budget = usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR;
505 let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
506 .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
507 .max(1);
508 let cap = (budget / estimated_bytes_per_chunk).max(1);
509
510 usize::try_from(cap).unwrap_or(usize::MAX)
511}
512
513fn adaptive_stream_decrypt_batch_size(
514 total_chunks: usize,
515 fetch_cap: usize,
516 configured_batch_floor: usize,
517 usable_memory_bytes: Option<u64>,
518) -> usize {
519 let fetch_target = fetch_cap
520 .max(1)
521 .saturating_mul(DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
522 let requested = match usable_memory_bytes {
523 Some(bytes) => {
524 let memory_cap = stream_decrypt_batch_memory_cap(bytes);
525 configured_batch_floor
526 .max(fetch_target)
527 .max(1)
528 .min(memory_cap)
529 }
530 None => configured_batch_floor.max(1),
531 };
532
533 requested.min(total_chunks.max(1)).max(1)
534}
535
/// Whether the data map is published to the network for address-based retrieval.
///
/// A private upload stores only the data chunks and returns the `DataMap` to
/// the caller — only someone holding that `DataMap` can reconstruct the file.
/// A public upload additionally stores the serialized `DataMap` as a chunk on
/// the network, yielding a single chunk address that anyone can use to
/// retrieve the `DataMap` (via [`Client::data_map_fetch`]) and then the file.
///
/// `Visibility::default()` is [`Visibility::Private`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Visibility {
    /// Keep the data map local; only the holder can retrieve the file.
    #[default]
    Private,
    /// Publish the data map as a network chunk so anyone with the returned
    /// address can retrieve and decrypt the file.
    Public,
}
552
/// Estimated cost of uploading a file, returned by
/// [`Client::estimate_upload_cost`].
///
/// Serializable via serde; both cost figures are carried as strings.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct UploadCostEstimate {
    /// Original file size in bytes.
    pub file_size: u64,
    /// Number of chunks the file would be split into (data chunks only,
    /// does not include the DataMap chunk added during public uploads).
    pub chunk_count: usize,
    /// Estimated total storage cost in atto (token smallest unit).
    /// `"0"` when every chunk of the file is already stored.
    pub storage_cost_atto: String,
    /// Estimated gas cost in wei as a string. This is a rough heuristic
    /// based on chunk count and payment mode, NOT a live gas price query.
    /// `"0"` when every chunk of the file is already stored.
    pub estimated_gas_cost_wei: String,
    /// Payment mode that would be used.
    pub payment_mode: PaymentMode,
}
570
/// Result of a file upload: the `DataMap` needed to retrieve the file.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers that construct or pattern-match
/// on this struct.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FileUploadResult {
    /// The data map containing chunk metadata for reconstruction.
    pub data_map: DataMap,
    /// Number of chunks stored on the network.
    pub chunks_stored: usize,
    /// Number of chunks that failed to store. Always 0 for a successful
    /// upload — partial-failure information is conveyed via
    /// [`crate::data::Error::PartialUpload`] instead.
    pub chunks_failed: usize,
    /// Total number of chunks in the upload, including chunks that were
    /// already stored and skipped. On full success this equals `chunks_stored`.
    pub total_chunks: usize,
    /// Which payment mode was actually used (not just requested).
    pub payment_mode_used: PaymentMode,
    /// Total storage cost paid in token units (atto). "0" if all chunks already existed.
    /// Carried as a string, unlike the numeric `gas_cost_wei`.
    pub storage_cost_atto: String,
    /// Total gas cost in wei. 0 if no on-chain transactions were made.
    pub gas_cost_wei: u128,
    /// Chunk address of the serialized `DataMap`, set only for
    /// [`Visibility::Public`] uploads. **`Some` means this address is
    /// retrievable from the network (via [`Client::data_map_fetch`])**, not
    /// necessarily that *this* upload paid to store it — if the serialized
    /// `DataMap` hashed to a chunk that was already on the network (same
    /// file uploaded before; deterministic via self-encryption), the address
    /// is still returned but no storage payment was made for it.
    pub data_map_address: Option<[u8; 32]>,
    /// Sum of chunk-store RPC attempts across the upload
    /// (`>= chunks_stored` on full success; more if any chunk retried).
    /// `0` for paths that don't run the wave store loop.
    pub chunk_attempts_total: usize,
    /// Per-chunk store wall-clock in ms (length == `chunks_stored` on full
    /// success, empty for paths that don't run the wave store loop).
    pub store_durations_ms: Vec<u64>,
    /// Count of stored chunks that succeeded on each retry round
    /// (index 0 = first attempt, 1 = first retry, etc.). The array has four
    /// buckets, i.e. indices 0 through 3. All zeros for paths that don't run
    /// the wave store loop.
    pub retries_histogram: [usize; 4],
}
616
/// Payment information for external signing — either wave-batch or merkle.
#[derive(Debug)]
pub enum ExternalPaymentInfo {
    /// Wave-batch: individual (quote_hash, rewards_address, amount) tuples.
    WaveBatch {
        /// Chunks ready for payment (needed for finalize).
        prepared_chunks: Vec<PreparedChunk>,
        /// Payment intent for external signing.
        payment_intent: PaymentIntent,
    },
    /// Merkle: single on-chain call with depth, pool commitments, timestamp.
    Merkle {
        /// The prepared merkle batch (public fields sent to frontend, private fields stay in Rust).
        prepared_batch: PreparedMerkleBatch,
        /// Raw chunk contents that still need upload after the preflight check.
        // NOTE(review): presumably index-aligned with `chunk_addresses` —
        // confirm against the code that builds this variant.
        chunk_contents: Vec<Bytes>,
        /// Chunk addresses that still need upload after the preflight check.
        chunk_addresses: Vec<[u8; 32]>,
    },
}
637
/// Prepared upload ready for external payment.
///
/// Contains everything needed to construct the on-chain payment transaction
/// externally (e.g. via WalletConnect in a desktop app) and then finalize
/// the upload without a Rust-side wallet.
///
/// Note: This struct stays in Rust memory — only the public fields of
/// `payment_info` are sent to the frontend. `PreparedChunk` contains
/// non-serializable network types, so the full struct cannot derive `Serialize`.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers.
#[derive(Debug)]
#[non_exhaustive]
pub struct PreparedUpload {
    /// The data map for later retrieval.
    pub data_map: DataMap,
    /// Payment information for chunks that still need payment after the
    /// already-stored preflight. This may be wave-batch even when the original
    /// chunk count was merkle-eligible if the remaining count is below the
    /// merkle threshold.
    pub payment_info: ExternalPaymentInfo,
    /// Chunk address of the serialized `DataMap` when this upload was
    /// prepared with [`Visibility::Public`]. `Some` means the address is
    /// retrievable on the network after finalization — either because this
    /// upload paid to store the chunk in `payment_info`, or because the
    /// chunk was already on the network (deterministic self-encryption).
    /// Carried through to [`FileUploadResult::data_map_address`].
    pub data_map_address: Option<[u8; 32]>,
    /// Chunk addresses already present on the network when this upload was
    /// prepared. These do not require payment or PUT during finalization.
    pub already_stored_addresses: Vec<[u8; 32]>,
    /// Total chunk count for the upload, including already-stored chunks
    /// (the ones listed in `already_stored_addresses`).
    pub total_chunks: usize,
}
673
/// Return type for [`spawn_file_encryption`]: chunk receiver, `DataMap` oneshot, join handle.
///
/// * The receiver yields the content of each encrypted chunk as it is produced.
/// * The oneshot delivers the `DataMap` once every chunk has been sent.
/// * The join handle resolves to the blocking encryption task's `Result`.
type EncryptionChannels = (
    tokio::sync::mpsc::Receiver<Bytes>,
    tokio::sync::oneshot::Receiver<DataMap>,
    tokio::task::JoinHandle<Result<()>>,
);
680
681/// Spawn a blocking task that streams file encryption through a channel.
682fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
683 let metadata = std::fs::metadata(&path)?;
684 let data_size = usize::try_from(metadata.len())
685 .map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;
686
687 let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
688 let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();
689
690 let handle = tokio::task::spawn_blocking(move || {
691 let file = std::fs::File::open(&path)?;
692 let mut reader = std::io::BufReader::new(file);
693
694 let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
695 let read_error_clone = Arc::clone(&read_error);
696
697 let data_iter = std::iter::from_fn(move || {
698 let mut buffer = vec![0u8; 8192];
699 match std::io::Read::read(&mut reader, &mut buffer) {
700 Ok(0) => None,
701 Ok(n) => {
702 buffer.truncate(n);
703 Some(Bytes::from(buffer))
704 }
705 Err(e) => {
706 let mut guard = read_error_clone
707 .lock()
708 .unwrap_or_else(|poisoned| poisoned.into_inner());
709 *guard = Some(e);
710 None
711 }
712 }
713 });
714
715 let mut stream = stream_encrypt(data_size, data_iter)
716 .map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;
717
718 for chunk_result in stream.chunks() {
719 // Check for captured read errors immediately after each chunk.
720 // stream_encrypt sees None (EOF) when a read fails, so it stops
721 // producing chunks. We must detect this before sending the
722 // partial results to avoid uploading a truncated DataMap.
723 {
724 let guard = read_error
725 .lock()
726 .unwrap_or_else(|poisoned| poisoned.into_inner());
727 if let Some(ref e) = *guard {
728 return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
729 }
730 }
731
732 let (_hash, content) = chunk_result
733 .map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
734 if chunk_tx.blocking_send(content).is_err() {
735 return Err(Error::Encryption("upload receiver dropped".to_string()));
736 }
737 }
738
739 // Final check: read error after last chunk (stream saw EOF).
740 {
741 let guard = read_error
742 .lock()
743 .unwrap_or_else(|poisoned| poisoned.into_inner());
744 if let Some(ref e) = *guard {
745 return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
746 }
747 }
748
749 let datamap = stream
750 .into_datamap()
751 .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
752 if datamap_tx.send(datamap).is_err() {
753 warn!("DataMap receiver dropped — upload may have been cancelled");
754 }
755 Ok(())
756 });
757
758 Ok((chunk_rx, datamap_rx, handle))
759}
760
761impl Client {
762 /// Upload a file to the network using streaming self-encryption.
763 ///
764 /// Automatically selects merkle batch payment for files that produce
765 /// 64+ chunks (saves gas). Encrypted chunks are spilled to a temp
766 /// directory so peak memory stays at ~256 MB regardless of file size.
767 ///
768 /// # Errors
769 ///
770 /// Returns an error if the file cannot be read, encryption fails,
771 /// or any chunk cannot be stored.
772 pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
773 self.file_upload_with_mode(path, PaymentMode::Auto).await
774 }
775
776 /// Estimate the cost of uploading a file without actually uploading.
777 ///
778 /// Encrypts the file to determine chunk count and sizes, then requests
779 /// a single quote from the network for a representative chunk. The
780 /// per-chunk price is extrapolated to the total chunk count.
781 ///
782 /// The estimate is fast (~2-5s) and does not require a wallet. Spilled
783 /// chunks are cleaned up automatically when the function returns.
784 ///
785 /// Gas cost is an advisory heuristic, not a live gas-oracle query. It is
786 /// derived from realistic per-transaction budgets (`GAS_PER_WAVE_TX`,
787 /// `GAS_PER_MERKLE_TX`) priced at `ARBITRUM_GAS_PRICE_WEI`. Real gas
788 /// varies with network conditions.
789 ///
790 /// If the first sampled chunk is already stored on the network, the
791 /// function retries with subsequent chunk addresses (up to
792 /// `ESTIMATE_SAMPLE_CAP`). If every sampled address reports stored,
793 /// a [`Error::CostEstimationInconclusive`] is returned so callers can
794 /// decide how to react rather than trust a bogus "free" estimate. Only
795 /// when every address in the file is stored do we return a zero-cost
796 /// estimate.
797 ///
798 /// # Errors
799 ///
800 /// Returns an error if the file cannot be read, encryption fails,
801 /// the network cannot provide a quote, or every sampled chunk is
802 /// already stored ([`Error::CostEstimationInconclusive`]).
803 pub async fn estimate_upload_cost(
804 &self,
805 path: &Path,
806 mode: PaymentMode,
807 progress: Option<mpsc::Sender<UploadEvent>>,
808 ) -> Result<UploadCostEstimate> {
809 let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
810
811 if file_size < 3 {
812 return Err(Error::InvalidData(
813 "File too small: self-encryption requires at least 3 bytes".into(),
814 ));
815 }
816
817 check_disk_space_for_spill(file_size)?;
818
819 info!(
820 "Estimating upload cost for {} ({file_size} bytes)",
821 path.display()
822 );
823
824 let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
825 let chunk_count = spill.len();
826
827 if let Some(ref tx) = progress {
828 let _ = tx
829 .send(UploadEvent::Encrypted {
830 total_chunks: chunk_count,
831 })
832 .await;
833 }
834
835 info!("Encrypted into {chunk_count} chunks, requesting quote");
836
837 // Sample up to ESTIMATE_SAMPLE_CAP distinct chunk addresses. A single
838 // AlreadyStored result says nothing about the rest of the file — the
839 // first chunk is often a DataMap-adjacent chunk that collides with
840 // prior uploads even when 99% of the file is new. Only treat the
841 // whole file as "fully stored" when every sample comes back stored.
842 let sample_limit = spill.addresses.len().min(ESTIMATE_SAMPLE_CAP);
843 let mut sampled = 0usize;
844 let mut all_already_stored = true;
845 let mut quotes_opt: Option<Vec<QuoteEntry>> = None;
846
847 for addr in spill.addresses.iter().take(sample_limit) {
848 sampled += 1;
849 let chunk_bytes = spill.read_chunk(addr)?;
850 let data_size = u64::try_from(chunk_bytes.len())
851 .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
852 match self
853 .get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
854 .await
855 {
856 Ok(q) => {
857 quotes_opt = Some(q);
858 all_already_stored = false;
859 break;
860 }
861 Err(Error::AlreadyStored) => {
862 debug!(
863 "Sample chunk {} already stored; trying next address ({sampled}/{sample_limit})",
864 hex::encode(addr)
865 );
866 continue;
867 }
868 Err(e) => return Err(e),
869 }
870 }
871
872 let uses_merkle = should_use_merkle(chunk_count, mode);
873
874 let quotes = match quotes_opt {
875 Some(q) => q,
876 None if all_already_stored && sampled == chunk_count => {
877 // Every address in the file was sampled and every one is
878 // already on the network — returning a zero-cost estimate is
879 // accurate in this case.
880 info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
881 return Ok(UploadCostEstimate {
882 file_size,
883 chunk_count,
884 storage_cost_atto: "0".into(),
885 estimated_gas_cost_wei: "0".into(),
886 payment_mode: if uses_merkle {
887 PaymentMode::Merkle
888 } else {
889 PaymentMode::Single
890 },
891 });
892 }
893 None => {
894 return Err(Error::CostEstimationInconclusive(format!(
895 "sampled {sampled} chunk addresses out of {chunk_count} and every \
896 one reported AlreadyStored; cannot infer a representative price \
897 for the remaining chunks"
898 )));
899 }
900 };
901
902 // Use the median price × 3 (matches SingleNodePayment::from_quotes
903 // which pays 3x the median to incentivize reliable storage).
904 let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
905 prices.sort();
906 let median_price = prices
907 .get(prices.len() / 2)
908 .copied()
909 .unwrap_or(Amount::ZERO);
910 let per_chunk_cost = median_price * Amount::from(3u64);
911
912 let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
913 let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);
914
915 // Estimate gas cost from realistic per-transaction budgets rather
916 // than a flat per-chunk or per-wave number.
917 //
918 // - Single mode: `batch_pay` packs up to UPLOAD_WAVE_SIZE chunks'
919 // close-group quotes into one `pay_for_quotes` call on Arbitrum.
920 // The dominant cost is one SSTORE per entry plus base tx overhead,
921 // so we use GAS_PER_WAVE_TX (≈1.5M) as a conservative upper bound
922 // on a full wave and multiply by the number of waves. The previous
923 // per-wave figure of 150k was closer to a single-entry transfer
924 // and understated cost by 5–10x for full waves.
925 // - Merkle mode: one tx per sub-batch that verifies a merkle tree
926 // and posts a pool commitment (GAS_PER_MERKLE_TX ≈ 500k each).
927 //
928 // Gas is priced at ARBITRUM_GAS_PRICE_WEI (~0.1 gwei, a typical
929 // Arbitrum baseline). Treat the result as advisory, not a commitment.
930 let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
931 let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
932 let estimated_gas: u128 = if uses_merkle {
933 merkle_batches
934 .saturating_mul(GAS_PER_MERKLE_TX)
935 .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
936 } else {
937 waves
938 .saturating_mul(GAS_PER_WAVE_TX)
939 .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
940 };
941
942 info!(
943 "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
944 );
945
946 Ok(UploadCostEstimate {
947 file_size,
948 chunk_count,
949 storage_cost_atto: total_storage.to_string(),
950 estimated_gas_cost_wei: estimated_gas.to_string(),
951 payment_mode: if uses_merkle {
952 PaymentMode::Merkle
953 } else {
954 PaymentMode::Single
955 },
956 })
957 }
958
959 /// Phase 1 of external-signer upload: encrypt file and prepare chunks.
960 ///
961 /// Equivalent to [`Client::file_prepare_upload_with_visibility`] with
962 /// [`Visibility::Private`] — see that method for details.
963 pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
964 self.file_prepare_upload_with_progress(path, Visibility::Private, None)
965 .await
966 }
967
968 /// Phase 1 of external-signer upload with explicit [`Visibility`] control.
969 ///
970 /// Equivalent to [`Client::file_prepare_upload_with_progress`] with
971 /// `progress: None` — see that method for details.
972 pub async fn file_prepare_upload_with_visibility(
973 &self,
974 path: &Path,
975 visibility: Visibility,
976 ) -> Result<PreparedUpload> {
977 self.file_prepare_upload_with_progress(path, visibility, None)
978 .await
979 }
980
/// Phase 1 of external-signer upload with progress events.
///
/// Requires an EVM network (for contract price queries) but NOT a wallet.
/// Returns a [`PreparedUpload`] containing the data map, prepared chunks,
/// and a [`PaymentIntent`] that the external signer uses to construct
/// and submit the on-chain payment transaction.
///
/// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
/// is bundled into the payment batch as an additional chunk and its
/// address is recorded on the returned [`PreparedUpload`]. After
/// [`Client::finalize_upload`] (or `_merkle`) succeeds, that address is
/// surfaced via [`FileUploadResult::data_map_address`] so the uploader
/// can share a single address from which anyone can retrieve the file.
///
/// When `progress` is `Some`, [`UploadEvent`]s are emitted on the channel
/// during encryption ([`UploadEvent::Encrypting`] / [`UploadEvent::Encrypted`])
/// and per-chunk quoting ([`UploadEvent::ChunkQuoted`]). Storage events are
/// emitted later by [`Client::finalize_upload_with_progress`] /
/// [`Client::finalize_upload_merkle_with_progress`].
///
/// # Payment path selection
///
/// The path is chosen with `should_use_merkle(chunk_count, PaymentMode::Auto)`:
///
/// - Not merkle: every chunk is quoted individually and the result is an
///   [`ExternalPaymentInfo::WaveBatch`].
/// - Merkle: a preflight (`plan_merkle_upload`) splits the chunks into
///   "already stored" and "to upload". If nothing needs uploading, an empty
///   `WaveBatch` is returned. If the remaining count no longer qualifies
///   for merkle, or merkle preparation fails with
///   [`Error::InsufficientPeers`], the remaining chunks are prepared as a
///   `WaveBatch` instead. Otherwise an [`ExternalPaymentInfo::Merkle`] is
///   returned.
///
/// **Memory note:** Encryption uses disk spilling for bounded memory, but
/// the returned [`PreparedUpload`] holds all chunk content in memory (each
/// [`PreparedChunk`] contains a `Bytes` with the full chunk data). This is
/// inherent to the two-phase external-signer protocol — the chunks must
/// stay in memory until [`Client::finalize_upload`] stores them. For very
/// large files, prefer [`Client::file_upload`] which streams directly.
///
/// # Errors
///
/// Returns an error if there is insufficient disk space, the file cannot
/// be read, encryption fails, the `DataMap` cannot be serialized (public
/// uploads only), or quote collection fails.
pub async fn file_prepare_upload_with_progress(
    &self,
    path: &Path,
    visibility: Visibility,
    progress: Option<mpsc::Sender<UploadEvent>>,
) -> Result<PreparedUpload> {
    debug!(
        "Preparing file upload for external signing (visibility={visibility:?}): {}",
        path.display()
    );

    // Pre-flight: make sure the temp directory can hold the spilled chunks.
    let file_size = std::fs::metadata(path)?.len();
    check_disk_space_for_spill(file_size)?;

    let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;

    info!(
        "Encrypted {} into {} chunks for external signing (spilled to disk)",
        path.display(),
        spill.len()
    );

    // Read each chunk from disk and collect quotes concurrently.
    // Note: all PreparedChunks accumulate in memory because the external-signer
    // protocol requires them for finalize_upload. NOT memory-bounded for large files.
    let mut chunk_data: Vec<Bytes> = spill
        .addresses
        .iter()
        .map(|addr| spill.read_chunk(addr))
        .collect::<std::result::Result<Vec<_>, _>>()?;

    // For public uploads, bundle the serialized DataMap as an extra chunk
    // in the same payment batch. This lets the external signer pay for
    // the data chunks and the DataMap chunk in one flow, and lets the
    // finalize step return the DataMap's chunk address as the shareable
    // retrieval address.
    let data_map_address = match visibility {
        Visibility::Private => None,
        Visibility::Public => {
            let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
                Error::Serialization(format!("Failed to serialize DataMap: {e}"))
            })?;
            let bytes = Bytes::from(serialized);
            let address = compute_address(&bytes);
            info!(
                "Public upload: bundling DataMap chunk ({} bytes) at address {}",
                bytes.len(),
                hex::encode(address)
            );
            chunk_data.push(bytes);
            Some(address)
        }
    };

    // From here on `chunk_count` includes the DataMap chunk for public
    // uploads; it is the total reported in events and in the result.
    let chunk_count = chunk_data.len();

    if let Some(ref tx) = progress {
        let _ = tx
            .send(UploadEvent::Encrypted {
                total_chunks: chunk_count,
            })
            .await;
    }

    let (payment_info, already_stored_addresses) = if should_use_merkle(
        chunk_count,
        PaymentMode::Auto,
    ) {
        // Merkle path: build tree, collect candidate pools, return for external payment.
        info!("Using merkle batch preparation for {chunk_count} file chunks");

        let chunk_entries: Vec<([u8; 32], u64)> = chunk_data
            .iter()
            .map(|chunk| {
                let size = u64::try_from(chunk.len())
                    .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
                Ok((compute_address(chunk), size))
            })
            .collect::<Result<Vec<_>>>()?;

        // Preflight: split the chunks into "already stored" and "to upload".
        let merkle_plan = self
            .plan_merkle_upload(chunk_entries, DATA_TYPE_CHUNK, progress.as_ref())
            .await?;

        if merkle_plan.to_upload.is_empty() {
            // Nothing to pay for: hand back an empty wave batch so the
            // caller still gets a well-formed PreparedUpload.
            info!("All {chunk_count} file chunks already stored; no external payment needed");
            (
                ExternalPaymentInfo::WaveBatch {
                    prepared_chunks: Vec::new(),
                    payment_intent: PaymentIntent::from_prepared_chunks(&[]),
                },
                merkle_plan.already_stored,
            )
        } else {
            // Keep only the contents of the chunks that still need uploading.
            let chunk_data =
                chunk_contents_for_upload_addresses(chunk_data, &merkle_plan.to_upload)?;

            if !should_use_merkle(merkle_plan.to_upload.len(), PaymentMode::Auto) {
                info!(
                    "{} file chunks need upload after merkle preflight; preparing wave-batch payment",
                    merkle_plan.to_upload.len()
                );
                // NOTE(review): the full `chunk_count` is passed as the
                // progress total although only the remaining chunks are
                // quoted here — confirm the progress consumer expects that.
                let (payment_info, mut wave_already_stored) = self
                    .prepare_wave_batch_external_chunks(
                        chunk_data,
                        progress.as_ref(),
                        chunk_count,
                    )
                    .await?;
                // Merge the chunks found stored by the preflight with the
                // ones found stored while quoting.
                let mut already_stored = merkle_plan.already_stored;
                already_stored.append(&mut wave_already_stored);
                (payment_info, already_stored)
            } else {
                match self
                    .prepare_merkle_batch_external(
                        &merkle_plan.to_upload,
                        DATA_TYPE_CHUNK,
                        merkle_plan.to_upload_avg_size(),
                    )
                    .await
                {
                    Ok(prepared_batch) => {
                        info!(
                            "File prepared for external merkle signing: {} chunks, depth={} ({})",
                            merkle_plan.to_upload.len(),
                            prepared_batch.depth,
                            path.display()
                        );

                        (
                            ExternalPaymentInfo::Merkle {
                                prepared_batch,
                                chunk_contents: chunk_data,
                                chunk_addresses: merkle_plan.to_upload,
                            },
                            merkle_plan.already_stored,
                        )
                    }
                    // Too few peers for merkle: fall back to per-chunk
                    // quoting for the remaining chunks instead of failing.
                    Err(Error::InsufficientPeers(ref msg)) => {
                        info!(
                            "External merkle preparation needs more peers ({msg}); preparing wave-batch payment"
                        );
                        let (payment_info, mut wave_already_stored) = self
                            .prepare_wave_batch_external_chunks(
                                chunk_data,
                                progress.as_ref(),
                                chunk_count,
                            )
                            .await?;
                        let mut already_stored = merkle_plan.already_stored;
                        already_stored.append(&mut wave_already_stored);
                        (payment_info, already_stored)
                    }
                    Err(e) => return Err(e),
                }
            }
        }
    } else {
        self.prepare_wave_batch_external_chunks(chunk_data, progress.as_ref(), chunk_count)
            .await?
    };

    // Surface the "DataMap chunk was already on the network" case
    // so debugging "why is data_map_address set but no storage cost
    // appears for it?" doesn't require reading the source. See the
    // `data_map_address` doc comment for why this is still a valid
    // `Some(addr)` outcome.
    if let Some(addr) = data_map_address {
        let data_map_needs_payment = match &payment_info {
            ExternalPaymentInfo::WaveBatch {
                prepared_chunks, ..
            } => prepared_chunks.iter().any(|c| c.address == addr),
            ExternalPaymentInfo::Merkle {
                chunk_addresses, ..
            } => chunk_addresses.contains(&addr),
        };
        if !data_map_needs_payment {
            info!(
                "Public upload: DataMap chunk {} was already stored \
                 on the network — address is retrievable without a \
                 new payment",
                hex::encode(addr)
            );
        }
    }

    Ok(PreparedUpload {
        data_map,
        payment_info,
        data_map_address,
        already_stored_addresses,
        total_chunks: chunk_count,
    })
}
1206
1207 async fn prepare_wave_batch_external_chunks(
1208 &self,
1209 chunk_data: Vec<Bytes>,
1210 progress: Option<&mpsc::Sender<UploadEvent>>,
1211 progress_total: usize,
1212 ) -> Result<(ExternalPaymentInfo, Vec<[u8; 32]>)> {
1213 let chunk_count = chunk_data.len();
1214 let chunks_with_addr: Vec<(Bytes, [u8; 32])> = chunk_data
1215 .into_iter()
1216 .map(|content| {
1217 let address = compute_address(&content);
1218 (content, address)
1219 })
1220 .collect();
1221
1222 // Wave-batch path: collect quotes per chunk concurrently, emitting
1223 // a `ChunkQuoted` event after each completion so callers can drive
1224 // a progress bar through the slow quote phase.
1225 let quote_limiter = self.controller().quote.clone();
1226 let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
1227 let mut quote_stream = stream::iter(chunks_with_addr)
1228 .map(|(content, address)| {
1229 let limiter = quote_limiter.clone();
1230 async move {
1231 let result = observe_op(
1232 &limiter,
1233 || async move { self.prepare_chunk_payment(content).await },
1234 classify_error,
1235 )
1236 .await;
1237 (address, result)
1238 }
1239 })
1240 .buffer_unordered(quote_concurrency);
1241
1242 let mut prepared_chunks = Vec::with_capacity(chunk_count);
1243 let mut already_stored = Vec::new();
1244 let mut quoted = 0usize;
1245 while let Some((address, result)) = quote_stream.next().await {
1246 match result? {
1247 Some(prepared) => prepared_chunks.push(prepared),
1248 None => already_stored.push(address),
1249 }
1250 quoted += 1;
1251 if let Some(tx) = progress {
1252 let _ = tx.try_send(UploadEvent::ChunkQuoted {
1253 quoted,
1254 total: progress_total,
1255 });
1256 }
1257 }
1258
1259 let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
1260 info!(
1261 "Prepared external wave-batch payment: {} chunks, {} already stored, total {} atto",
1262 prepared_chunks.len(),
1263 already_stored.len(),
1264 payment_intent.total_amount,
1265 );
1266
1267 Ok((
1268 ExternalPaymentInfo::WaveBatch {
1269 prepared_chunks,
1270 payment_intent,
1271 },
1272 already_stored,
1273 ))
1274 }
1275
1276 /// Phase 2 of external-signer upload (wave-batch): finalize with externally-signed tx hashes.
1277 ///
1278 /// Takes a [`PreparedUpload`] that used wave-batch payment and a map
1279 /// of `quote_hash -> tx_hash` provided by the external signer after on-chain
1280 /// payment. Builds payment proofs and stores chunks on the network.
1281 ///
1282 /// # Errors
1283 ///
1284 /// Returns an error if the prepared upload used merkle payment (use
1285 /// [`Client::finalize_upload_merkle`] instead), proof construction fails,
1286 /// or any chunk cannot be stored.
1287 pub async fn finalize_upload(
1288 &self,
1289 prepared: PreparedUpload,
1290 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1291 ) -> Result<FileUploadResult> {
1292 self.finalize_upload_with_progress(prepared, tx_hash_map, None)
1293 .await
1294 }
1295
1296 /// Phase 2 of external-signer upload (wave-batch) with progress events.
1297 ///
1298 /// Same as [`Client::finalize_upload`] but emits [`UploadEvent::ChunkStored`]
1299 /// on the provided channel as each chunk is successfully stored.
1300 ///
1301 /// # Errors
1302 ///
1303 /// Same as [`Client::finalize_upload`].
1304 pub async fn finalize_upload_with_progress(
1305 &self,
1306 prepared: PreparedUpload,
1307 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1308 progress: Option<mpsc::Sender<UploadEvent>>,
1309 ) -> Result<FileUploadResult> {
1310 let data_map_address = prepared.data_map_address;
1311 let already_stored_addresses = prepared.already_stored_addresses;
1312 let already_stored_count = already_stored_addresses.len();
1313 let total_chunks = prepared.total_chunks;
1314 match prepared.payment_info {
1315 ExternalPaymentInfo::WaveBatch {
1316 prepared_chunks,
1317 payment_intent: _,
1318 } => {
1319 let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
1320 let wave_result = self
1321 .store_paid_chunks_with_events(
1322 paid_chunks,
1323 progress.as_ref(),
1324 already_stored_count,
1325 total_chunks,
1326 )
1327 .await;
1328 if !wave_result.failed.is_empty() {
1329 let failed_count = wave_result.failed.len();
1330 let stored_count = already_stored_count + wave_result.stored.len();
1331 let mut stored = already_stored_addresses;
1332 stored.extend(wave_result.stored);
1333 return Err(Error::PartialUpload {
1334 stored,
1335 stored_count,
1336 failed: wave_result.failed,
1337 failed_count,
1338 total_chunks,
1339 reason: "finalize_upload: chunk storage failed after retries".into(),
1340 });
1341 }
1342 let chunks_stored = already_stored_count + wave_result.stored.len();
1343
1344 info!("External-signer upload finalized: {chunks_stored} chunks stored");
1345
1346 let mut stats = WaveAggregateStats::default();
1347 stats.absorb(&wave_result);
1348
1349 Ok(FileUploadResult {
1350 data_map: prepared.data_map,
1351 chunks_stored,
1352 chunks_failed: 0,
1353 total_chunks,
1354 payment_mode_used: PaymentMode::Single,
1355 storage_cost_atto: "0".into(),
1356 gas_cost_wei: 0,
1357 data_map_address,
1358 chunk_attempts_total: stats.chunk_attempts_total,
1359 store_durations_ms: stats.store_durations_ms,
1360 retries_histogram: stats.retries_histogram,
1361 })
1362 }
1363 ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
1364 "Cannot finalize merkle upload with wave-batch tx hashes. \
1365 Use finalize_upload_merkle() instead."
1366 .to_string(),
1367 )),
1368 }
1369 }
1370
1371 /// Phase 2 of external-signer upload (merkle): finalize with winner pool hash.
1372 ///
1373 /// Takes a [`PreparedUpload`] that used merkle payment and the `winner_pool_hash`
1374 /// returned by the on-chain merkle payment transaction. Generates proofs and
1375 /// stores chunks on the network.
1376 ///
1377 /// # Errors
1378 ///
1379 /// Returns an error if the prepared upload used wave-batch payment (use
1380 /// [`Client::finalize_upload`] instead), proof generation fails,
1381 /// or any chunk cannot be stored.
1382 pub async fn finalize_upload_merkle(
1383 &self,
1384 prepared: PreparedUpload,
1385 winner_pool_hash: [u8; 32],
1386 ) -> Result<FileUploadResult> {
1387 self.finalize_upload_merkle_with_progress(prepared, winner_pool_hash, None)
1388 .await
1389 }
1390
1391 /// Phase 2 of external-signer upload (merkle) with progress events.
1392 ///
1393 /// Same as [`Client::finalize_upload_merkle`] but emits [`UploadEvent::ChunkStored`]
1394 /// on the provided channel as each chunk is successfully stored.
1395 ///
1396 /// # Errors
1397 ///
1398 /// Same as [`Client::finalize_upload_merkle`].
1399 pub async fn finalize_upload_merkle_with_progress(
1400 &self,
1401 prepared: PreparedUpload,
1402 winner_pool_hash: [u8; 32],
1403 progress: Option<mpsc::Sender<UploadEvent>>,
1404 ) -> Result<FileUploadResult> {
1405 let data_map_address = prepared.data_map_address;
1406 let already_stored_count = prepared.already_stored_addresses.len();
1407 let total_chunks = prepared.total_chunks;
1408 match prepared.payment_info {
1409 ExternalPaymentInfo::Merkle {
1410 prepared_batch,
1411 chunk_contents,
1412 chunk_addresses,
1413 } => {
1414 let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
1415 let outcome = self
1416 .merkle_upload_chunks(
1417 chunk_contents,
1418 chunk_addresses,
1419 &batch_result,
1420 progress.as_ref(),
1421 already_stored_count,
1422 total_chunks,
1423 )
1424 .await?;
1425
1426 info!(
1427 "External-signer merkle upload finalized: {} chunks stored, {} failed",
1428 outcome.stored, outcome.failed
1429 );
1430
1431 Ok(FileUploadResult {
1432 data_map: prepared.data_map,
1433 chunks_stored: outcome.stored,
1434 chunks_failed: outcome.failed,
1435 total_chunks,
1436 payment_mode_used: PaymentMode::Merkle,
1437 storage_cost_atto: "0".into(),
1438 gas_cost_wei: 0,
1439 data_map_address,
1440 chunk_attempts_total: outcome.stats.chunk_attempts_total,
1441 store_durations_ms: outcome.stats.store_durations_ms,
1442 retries_histogram: outcome.stats.retries_histogram,
1443 })
1444 }
1445 ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
1446 "Cannot finalize wave-batch upload with merkle winner hash. \
1447 Use finalize_upload() instead."
1448 .to_string(),
1449 )),
1450 }
1451 }
1452
1453 /// Upload a file with a specific payment mode.
1454 ///
1455 /// Before encryption, checks that the temp directory has enough free
1456 /// disk space for the spilled chunks (~1.1× source file size).
1457 ///
1458 /// Encrypted chunks are spilled to a temp directory during encryption
1459 /// so that only their 32-byte addresses stay in memory. At upload time,
1460 /// chunks are read back one wave at a time (~64 × 4 MB ≈ 256 MB peak).
1461 ///
1462 /// # Errors
1463 ///
1464 /// Returns an error if there is insufficient disk space, the file cannot
1465 /// be read, encryption fails, or any chunk cannot be stored.
1466 #[allow(clippy::too_many_lines)]
1467 pub async fn file_upload_with_mode(
1468 &self,
1469 path: &Path,
1470 mode: PaymentMode,
1471 ) -> Result<FileUploadResult> {
1472 self.file_upload_with_progress(path, mode, None).await
1473 }
1474
/// Upload a file with progress events sent to the given channel.
///
/// Same as [`Client::file_upload_with_mode`] but sends [`UploadEvent`]s to the
/// provided channel for UI progress feedback.
///
/// # Flow
///
/// 1. Check temp disk space, then encrypt the file and spill its chunks
///    to disk.
/// 2. If `self.should_use_merkle(chunk_count, mode)` holds, take the
///    merkle path; otherwise upload with per-chunk (single) payments.
///
/// On the merkle path a cached merkle receipt for this file (keyed by the
/// canonical path) is reused whenever it covers the chunks that still need
/// storing, so a retry after a crash does not pay twice. A fresh payment
/// result is saved to the cache before the store phase starts. When the
/// merkle preflight or payment fails with [`Error::InsufficientPeers`] and
/// `mode` is [`PaymentMode::Auto`], the upload falls back to single
/// payments. Cached receipts are deleted once the upload succeeds.
///
/// The returned [`FileUploadResult`] always has `data_map_address: None`
/// and `chunks_failed: 0`.
///
/// # Errors
///
/// Returns an error if there is insufficient disk space, the file cannot
/// be read, encryption fails, payment fails (outside the fallback cases
/// above), or chunk upload fails.
#[allow(clippy::too_many_lines)]
pub async fn file_upload_with_progress(
    &self,
    path: &Path,
    mode: PaymentMode,
    progress: Option<mpsc::Sender<UploadEvent>>,
) -> Result<FileUploadResult> {
    debug!(
        "Streaming file upload with mode {mode:?}: {}",
        path.display()
    );

    // Pre-flight: verify enough temp disk space for the chunk spill.
    let file_size = std::fs::metadata(path)?.len();
    check_disk_space_for_spill(file_size)?;

    // Phase 1: Encrypt file and spill chunks to temp directory.
    // Only 32-byte addresses stay in memory — chunk data lives on disk.
    let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;

    let chunk_count = spill.len();
    info!(
        "Encrypted {} into {chunk_count} chunks (spilled to disk)",
        path.display()
    );
    if let Some(ref tx) = progress {
        let _ = tx
            .send(UploadEvent::Encrypted {
                total_chunks: chunk_count,
            })
            .await;
    }

    // Phase 2: Decide payment mode and upload in waves from disk.
    //
    // For the merkle path, attempt to resume from a cached
    // receipt before paying again. The cache is keyed by the
    // CANONICAL source path so `./foo`, `/abs/foo`, and any
    // symlink alias all resolve to the same cache entry — a
    // crash-and-retry from a different cwd or via a different
    // alias still hits the receipt. Canonicalize may fail (the
    // file could have been moved between phase 1 and here); we
    // fall back to the display string in that case, which
    // preserves pre-fix behaviour rather than dropping cache
    // resume entirely.
    let file_path_key = std::fs::canonicalize(path)
        .map(|p| p.display().to_string())
        .unwrap_or_else(|_| path.display().to_string());
    let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self
        .should_use_merkle(chunk_count, mode)
    {
        info!("Using merkle batch payment for {chunk_count} file chunks");

        // Load any receipt left behind by an earlier attempt on this file.
        let cached_merkle =
            crate::data::client::cached_merkle::try_load_for_file(&file_path_key)
                .map(|(_cache_path, cached)| cached);

        // Preflight: split the chunks into "already stored" and "to upload".
        let merkle_plan = match self
            .plan_merkle_upload(spill.chunk_entries()?, DATA_TYPE_CHUNK, progress.as_ref())
            .await
        {
            Ok(plan) => plan,
            Err(e) => {
                // Preflight failed. If a cached receipt covers every chunk
                // of the file, upload all of them with the cached proofs
                // rather than giving up.
                if let Some(cached) = cached_merkle
                    .as_ref()
                    .filter(|cached| cached_merkle_covers_addresses(cached, &spill.addresses))
                {
                    info!(
                        "Merkle preflight failed ({e}); \
                         resuming with cached merkle proofs"
                    );
                    let (stored, sc, gc, stats) = self
                        .upload_waves_merkle(
                            &spill,
                            &spill.addresses,
                            cached,
                            &[],
                            progress.as_ref(),
                        )
                        .await?;
                    crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                    return Ok(FileUploadResult {
                        data_map,
                        chunks_stored: stored,
                        chunks_failed: 0,
                        total_chunks: chunk_count,
                        payment_mode_used: PaymentMode::Merkle,
                        storage_cost_atto: sc,
                        gas_cost_wei: gc,
                        data_map_address: None,
                        chunk_attempts_total: stats.chunk_attempts_total,
                        store_durations_ms: stats.store_durations_ms,
                        retries_histogram: stats.retries_histogram,
                    });
                }
                match &e {
                    // Only Auto mode may silently switch payment strategy.
                    // NOTE(review): a cached merkle receipt that does not
                    // cover the file is left on disk on this path —
                    // confirm that is intended.
                    Error::InsufficientPeers(msg) if mode == PaymentMode::Auto => {
                        info!(
                            "Merkle preflight needs more peers ({msg}), \
                             falling back to wave-batch"
                        );
                        let (stored, sc, gc, fb_stats) = self
                            .upload_waves_single(
                                &spill,
                                progress.as_ref(),
                                Some(&file_path_key),
                            )
                            .await?;
                        crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                        return Ok(FileUploadResult {
                            data_map,
                            chunks_stored: stored,
                            chunks_failed: 0,
                            total_chunks: chunk_count,
                            payment_mode_used: PaymentMode::Single,
                            storage_cost_atto: sc,
                            gas_cost_wei: gc,
                            data_map_address: None,
                            chunk_attempts_total: fb_stats.chunk_attempts_total,
                            store_durations_ms: fb_stats.store_durations_ms,
                            retries_histogram: fb_stats.retries_histogram,
                        });
                    }
                    _ => return Err(e),
                }
            }
        };

        if merkle_plan.to_upload.is_empty() {
            // Nothing left to store: no payment, and neither cached
            // receipt is needed any more.
            info!("All {chunk_count} merkle chunks already stored; skipping payment");
            crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
            crate::data::client::cached_single::try_delete_for_file(&file_path_key);
            (
                chunk_count,
                PaymentMode::Merkle,
                "0".to_string(),
                0,
                WaveAggregateStats::default(),
            )
        } else if !self.should_use_merkle(merkle_plan.to_upload.len(), mode) {
            // The remaining chunks no longer qualify for a merkle payment.
            // Prefer proofs that were already paid for; otherwise pay per
            // chunk.
            let remaining_chunks = merkle_plan.to_upload.len();
            if let Some(cached) = cached_merkle
                .as_ref()
                .filter(|cached| cached_merkle_covers_addresses(cached, &merkle_plan.to_upload))
            {
                info!(
                    "{remaining_chunks} chunks remain below merkle threshold; \
                     reusing cached merkle proofs"
                );
                let (stored, sc, gc, stats) = self
                    .upload_waves_merkle(
                        &spill,
                        &merkle_plan.to_upload,
                        cached,
                        &merkle_plan.already_stored,
                        progress.as_ref(),
                    )
                    .await?;
                crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                (stored, PaymentMode::Merkle, sc, gc, stats)
            } else {
                if cached_merkle.is_some() {
                    info!(
                        "{remaining_chunks} chunks remain below merkle threshold, \
                         and the cached merkle receipt does not cover them. \
                         Discarding cache and using single-node payment."
                    );
                    crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                } else {
                    info!(
                        "{remaining_chunks} chunks need upload after merkle preflight; \
                         using single-node payment"
                    );
                }
                let (stored, sc, gc, stats) = self
                    .upload_spill_addresses_single(
                        &spill,
                        &merkle_plan.to_upload,
                        progress.as_ref(),
                        merkle_plan.already_stored.len(),
                        chunk_count,
                        Some(&file_path_key),
                    )
                    .await?;
                crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                (stored, PaymentMode::Single, sc, gc, stats)
            }
        } else {
            // Regular merkle path: obtain a payment result, either from
            // the cache or by paying now.
            let batch_result = if let Some(cached) = cached_merkle.as_ref() {
                // Validate the cache against the chunks that still need
                // storage. Extra proofs are harmless: a previous attempt
                // may have paid for chunks that are now already stored.
                if cached_merkle_covers_addresses(cached, &merkle_plan.to_upload) {
                    info!(
                        "Skipping merkle payment phase; resuming with \
                         cached proofs for {} remaining chunks",
                        merkle_plan.to_upload.len()
                    );
                    Ok(cached.clone())
                } else {
                    info!(
                        "Cached merkle receipt does not cover the current \
                         remaining chunks (cached={}, remaining={}). \
                         Discarding cache and paying fresh.",
                        cached.proofs.len(),
                        merkle_plan.to_upload.len()
                    );
                    crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                    self.pay_for_merkle_batch(
                        &merkle_plan.to_upload,
                        DATA_TYPE_CHUNK,
                        merkle_plan.to_upload_avg_size(),
                    )
                    .await
                    .inspect(|result| {
                        // Same as below: persist the receipt before storing.
                        crate::data::client::cached_merkle::try_save(&file_path_key, result);
                    })
                }
            } else {
                self.pay_for_merkle_batch(
                    &merkle_plan.to_upload,
                    DATA_TYPE_CHUNK,
                    merkle_plan.to_upload_avg_size(),
                )
                .await
                .inspect(|result| {
                    // Save BEFORE the store phase so a crash
                    // mid-upload leaves a resumable receipt.
                    crate::data::client::cached_merkle::try_save(&file_path_key, result);
                })
            };

            let batch_result = match batch_result {
                Ok(result) => result,
                // Only Auto mode may fall back to single payments.
                Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
                    info!("Merkle needs more peers ({msg}), falling back to wave-batch");
                    let (stored, sc, gc, fb_stats) = self
                        .upload_spill_addresses_single(
                            &spill,
                            &merkle_plan.to_upload,
                            progress.as_ref(),
                            merkle_plan.already_stored.len(),
                            chunk_count,
                            Some(&file_path_key),
                        )
                        .await?;
                    crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                    return Ok(FileUploadResult {
                        data_map,
                        chunks_stored: stored,
                        chunks_failed: 0,
                        total_chunks: chunk_count,
                        payment_mode_used: PaymentMode::Single,
                        storage_cost_atto: sc,
                        gas_cost_wei: gc,
                        data_map_address: None,
                        chunk_attempts_total: fb_stats.chunk_attempts_total,
                        store_durations_ms: fb_stats.store_durations_ms,
                        retries_histogram: fb_stats.retries_histogram,
                    });
                }
                Err(e) => return Err(e),
            };

            let (stored, sc, gc, stats) = self
                .upload_waves_merkle(
                    &spill,
                    &merkle_plan.to_upload,
                    &batch_result,
                    &merkle_plan.already_stored,
                    progress.as_ref(),
                )
                .await?;
            // Upload succeeded end-to-end; the cached receipt is
            // no longer needed.
            crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
            (stored, PaymentMode::Merkle, sc, gc, stats)
        }
    } else {
        let (stored, sc, gc, stats) = self
            .upload_waves_single(&spill, progress.as_ref(), Some(&file_path_key))
            .await?;
        // Full file success: drop any cached single-node receipt.
        crate::data::client::cached_single::try_delete_for_file(&file_path_key);
        (stored, PaymentMode::Single, sc, gc, stats)
    };

    info!(
        "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
        path.display()
    );

    Ok(FileUploadResult {
        data_map,
        chunks_stored,
        chunks_failed: 0,
        total_chunks: chunk_count,
        payment_mode_used: actual_mode,
        storage_cost_atto,
        gas_cost_wei,
        data_map_address: None,
        chunk_attempts_total: stats.chunk_attempts_total,
        store_durations_ms: stats.store_durations_ms,
        retries_histogram: stats.retries_histogram,
    })
}
1785
1786 /// Encrypt a file and spill chunks to a temp directory.
1787 ///
1788 /// Logs progress every 100 chunks so users get feedback during
1789 /// multi-GB encryptions.
1790 ///
1791 /// Returns the spill buffer (addresses on disk) and the `DataMap`.
1792 async fn encrypt_file_to_spill(
1793 &self,
1794 path: &Path,
1795 progress: Option<&mpsc::Sender<UploadEvent>>,
1796 ) -> Result<(ChunkSpill, DataMap)> {
1797 let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
1798
1799 let mut spill = ChunkSpill::new()?;
1800 while let Some(content) = chunk_rx.recv().await {
1801 spill.push(&content)?;
1802 let chunks_done = spill.len();
1803 if let Some(tx) = progress {
1804 if chunks_done.is_multiple_of(10) {
1805 let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
1806 }
1807 }
1808 if chunks_done % 100 == 0 {
1809 let mb = spill.total_bytes() / (1024 * 1024);
1810 info!(
1811 "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
1812 path.display()
1813 );
1814 }
1815 }
1816
1817 // Await encryption completion to catch errors before paying.
1818 handle
1819 .await
1820 .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
1821 .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
1822
1823 let data_map = datamap_rx
1824 .await
1825 .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
1826
1827 Ok((spill, data_map))
1828 }
1829
1830 /// Upload chunks from a spill using wave-based per-chunk (single) payments.
1831 ///
1832 /// Reads one wave at a time from disk, prepares quotes, pays, and stores.
1833 /// Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE` (~256 MB).
1834 ///
1835 /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
1836 async fn upload_waves_single(
1837 &self,
1838 spill: &ChunkSpill,
1839 progress: Option<&mpsc::Sender<UploadEvent>>,
1840 resume_key: Option<&str>,
1841 ) -> Result<(usize, String, u128, WaveAggregateStats)> {
1842 self.upload_spill_addresses_single(
1843 spill,
1844 &spill.addresses,
1845 progress,
1846 0,
1847 spill.len(),
1848 resume_key,
1849 )
1850 .await
1851 }
1852
1853 async fn upload_spill_addresses_single(
1854 &self,
1855 spill: &ChunkSpill,
1856 addresses: &[[u8; 32]],
1857 progress: Option<&mpsc::Sender<UploadEvent>>,
1858 stored_offset: usize,
1859 total_chunks: usize,
1860 resume_key: Option<&str>,
1861 ) -> Result<(usize, String, u128, WaveAggregateStats)> {
1862 let mut total_stored = stored_offset;
1863 let mut total_storage = Amount::ZERO;
1864 let mut total_gas: u128 = 0;
1865 let mut agg_stats = WaveAggregateStats::default();
1866 let waves: Vec<&[[u8; 32]]> = addresses.chunks(UPLOAD_WAVE_SIZE).collect();
1867 let wave_count = waves.len();
1868
1869 for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
1870 let wave_num = wave_idx + 1;
1871 let wave_data: Vec<Bytes> = wave_addrs
1872 .iter()
1873 .map(|addr| spill.read_chunk(addr))
1874 .collect::<Result<Vec<_>>>()?;
1875
1876 info!(
1877 "Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
1878 wave_data.len()
1879 );
1880 if let Some(tx) = progress {
1881 let _ = tx
1882 .send(UploadEvent::QuotingChunks {
1883 wave: wave_num,
1884 total_waves: wave_count,
1885 chunks_in_wave: wave_data.len(),
1886 })
1887 .await;
1888 }
1889 let (addresses, wave_storage, wave_gas, wave_stats) = self
1890 .batch_upload_chunks_with_events(
1891 wave_data,
1892 progress,
1893 total_stored,
1894 total_chunks,
1895 resume_key,
1896 )
1897 .await?;
1898 total_stored += addresses.len();
1899 if let Ok(cost) = wave_storage.parse::<Amount>() {
1900 total_storage += cost;
1901 }
1902 total_gas = total_gas.saturating_add(wave_gas);
1903 // Merge per-call stats (each call already aggregates across the
1904 // waves it ran internally, so a simple sum/extend is correct).
1905 agg_stats.chunk_attempts_total = agg_stats
1906 .chunk_attempts_total
1907 .saturating_add(wave_stats.chunk_attempts_total);
1908 agg_stats
1909 .store_durations_ms
1910 .extend(wave_stats.store_durations_ms);
1911 for (slot, count) in agg_stats
1912 .retries_histogram
1913 .iter_mut()
1914 .zip(wave_stats.retries_histogram.iter())
1915 {
1916 *slot = slot.saturating_add(*count);
1917 }
1918 if let Some(tx) = progress {
1919 let _ = tx
1920 .send(UploadEvent::WaveComplete {
1921 wave: wave_num,
1922 total_waves: wave_count,
1923 stored_so_far: total_stored,
1924 total: total_chunks,
1925 })
1926 .await;
1927 }
1928 }
1929
1930 Ok((
1931 total_stored,
1932 total_storage.to_string(),
1933 total_gas,
1934 agg_stats,
1935 ))
1936 }
1937
1938 /// Upload chunks from a spill using pre-computed merkle proofs.
1939 ///
1940 /// Reads one wave at a time from disk, pairs each chunk with its proof,
1941 /// and uploads concurrently. Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE`.
1942 ///
1943 /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
1944 /// Costs come from the `batch_result` which was populated during payment.
1945 async fn upload_waves_merkle(
1946 &self,
1947 spill: &ChunkSpill,
1948 addresses: &[[u8; 32]],
1949 batch_result: &MerkleBatchPaymentResult,
1950 already_stored_addresses: &[[u8; 32]],
1951 progress: Option<&mpsc::Sender<UploadEvent>>,
1952 ) -> Result<(usize, String, u128, WaveAggregateStats)> {
1953 let mut total_stored = already_stored_addresses.len();
1954 let total_chunks = total_stored + addresses.len();
1955 let waves: Vec<&[[u8; 32]]> = addresses.chunks(UPLOAD_WAVE_SIZE).collect();
1956 let wave_count = waves.len();
1957 let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
1958 let mut agg_stats = WaveAggregateStats::default();
1959
1960 for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
1961 let wave_num = wave_idx + 1;
1962 let wave = spill.read_wave(wave_addrs)?;
1963
1964 info!(
1965 "Wave {wave_num}/{wave_count}: storing {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
1966 wave.len()
1967 );
1968
1969 let store_limiter = self.controller().store.clone();
1970 // Clamp fan-out to wave size — partial last wave should
1971 // not pay for extra slots (see PERF-RESULTS.md).
1972 let store_concurrency = store_limiter.current().min(wave.len().max(1));
1973 let mut upload_stream = stream::iter(wave.into_iter().map(|(content, addr)| {
1974 let proof_bytes = batch_result.proofs.get(&addr).cloned();
1975 let limiter = store_limiter.clone();
1976 async move {
1977 let started = std::time::Instant::now();
1978 let proof = proof_bytes.ok_or_else(|| {
1979 (
1980 addr,
1981 Error::Payment(format!(
1982 "Missing merkle proof for chunk {}",
1983 hex::encode(addr)
1984 )),
1985 started,
1986 )
1987 })?;
1988 let peers = self
1989 .close_group_peers(&addr)
1990 .await
1991 .map_err(|e| (addr, e, started))?;
1992 observe_op(
1993 &limiter,
1994 || async move {
1995 self.chunk_put_to_close_group(content, proof, &peers).await
1996 },
1997 classify_error,
1998 )
1999 .await
2000 .map(|_| (addr, started))
2001 .map_err(|e| (addr, e, started))
2002 }
2003 }))
2004 .buffer_unordered(store_concurrency);
2005
2006 while let Some(result) = upload_stream.next().await {
2007 match result {
2008 Ok((addr, started)) => {
2009 let duration_ms =
2010 u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
2011 agg_stats.store_durations_ms.push(duration_ms);
2012 agg_stats.chunk_attempts_total =
2013 agg_stats.chunk_attempts_total.saturating_add(1);
2014 agg_stats.retries_histogram[0] =
2015 agg_stats.retries_histogram[0].saturating_add(1);
2016 stored_addresses.push(addr);
2017 total_stored += 1;
2018 info!("Stored {total_stored}/{total_chunks}");
2019 if let Some(tx) = progress {
2020 let _ = tx
2021 .send(UploadEvent::ChunkStored {
2022 stored: total_stored,
2023 total: total_chunks,
2024 })
2025 .await;
2026 }
2027 }
2028 Err((addr, e, _started)) => {
2029 warn!("merkle upload failed for chunk {}: {e}", hex::encode(addr));
2030 return Err(Error::PartialUpload {
2031 stored: stored_addresses,
2032 stored_count: total_stored,
2033 failed: vec![(addr, e.to_string())],
2034 failed_count: 1,
2035 total_chunks,
2036 reason: format!("merkle chunk upload failed: {e}"),
2037 });
2038 }
2039 }
2040 }
2041
2042 if let Some(tx) = progress {
2043 let _ = tx
2044 .send(UploadEvent::WaveComplete {
2045 wave: wave_num,
2046 total_waves: wave_count,
2047 stored_so_far: total_stored,
2048 total: total_chunks,
2049 })
2050 .await;
2051 }
2052 }
2053
2054 Ok((
2055 total_stored,
2056 batch_result.storage_cost_atto.clone(),
2057 batch_result.gas_cost_wei,
2058 agg_stats,
2059 ))
2060 }
2061
2062 /// Download and decrypt a file from the network, writing it to disk.
2063 ///
2064 /// Uses `streaming_decrypt` so that only one batch of chunks lives in
2065 /// memory at a time, avoiding OOM on large files. Chunks are fetched
2066 /// concurrently within each batch, then decrypted data is written to
2067 /// disk incrementally.
2068 ///
2069 /// Returns the number of bytes written.
2070 ///
2071 /// # Panics
2072 ///
2073 /// Requires a multi-threaded Tokio runtime (`flavor = "multi_thread"`).
2074 /// Will panic if called from a `current_thread` runtime because
2075 /// `streaming_decrypt` takes a synchronous callback that must bridge
2076 /// back to async via `block_in_place`.
2077 ///
2078 /// # Errors
2079 ///
2080 /// Returns an error if any chunk cannot be retrieved, decryption fails,
2081 /// or the file cannot be written.
2082 #[allow(clippy::unused_async)]
2083 pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
2084 self.file_download_with_progress(data_map, output, None)
2085 .await
2086 }
2087
2088 /// Download and decrypt a file with progress events.
2089 ///
2090 /// Same as [`Client::file_download`] but sends [`DownloadEvent`]s for UI feedback.
2091 ///
2092 /// Progress reporting:
2093 /// 1. Resolves hierarchical DataMaps to the root level first (reports as
2094 /// `ChunksFetched` with `total: 0` during resolution)
2095 /// 2. Once the root DataMap is known, sends `total_chunks` with accurate count
2096 /// 3. Fetches data chunks with accurate `fetched/total` progress
2097 #[allow(clippy::unused_async)]
2098 pub async fn file_download_with_progress(
2099 &self,
2100 data_map: &DataMap,
2101 output: &Path,
2102 progress: Option<mpsc::Sender<DownloadEvent>>,
2103 ) -> Result<u64> {
2104 debug!("Downloading file to {}", output.display());
2105
2106 let handle = Handle::current();
2107
2108 // Phase 1: Resolve hierarchical DataMap to root level.
2109 // This fetches child DataMap chunks (typically 3) to discover the real chunk count.
2110 let root_map = if data_map.is_child() {
2111 let dm_chunks = data_map.len();
2112 if let Some(ref tx) = progress {
2113 let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
2114 total_map_chunks: dm_chunks,
2115 });
2116 }
2117
2118 let resolve_progress = progress.clone();
2119 let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2120
2121 let resolved = tokio::task::block_in_place(|| {
2122 let counter_ref = resolve_counter.clone();
2123 let progress_ref = resolve_progress.clone();
2124 let fetch_limiter = self.controller().fetch.clone();
2125 let fetch = |batch: &[(usize, XorName)]| {
2126 let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
2127 let counter = counter_ref.clone();
2128 let prog = progress_ref.clone();
2129 let limiter = fetch_limiter.clone();
2130 handle.block_on(async {
2131 // Use rebucketed_unordered so the in-flight cap
2132 // is re-read from the limiter as each slot frees.
2133 // `buffer_unordered` snapshots the cap once at
2134 // pipeline build, which means observe_op
2135 // signals from inside chunk_get cannot reduce
2136 // concurrency on the current batch — exactly
2137 // the case where load-shedding is needed.
2138 let mut results = rebucketed_unordered(
2139 &limiter,
2140 batch_owned,
2141 |(idx, hash): (usize, XorName)| {
2142 let counter = counter.clone();
2143 let prog = prog.clone();
2144 async move {
2145 let addr = hash.0;
2146 // chunk_get_observed feeds the
2147 // adaptive fetch limiter once per
2148 // call via chunk_get_outcome
2149 // (Ok(None) -> Timeout is the
2150 // load-shedding signal for
2151 // sustained close-group exhaustion).
2152 let chunk = self
2153 .chunk_get_observed(&addr)
2154 .await
2155 .map_err(|e| {
2156 self_encryption::Error::Generic(format!(
2157 "DataMap resolution failed: {e}"
2158 ))
2159 })?
2160 .ok_or_else(|| {
2161 self_encryption::Error::Generic(format!(
2162 "DataMap chunk not found: {}",
2163 hex::encode(addr)
2164 ))
2165 })?;
2166 let fetched = counter
2167 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
2168 + 1;
2169 if let Some(ref tx) = prog {
2170 let _ =
2171 tx.try_send(DownloadEvent::MapChunkFetched { fetched });
2172 }
2173 Ok::<_, self_encryption::Error>((idx, chunk.content))
2174 }
2175 },
2176 )
2177 .await?;
2178 // CRITICAL: self_encryption::get_root_data_map_parallel
2179 // pairs the returned Vec POSITIONALLY with the input
2180 // hashes via .zip() and discards our idx field.
2181 // rebucketed_unordered preserves first-completion
2182 // order, so sort by idx to restore input order
2183 // before returning.
2184 results.sort_by_key(|(idx, _)| *idx);
2185 Ok(results)
2186 })
2187 };
2188 get_root_data_map_parallel(data_map.clone(), &fetch)
2189 })
2190 .map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;
2191
2192 info!(
2193 "Resolved hierarchical DataMap: {} data chunks",
2194 resolved.len()
2195 );
2196 resolved
2197 } else {
2198 data_map.clone()
2199 };
2200
2201 // Phase 2: Now we know the real chunk count.
2202 let total_chunks = root_map.len();
2203 if let Some(ref tx) = progress {
2204 let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
2205 }
2206
2207 // Phase 3: Fetch and decrypt data chunks with accurate progress.
2208 let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
2209 let fetched_for_closure = fetched_counter.clone();
2210 let progress_for_closure = progress.clone();
2211
2212 let fetch_limiter_outer = self.controller().fetch.clone();
2213 let usable_memory = usable_memory_bytes();
2214 let configured_batch_floor = stream_decrypt_batch_size();
2215 let fetch_cap = fetch_limiter_outer.current();
2216 let decrypt_batch_size = adaptive_stream_decrypt_batch_size(
2217 total_chunks,
2218 fetch_cap,
2219 configured_batch_floor,
2220 usable_memory,
2221 );
2222 info!(
2223 total_chunks,
2224 fetch_cap,
2225 configured_batch_floor,
2226 ?usable_memory,
2227 decrypt_batch_size,
2228 "Selected adaptive stream decrypt batch size"
2229 );
2230
2231 let stream = streaming_decrypt_with_batch_size(
2232 &root_map,
2233 |batch: &[(usize, XorName)]| {
2234 let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
2235 let fetched_ref = fetched_for_closure.clone();
2236 let progress_ref = progress_for_closure.clone();
2237 let fetch_limiter = fetch_limiter_outer.clone();
2238
2239 tokio::task::block_in_place(|| {
2240 handle.block_on(async {
2241 // First pass: try every chunk in the batch via
2242 // chunk_get_observed (which already does its own
2243 // first-attempt + retry sweep). A chunk that
2244 // returns Ok(None) here is NOT a fatal failure
2245 // — it's a candidate for a deferred retry below.
2246 // We carry the chunk's XorName through so the
2247 // retry pass can re-fetch by address.
2248 //
2249 // The closure ONLY returns Err on a true
2250 // protocol/network error from chunk_get (the
2251 // Err variant). Ok(None) is encoded as
2252 // `Err(addr)` in the inner Result so the outer
2253 // rebucketed pass doesn't early-abort on it.
2254 type BatchEntry =
2255 (usize, std::result::Result<bytes::Bytes, XorName>);
2256 let raw: Vec<BatchEntry> = rebucketed_unordered(
2257 &fetch_limiter,
2258 batch_owned,
2259 |(idx, hash): (usize, XorName)| {
2260 let fetched_ref = fetched_ref.clone();
2261 let progress_ref = progress_ref.clone();
2262 async move {
2263 let addr = hash.0;
2264 let addr_hex = hex::encode(addr);
2265 match self.chunk_get_observed(&addr).await {
2266 Ok(Some(chunk)) => {
2267 let fetched = fetched_ref.fetch_add(
2268 1,
2269 std::sync::atomic::Ordering::Relaxed,
2270 ) + 1;
2271 info!("Downloaded {fetched}/{total_chunks}");
2272 if let Some(ref tx) = progress_ref {
2273 let _ = tx.try_send(
2274 DownloadEvent::ChunksFetched {
2275 fetched,
2276 total: total_chunks,
2277 },
2278 );
2279 }
2280 Ok::<BatchEntry, self_encryption::Error>((
2281 idx,
2282 Ok(chunk.content),
2283 ))
2284 }
2285 // chunk_get returned Ok(None): defer
2286 // this chunk for a later retry rather
2287 // than aborting the whole batch.
2288 Ok(None) => Ok((idx, Err(hash))),
2289 // A transient error for one chunk
2290 // (e.g. its close-group DHT walk
2291 // erroring on this pass) must not
2292 // abort a multi-hundred-chunk
2293 // download. Defer it to the retry
2294 // rounds, same as Ok(None); only a
2295 // chunk that survives all deferred
2296 // rounds is fatal.
2297 Err(e) => {
2298 info!(
2299 "First-pass fetch error for {addr_hex}: {e}; deferring"
2300 );
2301 Ok((idx, Err(hash)))
2302 }
2303 }
2304 }
2305 },
2306 )
2307 .await?;
2308
2309 // Partition: things we already have vs the
2310 // deferred set we need to retry.
2311 let mut results: Vec<(usize, bytes::Bytes)> = Vec::new();
2312 let mut deferred: Vec<(usize, XorName)> = Vec::new();
2313 for (idx, inner) in raw {
2314 match inner {
2315 Ok(bytes) => results.push((idx, bytes)),
2316 Err(hash) => deferred.push((idx, hash)),
2317 }
2318 }
2319
2320 // Deferred retry pass: retry the deferred chunks
2321 // in CONCURRENT rounds (reusing the fetch
2322 // limiter's cap), not serially. The first round
2323 // fires immediately — most deferrals on a
2324 // healthy-but-lossy link are peer-side noise
2325 // that clears in well under a second, and
2326 // serializing them behind mandatory multi-second
2327 // sleeps was the single biggest throughput sink
2328 // on such links (a batch deferring ~20 chunks
2329 // burned minutes of near-zero throughput even
2330 // though every chunk succeeded on its first
2331 // retry). Only chunks that survive a round get a
2332 // longer back-off before the next, so genuine
2333 // saturation still gets time to settle.
2334 if !deferred.is_empty() {
2335 // Round delays in seconds. Round 0 is
2336 // immediate; later rounds back off to ride
2337 // out sustained saturation.
2338 const DEFERRED_ROUND_DELAYS_SECS: [u64; 3] = [0, 15, 45];
2339 info!(
2340 "Deferring {} chunk(s) for concurrent retry after batch settles",
2341 deferred.len()
2342 );
2343 let mut remaining = deferred;
2344 for (round, &delay_secs) in DEFERRED_ROUND_DELAYS_SECS
2345 .iter()
2346 .enumerate()
2347 {
2348 if remaining.is_empty() {
2349 break;
2350 }
2351 if delay_secs > 0 {
2352 tokio::time::sleep(std::time::Duration::from_secs(
2353 delay_secs,
2354 ))
2355 .await;
2356 }
2357 info!(
2358 "Deferred retry round {}/{}: {} chunk(s)",
2359 round + 1,
2360 DEFERRED_ROUND_DELAYS_SECS.len(),
2361 remaining.len(),
2362 );
2363 let round_input = std::mem::take(&mut remaining);
2364 let round_results: Vec<BatchEntry> = rebucketed_unordered(
2365 &fetch_limiter,
2366 round_input,
2367 |(idx, hash): (usize, XorName)| {
2368 let fetched_ref = fetched_ref.clone();
2369 let progress_ref = progress_ref.clone();
2370 async move {
2371 let addr = hash.0;
2372 // Both Ok(None) and a transient
2373 // Err re-defer the chunk to the
2374 // next round rather than
2375 // aborting; only the final
2376 // round's leftovers are fatal.
2377 match self.chunk_get_observed(&addr).await {
2378 Ok(Some(chunk)) => {
2379 let fetched = fetched_ref.fetch_add(
2380 1,
2381 std::sync::atomic::Ordering::Relaxed,
2382 ) + 1;
2383 info!(
2384 "Downloaded {fetched}/{total_chunks} (deferred retry)"
2385 );
2386 if let Some(ref tx) = progress_ref {
2387 let _ = tx.try_send(
2388 DownloadEvent::ChunksFetched {
2389 fetched,
2390 total: total_chunks,
2391 },
2392 );
2393 }
2394 Ok::<BatchEntry, self_encryption::Error>((
2395 idx,
2396 Ok(chunk.content),
2397 ))
2398 }
2399 Ok(None) => Ok((idx, Err(hash))),
2400 Err(e) => {
2401 info!(
2402 "Deferred retry for {} hit transient error: {e}; re-deferring",
2403 hex::encode(addr)
2404 );
2405 Ok((idx, Err(hash)))
2406 }
2407 }
2408 }
2409 },
2410 )
2411 .await?;
2412 for (idx, inner) in round_results {
2413 match inner {
2414 Ok(bytes) => results.push((idx, bytes)),
2415 Err(hash) => remaining.push((idx, hash)),
2416 }
2417 }
2418 }
2419 if let Some((_, hash)) = remaining.first() {
2420 return Err(self_encryption::Error::Generic(format!(
2421 "Chunk not found after {} deferred retry rounds: {}",
2422 DEFERRED_ROUND_DELAYS_SECS.len(),
2423 hex::encode(hash.0),
2424 )));
2425 }
2426 }
2427
2428 // streaming_decrypt itself sort_by_keys before
2429 // zipping, but the same closure is also passed
2430 // through get_root_data_map_parallel internally
2431 // (see self_encryption::stream_decrypt.rs::new), and
2432 // THAT path zips positionally without sorting. Sort
2433 // here so both consumers see input order.
2434 results.sort_by_key(|(idx, _)| *idx);
2435 Ok(results)
2436 })
2437 })
2438 },
2439 decrypt_batch_size,
2440 )
2441 .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;
2442
2443 // Write decrypted chunks to a temp file, then rename atomically.
2444 let parent = output.parent().unwrap_or_else(|| Path::new("."));
2445 let unique: u64 = rand::random();
2446 let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));
2447
2448 let write_result = (|| -> Result<u64> {
2449 let mut file = std::fs::File::create(&tmp_path)?;
2450 let mut bytes_written = 0u64;
2451 for chunk_result in stream {
2452 let chunk_bytes = chunk_result
2453 .map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
2454 file.write_all(&chunk_bytes)?;
2455 bytes_written += chunk_bytes.len() as u64;
2456 }
2457 file.flush()?;
2458 Ok(bytes_written)
2459 })();
2460
2461 match write_result {
2462 Ok(bytes_written) => match std::fs::rename(&tmp_path, output) {
2463 Ok(()) => {
2464 info!(
2465 "File downloaded: {bytes_written} bytes written to {}",
2466 output.display()
2467 );
2468 Ok(bytes_written)
2469 }
2470 Err(rename_err) => {
2471 if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
2472 warn!(
2473 "Failed to remove temp download file {}: {cleanup_err}",
2474 tmp_path.display()
2475 );
2476 }
2477 Err(rename_err.into())
2478 }
2479 },
2480 Err(e) => {
2481 if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
2482 warn!(
2483 "Failed to remove temp download file {}: {cleanup_err}",
2484 tmp_path.display()
2485 );
2486 }
2487 Err(e)
2488 }
2489 }
2490 }
2491}
2492
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Spilling a 1 KB file should never trip the disk space check.
    #[test]
    fn disk_space_check_passes_for_small_file() {
        let tiny_file_size = 1024;
        check_disk_space_for_spill(tiny_file_size).unwrap();
    }

    /// Half of `u64::MAX` bytes cannot fit on any real system, so the
    /// check must refuse with `InsufficientDiskSpace`.
    #[test]
    fn disk_space_check_fails_for_absurd_size() {
        let absurd_size = u64::MAX / 2;
        let outcome = check_disk_space_for_spill(absurd_size);
        assert!(outcome.is_err());

        let err = outcome.unwrap_err();
        let is_disk_space_error = matches!(err, Error::InsufficientDiskSpace(_));
        assert!(
            is_disk_space_error,
            "expected InsufficientDiskSpace, got: {err}"
        );
    }

    /// With unlimited memory the batch grows to the fetch cap times the
    /// fetch multiplier.
    #[test]
    fn adaptive_stream_decrypt_batch_size_tracks_fetch_headroom() {
        let fetch_cap = 64;
        let chosen = adaptive_stream_decrypt_batch_size(1_000, fetch_cap, 10, Some(u64::MAX));

        assert_eq!(chosen, fetch_cap * DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
    }

    /// The batch never exceeds the number of chunks in the file.
    #[test]
    fn adaptive_stream_decrypt_batch_size_caps_to_total_chunks() {
        let total_chunks = 12;
        let chosen = adaptive_stream_decrypt_batch_size(total_chunks, 64, 10, Some(u64::MAX));

        assert_eq!(chosen, total_chunks);
    }

    /// A configured floor above the fetch-derived size wins.
    #[test]
    fn adaptive_stream_decrypt_batch_size_honours_configured_floor() {
        let configured_floor = 32;
        let chosen = adaptive_stream_decrypt_batch_size(1_000, 1, configured_floor, None);

        assert_eq!(chosen, configured_floor);
    }

    /// Without a memory reading the batch stays at the configured floor.
    #[test]
    fn adaptive_stream_decrypt_batch_size_does_not_expand_without_memory_reading() {
        let configured_floor = 10;
        let chosen = adaptive_stream_decrypt_batch_size(1_000, 64, configured_floor, None);

        assert_eq!(chosen, configured_floor);
    }

    /// A memory budget worth exactly 16 chunks caps the batch at 16.
    #[test]
    fn adaptive_stream_decrypt_batch_size_caps_to_memory_budget() {
        let per_chunk_estimate = (self_encryption::MAX_CHUNK_SIZE as u64)
            .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
            .max(1);
        let memory_for_sixteen = per_chunk_estimate
            .saturating_mul(16)
            .saturating_mul(DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR);
        let chosen =
            adaptive_stream_decrypt_batch_size(1_000, 256, 10, Some(memory_for_sixteen));

        assert_eq!(chosen, 16);
    }

    /// Even with almost no memory the batch holds at least one chunk.
    #[test]
    fn adaptive_stream_decrypt_batch_size_keeps_one_chunk_when_memory_is_tight() {
        let nearly_no_memory = Some(1);
        let chosen = adaptive_stream_decrypt_batch_size(1_000, 64, 10, nearly_no_memory);

        assert_eq!(chosen, 1);
    }

    /// A cached merkle result covers an address set only if every address
    /// has a proof; surplus proofs are fine.
    #[test]
    fn cached_merkle_covers_only_when_all_addresses_have_proofs() {
        let covered = compute_address(&Bytes::from_static(b"covered"));
        let extra = compute_address(&Bytes::from_static(b"extra"));
        let missing = compute_address(&Bytes::from_static(b"missing"));
        let cached = MerkleBatchPaymentResult {
            proofs: HashMap::from([(covered, vec![1]), (extra, vec![2])]),
            chunk_count: 2,
            storage_cost_atto: "0".to_string(),
            gas_cost_wei: 0,
            merkle_payment_timestamp: 0,
        };

        let subset_is_covered = cached_merkle_covers_addresses(&cached, &[covered]);
        let full_set_is_covered = cached_merkle_covers_addresses(&cached, &[covered, extra]);
        let set_with_gap_is_covered = cached_merkle_covers_addresses(&cached, &[covered, missing]);

        assert!(subset_is_covered);
        assert!(full_set_is_covered);
        assert!(!set_with_gap_is_covered);
    }

    /// Chunks pushed into a spill can be read back unchanged, and the
    /// bookkeeping (count, byte total, entry sizes) matches what was pushed.
    #[test]
    fn chunk_spill_round_trip() {
        let first_payload = vec![0xAA; 1024];
        let second_payload = vec![0xBB; 2048];

        let mut spill = ChunkSpill::new().unwrap();
        spill.push(&first_payload).unwrap();
        spill.push(&second_payload).unwrap();

        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 1024 + 2048);
        let entries = spill.chunk_entries().unwrap();
        let entry_total: u64 = entries.iter().map(|(_, size)| *size).sum();
        assert_eq!(entry_total, 1024 + 2048);

        // Contents survive the trip through disk.
        let first_addr = spill.addresses.first().unwrap();
        let first_read = spill.read_chunk(first_addr).unwrap();
        assert_eq!(&first_read[..], &first_payload[..]);

        let second_addr = spill.addresses.get(1).unwrap();
        let second_read = spill.read_chunk(second_addr).unwrap();
        assert_eq!(&second_read[..], &second_payload[..]);

        // A wave size of one yields one wave per stored chunk.
        let wave_total = spill.addresses.chunks(1).count();
        assert_eq!(wave_total, 2);
    }

    /// Dropping a spill removes its backing directory.
    #[test]
    fn chunk_spill_cleanup_on_drop() {
        let spill_dir = {
            let spill = ChunkSpill::new().unwrap();
            let dir = spill.dir.clone();
            assert!(dir.exists());
            dir
            // `spill` is dropped at the end of this scope.
        };
        assert!(!spill_dir.exists(), "spill dir should be removed on drop");
    }

    /// Pushing identical content repeatedly stores it once; distinct
    /// content is still added.
    #[test]
    fn chunk_spill_deduplicates_identical_content() {
        let repeated = vec![0xCC; 512];
        let mut spill = ChunkSpill::new().unwrap();

        // One real push followed by two duplicates.
        for _ in 0..3 {
            spill.push(&repeated).unwrap();
        }

        assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
        assert_eq!(
            spill.total_bytes(),
            512,
            "total_bytes should count unique only"
        );

        let distinct = vec![0xDD; 256];
        spill.push(&distinct).unwrap();
        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 512 + 256);
    }
}
2647
/// Compile-time checks that the futures returned by `Client` file methods
/// are `Send`. Nothing here is executed; the functions only need to
/// type-check.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Accepts only references to `Send` values.
    fn _assert_send(_: &impl Send) {}

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_is_send(client: &Client) {
        let source = Path::new("/dev/null");
        let upload = client.file_upload(source);
        _assert_send(&upload);
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_with_mode_is_send(client: &Client) {
        let source = Path::new("/dev/null");
        let upload = client.file_upload_with_mode(source, PaymentMode::Auto);
        _assert_send(&upload);
    }

    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _file_download_is_send(client: &Client) {
        // Never evaluated: only the type of the resulting future matters.
        let dm: DataMap = todo!();
        let destination = Path::new("/dev/null");
        let download = client.file_download(&dm, destination);
        _assert_send(&download);
    }
}