ant_core/data/client/file.rs
1//! File operations using streaming self-encryption.
2//!
3//! Upload files directly from disk without loading them entirely into memory.
//! Uses `stream_encrypt` to read files in 8 KB pieces; each encrypted chunk is
//! written to disk as soon as it is produced and uploaded later, one wave at a time.
6//!
7//! Encrypted chunks are spilled to a temporary directory during encryption
8//! so that peak memory usage is bounded to one wave (~256 MB for 64 × 4 MB
9//! chunks) regardless of file size.
10//!
11//! For in-memory data uploads, see the `data` module.
12
13use crate::data::client::adaptive::{observe_op, rebucketed_unordered};
14use crate::data::client::batch::{
15 finalize_batch_payment, PaymentIntent, PreparedChunk, WaveAggregateStats,
16};
17use crate::data::client::classify_error;
18use crate::data::client::merkle::{
19 chunk_contents_for_upload_addresses, finalize_merkle_batch, merkle_store_with_retry,
20 should_use_merkle, MerkleBatchPaymentResult, PaymentMode, PreparedMerkleBatch,
21 MERKLE_RETRY_BACKOFF, MERKLE_STORE_MAX_ATTEMPTS,
22};
23use crate::data::client::Client;
24use crate::data::error::{Error, Result};
25use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
26use ant_protocol::transport::{MultiAddr, PeerId};
27use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
28use bytes::Bytes;
29use fs2::FileExt;
30use futures::stream::{self, StreamExt};
31use self_encryption::{
32 get_root_data_map_parallel, stream_decrypt_batch_size, stream_encrypt,
33 streaming_decrypt_with_batch_size, DataMap,
34};
35use std::collections::{HashMap, HashSet};
36use std::io::Write;
37use std::path::{Path, PathBuf};
38use std::sync::{Arc, Mutex};
39use tokio::runtime::Handle;
40use tokio::sync::mpsc;
41use tracing::{debug, info, warn};
42use xor_name::XorName;
43
/// Progress events emitted during file upload for UI feedback.
///
/// Events are sent over an optional `tokio::sync::mpsc` channel supplied by
/// the caller (for example the `progress` argument of
/// [`Client::estimate_upload_cost`]).
#[derive(Debug, Clone)]
pub enum UploadEvent {
    /// A chunk has been encrypted and spilled to disk.
    Encrypting { chunks_done: usize },
    /// File encryption complete; `total_chunks` is the number of spilled chunks.
    Encrypted { total_chunks: usize },
    /// Starting quote collection for a wave.
    QuotingChunks {
        wave: usize,
        total_waves: usize,
        chunks_in_wave: usize,
    },
    /// A chunk has been quoted (peer discovery + price received).
    /// This is the slow phase — each quote involves network round-trips.
    ChunkQuoted { quoted: usize, total: usize },
    /// A chunk has been stored on the network.
    ChunkStored { stored: usize, total: usize },
    /// A wave has completed.
    WaveComplete {
        wave: usize,
        total_waves: usize,
        stored_so_far: usize,
        total: usize,
    },
}
70
/// Progress events emitted during file download for UI feedback.
///
/// The first three variants cover resolution of a (possibly hierarchical)
/// `DataMap`; `ChunksFetched` then reports data-chunk retrieval progress.
#[derive(Debug, Clone)]
pub enum DownloadEvent {
    /// Resolving hierarchical DataMap to discover real chunk count.
    ResolvingDataMap { total_map_chunks: usize },
    /// A DataMap chunk has been fetched during resolution.
    MapChunkFetched { fetched: usize },
    /// DataMap resolved — total data chunk count now known.
    DataMapResolved { total_chunks: usize },
    /// Data chunks are being fetched from the network.
    ChunksFetched { fetched: usize, total: usize },
}
83
/// One entry in the per-chunk quote list returned by
/// [`Client::get_store_quotes`]: the responding peer, its addresses, the
/// signed quote it returned, and the payment amount it is demanding.
///
/// Tuple order: `(peer_id, peer_addrs, quote, price)`.
type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);
88
/// Chunks uploaded per wave. Kept equal to `PAYMENT_WAVE_SIZE` in `batch.rs`.
const UPLOAD_WAVE_SIZE: usize = 64;

/// Ratio of stream-decrypt batch size to fetch fan-out.
///
/// A decrypt batch several times larger than the fetch concurrency lets the
/// rolling fetch scheduler keep issuing chunk GETs as earlier ones finish,
/// rather than draining at every self-encryption batch boundary.
const DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER: usize = 4;

/// A single decrypt batch may use at most `1 / DIVISOR` of currently usable RAM.
const DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR: u64 = 4;

/// Bytes charged per chunk in a decrypt batch, as a multiple of the maximum
/// chunk size.
///
/// A batch briefly holds the encrypted bytes, the decrypted bytes and
/// `Vec`/`Bytes` overhead at the same time, so the raw payload size alone
/// would under-count.
const DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER: u64 = 3;

/// Upper bound on distinct chunk addresses probed for a representative quote
/// in [`Client::estimate_upload_cost`].
///
/// Kept small so the `AlreadyStored` fallback — only exercised when many
/// leading chunks of a file already live on the network — costs at most a
/// handful of round-trips.
const ESTIMATE_SAMPLE_CAP: usize = 5;

/// Gas budget for one `pay_for_quotes` transaction carrying up to
/// `UPLOAD_WAVE_SIZE` (quote_hash, rewards_address, amount) entries.
///
/// `batch_pay` in `batch.rs` flattens every chunk's close-group quotes into a
/// single EVM call, so the cost is dominated by one SSTORE per entry plus the
/// base transaction overhead: on Arbitrum about
/// `21_000 + 64 × (20_000 + small)` ≈ 1.3M gas, rounded up to 1.5M as a
/// conservative per-wave ceiling.
const GAS_PER_WAVE_TX: u128 = 1_500_000;

/// Gas budget for one merkle batch payment transaction.
///
/// Each merkle sub-batch is a single on-chain transaction, but it verifies a
/// merkle tree and posts a pool commitment, so it is budgeted above a plain
/// transfer.
const GAS_PER_MERKLE_TX: u128 = 500_000;

/// Advisory gas price in wei per gas (0.1 gwei), used to turn a gas estimate
/// into an ETH figure without consulting a live gas oracle.
///
/// Roughly what Arbitrum One settles at on quiet blocks, which gives the CLI
/// a sensible order of magnitude. The reported gas cost is an estimate, not a
/// commitment — real gas is bid at submission time.
const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;

/// Free-space headroom, in percent of the source file size, required in the
/// spill directory before encryption starts.
///
/// Encrypted chunks are slightly larger than the plaintext because of padding
/// and self-encryption overhead, so the spill check asks for
/// `file_size + DISK_SPACE_HEADROOM_PERCENT %` of free space.
const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;
144
/// Grace period (in seconds) before a spill dir is eligible for stale cleanup.
///
/// This is a small TOCTOU guard covering the sub-millisecond window inside
/// [`ChunkSpill::new`] between `create_dir` and `try_lock_exclusive`. Once a
/// dir is older than this and its lockfile is releasable, the owning process
/// is gone and the dir is safe to reap — regardless of how old it is.
///
/// The previous policy waited 24 h before reaping any orphan, which meant
/// that any non-graceful exit (SIGKILL, kernel OOM, panic abort) leaked its
/// spill dir until the next day's upload — and on a host being restart-looped
/// by systemd, orphans could fill the disk well within that window.
const SPILL_STALE_GRACE_SECS: u64 = 30;

/// Prefix for spill directory names to distinguish from user files.
const SPILL_DIR_PREFIX: &str = "spill_";

/// Lockfile name inside each spill dir to signal active use.
const SPILL_LOCK_NAME: &str = ".lock";

/// Temporary on-disk buffer for encrypted chunks.
///
/// During file encryption, chunks are written to a fresh directory under
/// `<data_dir>/spill/`, so only their 32-byte addresses and sizes stay in
/// memory. At upload time chunks are read back one wave at a time, keeping
/// peak RAM at ~`UPLOAD_WAVE_SIZE × 4 MB`.
///
/// The directory is removed when the value is dropped. The exclusively locked
/// lockfile inside it keeps concurrent stale-cleanup from reaping it while it
/// is in use.
struct ChunkSpill {
    /// Directory holding spilled chunk files (named by hex address).
    dir: PathBuf,
    /// Lockfile held for the lifetime of this spill (prevents stale cleanup).
    _lock: std::fs::File,
    /// Deduplicated list of chunk addresses, in first-seen order.
    addresses: Vec<[u8; 32]>,
    /// Tracks seen addresses for deduplication.
    seen: HashSet<[u8; 32]>,
    /// Byte size per spilled chunk address.
    sizes: HashMap<[u8; 32], u64>,
    /// Running total of unique chunk byte sizes (for average-size calculation).
    total_bytes: u64,
}
183
184impl ChunkSpill {
185 /// Return the parent directory for all spill dirs: `<data_dir>/spill/`.
186 fn spill_root() -> Result<PathBuf> {
187 use crate::config;
188 let root = config::data_dir()
189 .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
190 .join("spill");
191 Ok(root)
192 }
193
194 /// Create a new spill directory under `<data_dir>/spill/`.
195 ///
196 /// Directory name is `spill_<timestamp>_<random>` so orphans can be
197 /// identified by prefix and cleaned up by age. A lockfile inside the
198 /// dir prevents concurrent cleanup from deleting an active spill.
199 fn new() -> Result<Self> {
200 let root = Self::spill_root()?;
201 std::fs::create_dir_all(&root)?;
202
203 // Clean up stale spill dirs from previous crashed runs.
204 Self::cleanup_stale(&root);
205
206 let now = std::time::SystemTime::now()
207 .duration_since(std::time::UNIX_EPOCH)
208 .unwrap_or_default()
209 .as_secs();
210 let unique: u64 = rand::random();
211 let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
212 std::fs::create_dir(&dir)?;
213
214 // Create and hold a lockfile for the lifetime of this spill.
215 // cleanup_stale() will skip dirs with locked files.
216 let lock_path = dir.join(SPILL_LOCK_NAME);
217 let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
218 Error::Io(std::io::Error::new(
219 e.kind(),
220 format!("failed to create spill lockfile: {e}"),
221 ))
222 })?;
223 lock_file.try_lock_exclusive().map_err(|e| {
224 Error::Io(std::io::Error::new(
225 e.kind(),
226 format!("failed to lock spill lockfile: {e}"),
227 ))
228 })?;
229
230 Ok(Self {
231 dir,
232 _lock: lock_file,
233 addresses: Vec::new(),
234 seen: HashSet::new(),
235 sizes: HashMap::new(),
236 total_bytes: 0,
237 })
238 }
239
240 /// Clean up stale spill directories. Best-effort, errors are logged.
241 ///
242 /// A spill dir is reaped when:
243 /// 1. Its name starts with `SPILL_DIR_PREFIX` (ignores unrelated files)
244 /// 2. It is an actual directory, not a symlink (prevents symlink attacks)
245 /// 3. Its timestamp is older than `SPILL_STALE_GRACE_SECS` (TOCTOU guard)
246 /// 4. Its lockfile is releasable — i.e. no live process holds it
247 ///
248 /// The lockfile is the primary correctness gate: a releasable lock means
249 /// the owning `ChunkSpill` has been dropped or the process is gone, so
250 /// the dir is fair game. The grace period covers only the brief window
251 /// inside [`Self::new`] between `create_dir` and `try_lock_exclusive`.
252 ///
253 /// Safe to call concurrently from multiple processes.
254 fn cleanup_stale(root: &Path) {
255 let now = std::time::SystemTime::now()
256 .duration_since(std::time::UNIX_EPOCH)
257 .unwrap_or_default()
258 .as_secs();
259
260 if now == 0 {
261 // Clock is broken (before Unix epoch). Skip cleanup to avoid
262 // misidentifying dirs as stale.
263 warn!("System clock before Unix epoch, skipping spill cleanup");
264 return;
265 }
266
267 let entries = match std::fs::read_dir(root) {
268 Ok(entries) => entries,
269 Err(_) => return,
270 };
271
272 for entry in entries.flatten() {
273 let name = entry.file_name();
274 let name_str = name.to_string_lossy();
275
276 // Only process dirs with our prefix.
277 let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
278 Some(s) => s,
279 None => continue,
280 };
281
282 // Parse timestamp: "spill_<timestamp>_<random>"
283 let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
284 Some(ts) => ts,
285 None => continue,
286 };
287
288 if now.saturating_sub(timestamp) < SPILL_STALE_GRACE_SECS {
289 continue;
290 }
291
292 // Safety: only delete actual directories, not symlinks.
293 let file_type = match entry.file_type() {
294 Ok(ft) => ft,
295 Err(_) => continue,
296 };
297 if !file_type.is_dir() {
298 continue;
299 }
300
301 let path = entry.path();
302
303 // Check lockfile: if locked, the dir is in active use -- skip it.
304 let lock_path = path.join(SPILL_LOCK_NAME);
305 if let Ok(lock_file) = std::fs::File::open(&lock_path) {
306 use fs2::FileExt;
307 if lock_file.try_lock_exclusive().is_err() {
308 // Lock held by another process -- dir is active.
309 debug!("Skipping active spill dir: {}", path.display());
310 continue;
311 }
312 // We acquired the lock, so no one else holds it.
313 // Drop it before deleting.
314 drop(lock_file);
315 }
316
317 info!("Cleaning up stale spill dir: {}", path.display());
318 if let Err(e) = std::fs::remove_dir_all(&path) {
319 warn!("Failed to clean up stale spill dir {}: {e}", path.display());
320 }
321 }
322 }
323
324 /// Run stale spill cleanup. Call at client startup or periodically.
325 #[allow(dead_code)]
326 pub(crate) fn run_cleanup() {
327 if let Ok(root) = Self::spill_root() {
328 Self::cleanup_stale(&root);
329 }
330 }
331
332 /// Write one encrypted chunk to disk and record its address.
333 ///
334 /// Deduplicates by content address: if the same chunk was already
335 /// spilled, the write and accounting are skipped. This prevents
336 /// double-uploads and inflated quoting metrics.
337 fn push(&mut self, content: &[u8]) -> Result<()> {
338 let address = compute_address(content);
339 if !self.seen.insert(address) {
340 return Ok(());
341 }
342 let path = self.dir.join(hex::encode(address));
343 std::fs::write(&path, content)?;
344 let content_len = content.len() as u64;
345 self.sizes.insert(address, content_len);
346 self.total_bytes += content_len;
347 self.addresses.push(address);
348 Ok(())
349 }
350
351 /// Number of chunks stored.
352 fn len(&self) -> usize {
353 self.addresses.len()
354 }
355
356 /// Total bytes of all spilled chunks.
357 fn total_bytes(&self) -> u64 {
358 self.total_bytes
359 }
360
361 /// Address and byte-size pairs for all spilled chunks.
362 fn chunk_entries(&self) -> Result<Vec<([u8; 32], u64)>> {
363 self.addresses
364 .iter()
365 .map(|address| {
366 self.sizes
367 .get(address)
368 .copied()
369 .map(|size| (*address, size))
370 .ok_or_else(|| {
371 Error::Storage(format!(
372 "missing size for spilled chunk {}",
373 hex::encode(address)
374 ))
375 })
376 })
377 .collect()
378 }
379
380 /// Read a single chunk back from disk by address.
381 fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
382 let path = self.dir.join(hex::encode(address));
383 let data = std::fs::read(&path).map_err(|e| {
384 Error::Io(std::io::Error::new(
385 e.kind(),
386 format!("reading spilled chunk {}: {e}", hex::encode(address)),
387 ))
388 })?;
389 Ok(Bytes::from(data))
390 }
391
392 /// Read a wave of chunks from disk.
393 fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
394 let mut out = Vec::with_capacity(wave_addrs.len());
395 for addr in wave_addrs {
396 let content = self.read_chunk(addr)?;
397 out.push((content, *addr));
398 }
399 Ok(out)
400 }
401
402 /// Clean up the spill directory.
403 fn cleanup(&self) {
404 if let Err(e) = std::fs::remove_dir_all(&self.dir) {
405 warn!(
406 "Failed to clean up chunk spill dir {}: {e}",
407 self.dir.display()
408 );
409 }
410 }
411}
412
413impl Drop for ChunkSpill {
414 fn drop(&mut self) {
415 self.cleanup();
416 }
417}
418
419fn cached_merkle_covers_addresses(
420 cached: &MerkleBatchPaymentResult,
421 addresses: &[[u8; 32]],
422) -> bool {
423 addresses
424 .iter()
425 .all(|addr| cached.proofs.contains_key(addr))
426}
427
/// Split `addresses` into `(to_store, missing_proof)`.
///
/// `to_store` holds the addresses that have an entry in `proofs`;
/// `missing_proof` holds those that do not. Both keep the relative order of
/// `addresses`.
///
/// A partial [`MerkleBatchPaymentResult`] (from a `pay_for_merkle_multi_batch`
/// where a later sub-batch's payment failed) carries proofs only for the
/// already-paid sub-batches, so unpaid chunks reach the upload path with no
/// proof. `upload_waves_merkle` reports those as failed via
/// [`Error::PartialUpload`] rather than aborting the whole file.
fn partition_addresses_by_proof(
    addresses: &[[u8; 32]],
    proofs: &HashMap<[u8; 32], Vec<u8>>,
) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
    let mut with_proof = Vec::new();
    let mut without_proof = Vec::new();
    for &addr in addresses {
        if proofs.contains_key(&addr) {
            with_proof.push(addr);
        } else {
            without_proof.push(addr);
        }
    }
    (with_proof, without_proof)
}
446
447/// Check that the spill directory has enough free space for the spilled chunks.
448///
449/// `file_size` is the source file's byte count. We require
450/// `file_size + 10%` free space to account for self-encryption overhead.
451fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
452 let spill_root = ChunkSpill::spill_root()?;
453
454 // Ensure the root exists so fs2 can query it.
455 std::fs::create_dir_all(&spill_root)?;
456
457 let available = fs2::available_space(&spill_root).map_err(|e| {
458 Error::Io(std::io::Error::new(
459 e.kind(),
460 format!(
461 "failed to query disk space on {}: {e}",
462 spill_root.display()
463 ),
464 ))
465 })?;
466
467 // Use integer arithmetic to avoid f64 precision loss on large file sizes.
468 let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
469 let required = file_size.saturating_add(headroom);
470
471 if available < required {
472 let avail_mb = available / (1024 * 1024);
473 let req_mb = required / (1024 * 1024);
474 return Err(Error::InsufficientDiskSpace(format!(
475 "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
476 spill_root.display()
477 )));
478 }
479
480 debug!(
481 "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
482 spill_root.display()
483 );
484 Ok(())
485}
486
487fn usable_memory_bytes() -> Option<u64> {
488 let mut system = sysinfo::System::new();
489 system.refresh_memory();
490
491 let available_memory = system.available_memory();
492 let free_memory = system.free_memory();
493 let used_memory = system.used_memory();
494 let total_memory = system.total_memory();
495 let unused_memory = total_memory.saturating_sub(used_memory);
496
497 let mut usable = [available_memory, free_memory, unused_memory]
498 .into_iter()
499 .filter(|bytes| *bytes > 0)
500 .max();
501
502 let cgroup_free_memory = system
503 .cgroup_limits()
504 .filter(|limits| limits.total_memory > 0)
505 .map(|limits| limits.free_memory);
506 if let Some(cgroup_free_memory) = cgroup_free_memory {
507 usable = Some(usable.unwrap_or(u64::MAX).min(cgroup_free_memory));
508 }
509
510 debug!(
511 available_memory,
512 free_memory,
513 used_memory,
514 total_memory,
515 cgroup_free_memory,
516 usable_memory = ?usable,
517 "Detected usable memory for stream decrypt batch sizing"
518 );
519
520 usable
521}
522
523fn stream_decrypt_batch_memory_cap(usable_memory_bytes: u64) -> usize {
524 let budget = usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR;
525 let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
526 .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
527 .max(1);
528 let cap = (budget / estimated_bytes_per_chunk).max(1);
529
530 usize::try_from(cap).unwrap_or(usize::MAX)
531}
532
533fn adaptive_stream_decrypt_batch_size(
534 total_chunks: usize,
535 fetch_cap: usize,
536 configured_batch_floor: usize,
537 usable_memory_bytes: Option<u64>,
538) -> usize {
539 let fetch_target = fetch_cap
540 .max(1)
541 .saturating_mul(DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
542 let requested = match usable_memory_bytes {
543 Some(bytes) => {
544 let memory_cap = stream_decrypt_batch_memory_cap(bytes);
545 configured_batch_floor
546 .max(fetch_target)
547 .max(1)
548 .min(memory_cap)
549 }
550 None => configured_batch_floor.max(1),
551 };
552
553 requested.min(total_chunks.max(1)).max(1)
554}
555
/// Whether an upload publishes its data map to the network.
///
/// - [`Visibility::Private`] (the default): only the data chunks are stored
///   and the `DataMap` is returned to the caller. Only someone holding that
///   `DataMap` can reconstruct the file.
/// - [`Visibility::Public`]: the serialized `DataMap` is also stored as a
///   chunk. Its single chunk address lets anyone fetch the `DataMap` (via
///   [`Client::data_map_fetch`]) and then the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Visibility {
    /// Keep the data map local; only the holder can retrieve the file.
    Private,
    /// Publish the data map as a network chunk so anyone with the returned
    /// address can retrieve and decrypt the file.
    Public,
}

impl Default for Visibility {
    /// Uploads are private unless explicitly made public.
    fn default() -> Self {
        Self::Private
    }
}
572
/// Estimated cost of uploading a file, returned by
/// [`Client::estimate_upload_cost`].
///
/// Serde-serializable; fields serialize in declaration order.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct UploadCostEstimate {
    /// Original file size in bytes.
    pub file_size: u64,
    /// Number of chunks the file would be split into (data chunks only,
    /// does not include the DataMap chunk added during public uploads).
    pub chunk_count: usize,
    /// Estimated total storage cost in atto (token smallest unit), as a
    /// string. It is extrapolated from a single sampled quote.
    pub storage_cost_atto: String,
    /// Estimated gas cost in wei as a string. This is a rough heuristic
    /// based on chunk count and payment mode, NOT a live gas price query.
    pub estimated_gas_cost_wei: String,
    /// Payment mode that would be used.
    pub payment_mode: PaymentMode,
}
590
/// Result of a file upload: the `DataMap` needed to retrieve the file, plus
/// cost and store-loop statistics.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers that construct or pattern-match
/// on this struct.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FileUploadResult {
    /// The data map containing chunk metadata for reconstruction.
    pub data_map: DataMap,
    /// Number of chunks stored on the network.
    pub chunks_stored: usize,
    /// Number of chunks that failed to store. Always 0 for a successful
    /// upload — partial-failure information is conveyed via
    /// [`crate::data::Error::PartialUpload`] instead.
    pub chunks_failed: usize,
    /// Total number of chunks in the upload, including chunks that were
    /// already stored and skipped. On full success this equals `chunks_stored`.
    pub total_chunks: usize,
    /// Which payment mode was actually used (not just requested).
    pub payment_mode_used: PaymentMode,
    /// Total storage cost paid in token units (atto). "0" if all chunks already existed.
    pub storage_cost_atto: String,
    /// Total gas cost in wei. 0 if no on-chain transactions were made.
    pub gas_cost_wei: u128,
    /// Chunk address of the serialized `DataMap`, set only for
    /// [`Visibility::Public`] uploads. **`Some` means this address is
    /// retrievable from the network (via [`Client::data_map_fetch`])**, not
    /// necessarily that *this* upload paid to store it — if the serialized
    /// `DataMap` hashed to a chunk that was already on the network (same
    /// file uploaded before; deterministic via self-encryption), the address
    /// is still returned but no storage payment was made for it.
    pub data_map_address: Option<[u8; 32]>,
    /// Sum of chunk-store RPC attempts across the upload
    /// (`>= chunks_stored` on full success; more if any chunk retried).
    /// `0` for paths that don't run the wave store loop.
    pub chunk_attempts_total: usize,
    /// Per-chunk store wall-clock in ms (length == `chunks_stored` on full
    /// success, empty for paths that don't run the wave store loop).
    pub store_durations_ms: Vec<u64>,
    /// Count of stored chunks that succeeded on each retry round
    /// (index 0 = first attempt, 1 = first retry, etc.). All zeros for
    /// paths that don't run the wave store loop.
    pub retries_histogram: [usize; 4],
}
636
/// Payment information for external signing — either wave-batch or merkle.
///
/// Carried inside a [`PreparedUpload`] so the on-chain payment can be built
/// and signed outside this process. It covers only the chunks that still
/// need payment after the already-stored preflight.
#[derive(Debug)]
pub enum ExternalPaymentInfo {
    /// Wave-batch: individual (quote_hash, rewards_address, amount) tuples.
    WaveBatch {
        /// Chunks ready for payment (needed for finalize).
        prepared_chunks: Vec<PreparedChunk>,
        /// Payment intent for external signing.
        payment_intent: PaymentIntent,
    },
    /// Merkle: single on-chain call with depth, pool commitments, timestamp.
    Merkle {
        /// The prepared merkle batch (public fields sent to frontend, private fields stay in Rust).
        prepared_batch: PreparedMerkleBatch,
        /// Raw chunk contents that still need upload after the preflight check.
        // NOTE(review): presumably index-aligned with `chunk_addresses` — confirm.
        chunk_contents: Vec<Bytes>,
        /// Chunk addresses that still need upload after the preflight check.
        chunk_addresses: Vec<[u8; 32]>,
    },
}
657
/// Prepared upload ready for external payment.
///
/// Contains everything needed to construct the on-chain payment transaction
/// externally (e.g. via WalletConnect in a desktop app) and then finalize
/// the upload without a Rust-side wallet.
///
/// Note: This struct stays in Rust memory — only the public fields of
/// `payment_info` are sent to the frontend. `PreparedChunk` contains
/// non-serializable network types, so the full struct cannot derive `Serialize`.
///
/// Marked `#[non_exhaustive]` so adding a new field in future is not a
/// breaking change for downstream consumers.
#[derive(Debug)]
#[non_exhaustive]
pub struct PreparedUpload {
    /// The data map for later retrieval.
    pub data_map: DataMap,
    /// Payment information for chunks that still need payment after the
    /// already-stored preflight. This may be wave-batch even when the original
    /// chunk count was merkle-eligible if the remaining count is below the
    /// merkle threshold.
    pub payment_info: ExternalPaymentInfo,
    /// Chunk address of the serialized `DataMap` when this upload was
    /// prepared with [`Visibility::Public`]. `Some` means the address is
    /// retrievable on the network after finalization — either because this
    /// upload paid to store the chunk in `payment_info`, or because the
    /// chunk was already on the network (deterministic self-encryption).
    /// Carried through to [`FileUploadResult::data_map_address`].
    pub data_map_address: Option<[u8; 32]>,
    /// Chunk addresses already present on the network when this upload was
    /// prepared. These do not require payment or PUT during finalization.
    pub already_stored_addresses: Vec<[u8; 32]>,
    /// Total chunk count for the upload, including already-stored chunks.
    pub total_chunks: usize,
}
693
/// Return type for [`spawn_file_encryption`]:
/// 1. the receiver of encrypted chunk contents, in production order;
/// 2. the oneshot receiver that yields the `DataMap` once every chunk has been sent;
/// 3. the join handle of the blocking encryption task, resolving to its outcome.
type EncryptionChannels = (
    tokio::sync::mpsc::Receiver<Bytes>,
    tokio::sync::oneshot::Receiver<DataMap>,
    tokio::task::JoinHandle<Result<()>>,
);
700
701/// Spawn a blocking task that streams file encryption through a channel.
702fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
703 let metadata = std::fs::metadata(&path)?;
704 let data_size = usize::try_from(metadata.len())
705 .map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;
706
707 let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
708 let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();
709
710 let handle = tokio::task::spawn_blocking(move || {
711 let file = std::fs::File::open(&path)?;
712 let mut reader = std::io::BufReader::new(file);
713
714 let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
715 let read_error_clone = Arc::clone(&read_error);
716
717 let data_iter = std::iter::from_fn(move || {
718 let mut buffer = vec![0u8; 8192];
719 match std::io::Read::read(&mut reader, &mut buffer) {
720 Ok(0) => None,
721 Ok(n) => {
722 buffer.truncate(n);
723 Some(Bytes::from(buffer))
724 }
725 Err(e) => {
726 let mut guard = read_error_clone
727 .lock()
728 .unwrap_or_else(|poisoned| poisoned.into_inner());
729 *guard = Some(e);
730 None
731 }
732 }
733 });
734
735 let mut stream = stream_encrypt(data_size, data_iter)
736 .map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;
737
738 for chunk_result in stream.chunks() {
739 // Check for captured read errors immediately after each chunk.
740 // stream_encrypt sees None (EOF) when a read fails, so it stops
741 // producing chunks. We must detect this before sending the
742 // partial results to avoid uploading a truncated DataMap.
743 {
744 let guard = read_error
745 .lock()
746 .unwrap_or_else(|poisoned| poisoned.into_inner());
747 if let Some(ref e) = *guard {
748 return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
749 }
750 }
751
752 let (_hash, content) = chunk_result
753 .map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
754 if chunk_tx.blocking_send(content).is_err() {
755 return Err(Error::Encryption("upload receiver dropped".to_string()));
756 }
757 }
758
759 // Final check: read error after last chunk (stream saw EOF).
760 {
761 let guard = read_error
762 .lock()
763 .unwrap_or_else(|poisoned| poisoned.into_inner());
764 if let Some(ref e) = *guard {
765 return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
766 }
767 }
768
769 let datamap = stream
770 .into_datamap()
771 .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
772 if datamap_tx.send(datamap).is_err() {
773 warn!("DataMap receiver dropped — upload may have been cancelled");
774 }
775 Ok(())
776 });
777
778 Ok((chunk_rx, datamap_rx, handle))
779}
780
781impl Client {
782 /// Upload a file to the network using streaming self-encryption.
783 ///
784 /// Automatically selects merkle batch payment for files that produce
785 /// 64+ chunks (saves gas). Encrypted chunks are spilled to a temp
786 /// directory so peak memory stays at ~256 MB regardless of file size.
787 ///
788 /// # Errors
789 ///
790 /// Returns an error if the file cannot be read, encryption fails,
791 /// or any chunk cannot be stored.
792 pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
793 self.file_upload_with_mode(path, PaymentMode::Auto).await
794 }
795
796 /// Estimate the cost of uploading a file without actually uploading.
797 ///
798 /// Encrypts the file to determine chunk count and sizes, then requests
799 /// a single quote from the network for a representative chunk. The
800 /// per-chunk price is extrapolated to the total chunk count.
801 ///
802 /// The estimate is fast (~2-5s) and does not require a wallet. Spilled
803 /// chunks are cleaned up automatically when the function returns.
804 ///
805 /// Gas cost is an advisory heuristic, not a live gas-oracle query. It is
806 /// derived from realistic per-transaction budgets (`GAS_PER_WAVE_TX`,
807 /// `GAS_PER_MERKLE_TX`) priced at `ARBITRUM_GAS_PRICE_WEI`. Real gas
808 /// varies with network conditions.
809 ///
810 /// If the first sampled chunk is already stored on the network, the
811 /// function retries with subsequent chunk addresses (up to
812 /// `ESTIMATE_SAMPLE_CAP`). If every sampled address reports stored,
813 /// a [`Error::CostEstimationInconclusive`] is returned so callers can
814 /// decide how to react rather than trust a bogus "free" estimate. Only
815 /// when every address in the file is stored do we return a zero-cost
816 /// estimate.
817 ///
818 /// # Errors
819 ///
820 /// Returns an error if the file cannot be read, encryption fails,
821 /// the network cannot provide a quote, or every sampled chunk is
822 /// already stored ([`Error::CostEstimationInconclusive`]).
823 pub async fn estimate_upload_cost(
824 &self,
825 path: &Path,
826 mode: PaymentMode,
827 progress: Option<mpsc::Sender<UploadEvent>>,
828 ) -> Result<UploadCostEstimate> {
829 let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
830
831 if file_size < 3 {
832 return Err(Error::InvalidData(
833 "File too small: self-encryption requires at least 3 bytes".into(),
834 ));
835 }
836
837 check_disk_space_for_spill(file_size)?;
838
839 info!(
840 "Estimating upload cost for {} ({file_size} bytes)",
841 path.display()
842 );
843
844 let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
845 let chunk_count = spill.len();
846
847 if let Some(ref tx) = progress {
848 let _ = tx
849 .send(UploadEvent::Encrypted {
850 total_chunks: chunk_count,
851 })
852 .await;
853 }
854
855 info!("Encrypted into {chunk_count} chunks, requesting quote");
856
857 // Sample up to ESTIMATE_SAMPLE_CAP distinct chunk addresses. A single
858 // AlreadyStored result says nothing about the rest of the file — the
859 // first chunk is often a DataMap-adjacent chunk that collides with
860 // prior uploads even when 99% of the file is new. Only treat the
861 // whole file as "fully stored" when every sample comes back stored.
862 let sample_limit = spill.addresses.len().min(ESTIMATE_SAMPLE_CAP);
863 let mut sampled = 0usize;
864 let mut all_already_stored = true;
865 let mut quotes_opt: Option<Vec<QuoteEntry>> = None;
866
867 for addr in spill.addresses.iter().take(sample_limit) {
868 sampled += 1;
869 let chunk_bytes = spill.read_chunk(addr)?;
870 let data_size = u64::try_from(chunk_bytes.len())
871 .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
872 match self
873 .get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
874 .await
875 {
876 Ok(q) => {
877 quotes_opt = Some(q);
878 all_already_stored = false;
879 break;
880 }
881 Err(Error::AlreadyStored) => {
882 debug!(
883 "Sample chunk {} already stored; trying next address ({sampled}/{sample_limit})",
884 hex::encode(addr)
885 );
886 continue;
887 }
888 Err(e) => return Err(e),
889 }
890 }
891
892 let uses_merkle = should_use_merkle(chunk_count, mode);
893
894 let quotes = match quotes_opt {
895 Some(q) => q,
896 None if all_already_stored && sampled == chunk_count => {
897 // Every address in the file was sampled and every one is
898 // already on the network — returning a zero-cost estimate is
899 // accurate in this case.
900 info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
901 return Ok(UploadCostEstimate {
902 file_size,
903 chunk_count,
904 storage_cost_atto: "0".into(),
905 estimated_gas_cost_wei: "0".into(),
906 payment_mode: if uses_merkle {
907 PaymentMode::Merkle
908 } else {
909 PaymentMode::Single
910 },
911 });
912 }
913 None => {
914 return Err(Error::CostEstimationInconclusive(format!(
915 "sampled {sampled} chunk addresses out of {chunk_count} and every \
916 one reported AlreadyStored; cannot infer a representative price \
917 for the remaining chunks"
918 )));
919 }
920 };
921
922 // Use the median price × 3 (matches SingleNodePayment::from_quotes
923 // which pays 3x the median to incentivize reliable storage).
924 let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
925 prices.sort();
926 let median_price = prices
927 .get(prices.len() / 2)
928 .copied()
929 .unwrap_or(Amount::ZERO);
930 let per_chunk_cost = median_price * Amount::from(3u64);
931
932 let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
933 let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);
934
935 // Estimate gas cost from realistic per-transaction budgets rather
936 // than a flat per-chunk or per-wave number.
937 //
938 // - Single mode: `batch_pay` packs up to UPLOAD_WAVE_SIZE chunks'
939 // close-group quotes into one `pay_for_quotes` call on Arbitrum.
940 // The dominant cost is one SSTORE per entry plus base tx overhead,
941 // so we use GAS_PER_WAVE_TX (≈1.5M) as a conservative upper bound
942 // on a full wave and multiply by the number of waves. The previous
943 // per-wave figure of 150k was closer to a single-entry transfer
944 // and understated cost by 5–10x for full waves.
945 // - Merkle mode: one tx per sub-batch that verifies a merkle tree
946 // and posts a pool commitment (GAS_PER_MERKLE_TX ≈ 500k each).
947 //
948 // Gas is priced at ARBITRUM_GAS_PRICE_WEI (~0.1 gwei, a typical
949 // Arbitrum baseline). Treat the result as advisory, not a commitment.
950 let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
951 let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
952 let estimated_gas: u128 = if uses_merkle {
953 merkle_batches
954 .saturating_mul(GAS_PER_MERKLE_TX)
955 .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
956 } else {
957 waves
958 .saturating_mul(GAS_PER_WAVE_TX)
959 .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
960 };
961
962 info!(
963 "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
964 );
965
966 Ok(UploadCostEstimate {
967 file_size,
968 chunk_count,
969 storage_cost_atto: total_storage.to_string(),
970 estimated_gas_cost_wei: estimated_gas.to_string(),
971 payment_mode: if uses_merkle {
972 PaymentMode::Merkle
973 } else {
974 PaymentMode::Single
975 },
976 })
977 }
978
979 /// Phase 1 of external-signer upload: encrypt file and prepare chunks.
980 ///
981 /// Equivalent to [`Client::file_prepare_upload_with_visibility`] with
982 /// [`Visibility::Private`] — see that method for details.
983 pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
984 self.file_prepare_upload_with_progress(path, Visibility::Private, None)
985 .await
986 }
987
988 /// Phase 1 of external-signer upload with explicit [`Visibility`] control.
989 ///
990 /// Equivalent to [`Client::file_prepare_upload_with_progress`] with
991 /// `progress: None` — see that method for details.
992 pub async fn file_prepare_upload_with_visibility(
993 &self,
994 path: &Path,
995 visibility: Visibility,
996 ) -> Result<PreparedUpload> {
997 self.file_prepare_upload_with_progress(path, visibility, None)
998 .await
999 }
1000
    /// Phase 1 of external-signer upload with progress events.
    ///
    /// Requires an EVM network (for contract price queries) but NOT a wallet.
    /// Returns a [`PreparedUpload`] containing the data map, prepared chunks,
    /// and a [`PaymentIntent`] that the external signer uses to construct
    /// and submit the on-chain payment transaction.
    ///
    /// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
    /// is bundled into the payment batch as an additional chunk and its
    /// address is recorded on the returned [`PreparedUpload`]. After
    /// [`Client::finalize_upload`] (or `_merkle`) succeeds, that address is
    /// surfaced via [`FileUploadResult::data_map_address`] so the uploader
    /// can share a single address from which anyone can retrieve the file.
    ///
    /// Payment strategy: the merkle threshold is checked with
    /// [`PaymentMode::Auto`] against every chunk, including the bundled
    /// `DataMap` chunk for public uploads. On the merkle path, a preflight
    /// (`plan_merkle_upload`) first separates chunks that are already stored:
    /// - If nothing remains, an empty wave-batch payment is returned.
    /// - If the remaining chunks fall below the merkle threshold, or merkle
    ///   preparation fails with [`Error::InsufficientPeers`], the remainder
    ///   is quoted for a wave-batch payment instead.
    ///
    /// Addresses found to be already stored are returned in
    /// `already_stored_addresses` and are left out of the payment.
    ///
    /// When `progress` is `Some`, [`UploadEvent`]s are emitted on the channel
    /// during encryption ([`UploadEvent::Encrypting`] / [`UploadEvent::Encrypted`])
    /// and per-chunk quoting ([`UploadEvent::ChunkQuoted`]). Storage events are
    /// emitted later by [`Client::finalize_upload_with_progress`] /
    /// [`Client::finalize_upload_merkle_with_progress`].
    ///
    /// **Memory note:** Encryption uses disk spilling for bounded memory, but
    /// the returned [`PreparedUpload`] holds all chunk content in memory (each
    /// [`PreparedChunk`] contains a `Bytes` with the full chunk data). This is
    /// inherent to the two-phase external-signer protocol — the chunks must
    /// stay in memory until [`Client::finalize_upload`] stores them. For very
    /// large files, prefer [`Client::file_upload`] which streams directly.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - there is insufficient disk space or the file cannot be read;
    /// - encryption fails;
    /// - the `DataMap` cannot be serialized (public uploads only);
    /// - the merkle preflight, merkle preparation, or quote collection fails.
    pub async fn file_prepare_upload_with_progress(
        &self,
        path: &Path,
        visibility: Visibility,
        progress: Option<mpsc::Sender<UploadEvent>>,
    ) -> Result<PreparedUpload> {
        debug!(
            "Preparing file upload for external signing (visibility={visibility:?}): {}",
            path.display()
        );

        // Fail fast if the temp directory cannot hold the encrypted spill.
        let file_size = std::fs::metadata(path)?.len();
        check_disk_space_for_spill(file_size)?;

        let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;

        info!(
            "Encrypted {} into {} chunks for external signing (spilled to disk)",
            path.display(),
            spill.len()
        );

        // Read every spilled chunk back into memory, one after another in
        // spill order. Quoting happens further below.
        // Note: all PreparedChunks accumulate in memory because the external-signer
        // protocol requires them for finalize_upload. NOT memory-bounded for large files.
        let mut chunk_data: Vec<Bytes> = spill
            .addresses
            .iter()
            .map(|addr| spill.read_chunk(addr))
            .collect::<std::result::Result<Vec<_>, _>>()?;

        // For public uploads, bundle the serialized DataMap as an extra chunk
        // in the same payment batch. This lets the external signer pay for
        // the data chunks and the DataMap chunk in one flow, and lets the
        // finalize step return the DataMap's chunk address as the shareable
        // retrieval address.
        let data_map_address = match visibility {
            Visibility::Private => None,
            Visibility::Public => {
                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
                })?;
                let bytes = Bytes::from(serialized);
                let address = compute_address(&bytes);
                info!(
                    "Public upload: bundling DataMap chunk ({} bytes) at address {}",
                    bytes.len(),
                    hex::encode(address)
                );
                chunk_data.push(bytes);
                Some(address)
            }
        };

        // Total used for progress events and `PreparedUpload::total_chunks`.
        // For public uploads this includes the DataMap chunk.
        let chunk_count = chunk_data.len();

        if let Some(ref tx) = progress {
            let _ = tx
                .send(UploadEvent::Encrypted {
                    total_chunks: chunk_count,
                })
                .await;
        }

        let (payment_info, already_stored_addresses) = if should_use_merkle(
            chunk_count,
            PaymentMode::Auto,
        ) {
            // Merkle path: build tree, collect candidate pools, return for external payment.
            info!("Using merkle batch preparation for {chunk_count} file chunks");

            // (address, size) pairs consumed by the merkle preflight.
            let chunk_entries: Vec<([u8; 32], u64)> = chunk_data
                .iter()
                .map(|chunk| {
                    let size = u64::try_from(chunk.len())
                        .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
                    Ok((compute_address(chunk), size))
                })
                .collect::<Result<Vec<_>>>()?;

            let merkle_plan = self
                .plan_merkle_upload(chunk_entries, DATA_TYPE_CHUNK, progress.as_ref())
                .await?;

            if merkle_plan.to_upload.is_empty() {
                info!("All {chunk_count} file chunks already stored; no external payment needed");
                (
                    ExternalPaymentInfo::WaveBatch {
                        prepared_chunks: Vec::new(),
                        payment_intent: PaymentIntent::from_prepared_chunks(&[]),
                    },
                    merkle_plan.already_stored,
                )
            } else {
                // Keep only the contents of the chunks that still need storing.
                let chunk_data =
                    chunk_contents_for_upload_addresses(chunk_data, &merkle_plan.to_upload)?;

                // Re-evaluate the threshold against the remainder only; a
                // mostly-stored file may no longer justify merkle payment.
                if !should_use_merkle(merkle_plan.to_upload.len(), PaymentMode::Auto) {
                    info!(
                        "{} file chunks need upload after merkle preflight; preparing wave-batch payment",
                        merkle_plan.to_upload.len()
                    );
                    // Progress total stays the full chunk count, not the remainder.
                    let (payment_info, mut wave_already_stored) = self
                        .prepare_wave_batch_external_chunks(
                            chunk_data,
                            progress.as_ref(),
                            chunk_count,
                        )
                        .await?;
                    let mut already_stored = merkle_plan.already_stored;
                    already_stored.append(&mut wave_already_stored);
                    (payment_info, already_stored)
                } else {
                    match self
                        .prepare_merkle_batch_external(
                            &merkle_plan.to_upload,
                            DATA_TYPE_CHUNK,
                            merkle_plan.to_upload_avg_size(),
                        )
                        .await
                    {
                        Ok(prepared_batch) => {
                            info!(
                                "File prepared for external merkle signing: {} chunks, depth={} ({})",
                                merkle_plan.to_upload.len(),
                                prepared_batch.depth,
                                path.display()
                            );

                            (
                                ExternalPaymentInfo::Merkle {
                                    prepared_batch,
                                    chunk_contents: chunk_data,
                                    chunk_addresses: merkle_plan.to_upload,
                                },
                                merkle_plan.already_stored,
                            )
                        }
                        // Not enough peers for a merkle pool: degrade to
                        // per-chunk quoting of the same remainder.
                        Err(Error::InsufficientPeers(ref msg)) => {
                            info!(
                                "External merkle preparation needs more peers ({msg}); preparing wave-batch payment"
                            );
                            let (payment_info, mut wave_already_stored) = self
                                .prepare_wave_batch_external_chunks(
                                    chunk_data,
                                    progress.as_ref(),
                                    chunk_count,
                                )
                                .await?;
                            let mut already_stored = merkle_plan.already_stored;
                            already_stored.append(&mut wave_already_stored);
                            (payment_info, already_stored)
                        }
                        Err(e) => return Err(e),
                    }
                }
            }
        } else {
            // Below the merkle threshold: quote every chunk for wave-batch payment.
            self.prepare_wave_batch_external_chunks(chunk_data, progress.as_ref(), chunk_count)
                .await?
        };

        // Surface the "DataMap chunk was already on the network" case
        // so debugging "why is data_map_address set but no storage cost
        // appears for it?" doesn't require reading the source. See the
        // `data_map_address` doc comment for why this is still a valid
        // `Some(addr)` outcome.
        if let Some(addr) = data_map_address {
            let data_map_needs_payment = match &payment_info {
                ExternalPaymentInfo::WaveBatch {
                    prepared_chunks, ..
                } => prepared_chunks.iter().any(|c| c.address == addr),
                ExternalPaymentInfo::Merkle {
                    chunk_addresses, ..
                } => chunk_addresses.contains(&addr),
            };
            if !data_map_needs_payment {
                info!(
                    "Public upload: DataMap chunk {} was already stored \
                     on the network — address is retrievable without a \
                     new payment",
                    hex::encode(addr)
                );
            }
        }

        Ok(PreparedUpload {
            data_map,
            payment_info,
            data_map_address,
            already_stored_addresses,
            total_chunks: chunk_count,
        })
    }
1226
1227 async fn prepare_wave_batch_external_chunks(
1228 &self,
1229 chunk_data: Vec<Bytes>,
1230 progress: Option<&mpsc::Sender<UploadEvent>>,
1231 progress_total: usize,
1232 ) -> Result<(ExternalPaymentInfo, Vec<[u8; 32]>)> {
1233 let chunk_count = chunk_data.len();
1234 let chunks_with_addr: Vec<(Bytes, [u8; 32])> = chunk_data
1235 .into_iter()
1236 .map(|content| {
1237 let address = compute_address(&content);
1238 (content, address)
1239 })
1240 .collect();
1241
1242 // Wave-batch path: collect quotes per chunk concurrently, emitting
1243 // a `ChunkQuoted` event after each completion so callers can drive
1244 // a progress bar through the slow quote phase.
1245 let quote_limiter = self.controller().quote.clone();
1246 let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
1247 let mut quote_stream = stream::iter(chunks_with_addr)
1248 .map(|(content, address)| {
1249 let limiter = quote_limiter.clone();
1250 async move {
1251 let result = observe_op(
1252 &limiter,
1253 || async move { self.prepare_chunk_payment(content).await },
1254 classify_error,
1255 )
1256 .await;
1257 (address, result)
1258 }
1259 })
1260 .buffer_unordered(quote_concurrency);
1261
1262 let mut prepared_chunks = Vec::with_capacity(chunk_count);
1263 let mut already_stored = Vec::new();
1264 let mut quoted = 0usize;
1265 while let Some((address, result)) = quote_stream.next().await {
1266 match result? {
1267 Some(prepared) => prepared_chunks.push(prepared),
1268 None => already_stored.push(address),
1269 }
1270 quoted += 1;
1271 if let Some(tx) = progress {
1272 let _ = tx.try_send(UploadEvent::ChunkQuoted {
1273 quoted,
1274 total: progress_total,
1275 });
1276 }
1277 }
1278
1279 let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
1280 info!(
1281 "Prepared external wave-batch payment: {} chunks, {} already stored, total {} atto",
1282 prepared_chunks.len(),
1283 already_stored.len(),
1284 payment_intent.total_amount,
1285 );
1286
1287 Ok((
1288 ExternalPaymentInfo::WaveBatch {
1289 prepared_chunks,
1290 payment_intent,
1291 },
1292 already_stored,
1293 ))
1294 }
1295
1296 /// Phase 2 of external-signer upload (wave-batch): finalize with externally-signed tx hashes.
1297 ///
1298 /// Takes a [`PreparedUpload`] that used wave-batch payment and a map
1299 /// of `quote_hash -> tx_hash` provided by the external signer after on-chain
1300 /// payment. Builds payment proofs and stores chunks on the network.
1301 ///
1302 /// # Errors
1303 ///
1304 /// Returns an error if the prepared upload used merkle payment (use
1305 /// [`Client::finalize_upload_merkle`] instead), proof construction fails,
1306 /// or any chunk cannot be stored.
1307 pub async fn finalize_upload(
1308 &self,
1309 prepared: PreparedUpload,
1310 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1311 ) -> Result<FileUploadResult> {
1312 self.finalize_upload_with_progress(prepared, tx_hash_map, None)
1313 .await
1314 }
1315
1316 /// Phase 2 of external-signer upload (wave-batch) with progress events.
1317 ///
1318 /// Same as [`Client::finalize_upload`] but emits [`UploadEvent::ChunkStored`]
1319 /// on the provided channel as each chunk is successfully stored.
1320 ///
1321 /// # Errors
1322 ///
1323 /// Same as [`Client::finalize_upload`].
1324 pub async fn finalize_upload_with_progress(
1325 &self,
1326 prepared: PreparedUpload,
1327 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1328 progress: Option<mpsc::Sender<UploadEvent>>,
1329 ) -> Result<FileUploadResult> {
1330 let data_map_address = prepared.data_map_address;
1331 let already_stored_addresses = prepared.already_stored_addresses;
1332 let already_stored_count = already_stored_addresses.len();
1333 let total_chunks = prepared.total_chunks;
1334 match prepared.payment_info {
1335 ExternalPaymentInfo::WaveBatch {
1336 prepared_chunks,
1337 payment_intent: _,
1338 } => {
1339 let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
1340 let wave_result = self
1341 .store_paid_chunks_with_events(
1342 paid_chunks,
1343 progress.as_ref(),
1344 already_stored_count,
1345 total_chunks,
1346 )
1347 .await;
1348 if !wave_result.failed.is_empty() {
1349 let failed_count = wave_result.failed.len();
1350 let stored_count = already_stored_count + wave_result.stored.len();
1351 let mut stored = already_stored_addresses;
1352 stored.extend(wave_result.stored);
1353 return Err(Error::PartialUpload {
1354 stored,
1355 stored_count,
1356 failed: wave_result.failed,
1357 failed_count,
1358 total_chunks,
1359 reason: "finalize_upload: chunk storage failed after retries".into(),
1360 });
1361 }
1362 let chunks_stored = already_stored_count + wave_result.stored.len();
1363
1364 info!("External-signer upload finalized: {chunks_stored} chunks stored");
1365
1366 let mut stats = WaveAggregateStats::default();
1367 stats.absorb(&wave_result);
1368
1369 Ok(FileUploadResult {
1370 data_map: prepared.data_map,
1371 chunks_stored,
1372 chunks_failed: 0,
1373 total_chunks,
1374 payment_mode_used: PaymentMode::Single,
1375 storage_cost_atto: "0".into(),
1376 gas_cost_wei: 0,
1377 data_map_address,
1378 chunk_attempts_total: stats.chunk_attempts_total,
1379 store_durations_ms: stats.store_durations_ms,
1380 retries_histogram: stats.retries_histogram,
1381 })
1382 }
1383 ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
1384 "Cannot finalize merkle upload with wave-batch tx hashes. \
1385 Use finalize_upload_merkle() instead."
1386 .to_string(),
1387 )),
1388 }
1389 }
1390
1391 /// Phase 2 of external-signer upload (merkle): finalize with winner pool hash.
1392 ///
1393 /// Takes a [`PreparedUpload`] that used merkle payment and the `winner_pool_hash`
1394 /// returned by the on-chain merkle payment transaction. Generates proofs and
1395 /// stores chunks on the network.
1396 ///
1397 /// # Errors
1398 ///
1399 /// Returns an error if the prepared upload used wave-batch payment (use
1400 /// [`Client::finalize_upload`] instead), proof generation fails,
1401 /// or any chunk cannot be stored.
1402 pub async fn finalize_upload_merkle(
1403 &self,
1404 prepared: PreparedUpload,
1405 winner_pool_hash: [u8; 32],
1406 ) -> Result<FileUploadResult> {
1407 self.finalize_upload_merkle_with_progress(prepared, winner_pool_hash, None)
1408 .await
1409 }
1410
1411 /// Phase 2 of external-signer upload (merkle) with progress events.
1412 ///
1413 /// Same as [`Client::finalize_upload_merkle`] but emits [`UploadEvent::ChunkStored`]
1414 /// on the provided channel as each chunk is successfully stored.
1415 ///
1416 /// # Errors
1417 ///
1418 /// Same as [`Client::finalize_upload_merkle`].
1419 pub async fn finalize_upload_merkle_with_progress(
1420 &self,
1421 prepared: PreparedUpload,
1422 winner_pool_hash: [u8; 32],
1423 progress: Option<mpsc::Sender<UploadEvent>>,
1424 ) -> Result<FileUploadResult> {
1425 let data_map_address = prepared.data_map_address;
1426 let already_stored_count = prepared.already_stored_addresses.len();
1427 let total_chunks = prepared.total_chunks;
1428 match prepared.payment_info {
1429 ExternalPaymentInfo::Merkle {
1430 prepared_batch,
1431 chunk_contents,
1432 chunk_addresses,
1433 } => {
1434 let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
1435 let outcome = self
1436 .merkle_upload_chunks(
1437 chunk_contents,
1438 chunk_addresses,
1439 &batch_result,
1440 progress.as_ref(),
1441 already_stored_count,
1442 total_chunks,
1443 )
1444 .await?;
1445
1446 info!(
1447 "External-signer merkle upload finalized: {} chunks stored, {} failed",
1448 outcome.stored, outcome.failed
1449 );
1450
1451 Ok(FileUploadResult {
1452 data_map: prepared.data_map,
1453 chunks_stored: outcome.stored,
1454 chunks_failed: outcome.failed,
1455 total_chunks,
1456 payment_mode_used: PaymentMode::Merkle,
1457 storage_cost_atto: "0".into(),
1458 gas_cost_wei: 0,
1459 data_map_address,
1460 chunk_attempts_total: outcome.stats.chunk_attempts_total,
1461 store_durations_ms: outcome.stats.store_durations_ms,
1462 retries_histogram: outcome.stats.retries_histogram,
1463 })
1464 }
1465 ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
1466 "Cannot finalize wave-batch upload with merkle winner hash. \
1467 Use finalize_upload() instead."
1468 .to_string(),
1469 )),
1470 }
1471 }
1472
1473 /// Upload a file with a specific payment mode.
1474 ///
1475 /// Before encryption, checks that the temp directory has enough free
1476 /// disk space for the spilled chunks (~1.1× source file size).
1477 ///
1478 /// Encrypted chunks are spilled to a temp directory during encryption
1479 /// so that only their 32-byte addresses stay in memory. At upload time,
1480 /// chunks are read back one wave at a time (~64 × 4 MB ≈ 256 MB peak).
1481 ///
1482 /// # Errors
1483 ///
1484 /// Returns an error if there is insufficient disk space, the file cannot
1485 /// be read, encryption fails, or any chunk cannot be stored.
1486 #[allow(clippy::too_many_lines)]
1487 pub async fn file_upload_with_mode(
1488 &self,
1489 path: &Path,
1490 mode: PaymentMode,
1491 ) -> Result<FileUploadResult> {
1492 self.file_upload_with_progress(path, mode, None).await
1493 }
1494
    /// Upload a file with progress events sent to the given channel.
    ///
    /// Same as [`Client::file_upload_with_mode`] but sends [`UploadEvent`]s to the
    /// provided channel for UI progress feedback.
    ///
    /// The file is first encrypted and spilled to a temp directory. When
    /// `self.should_use_merkle` selects merkle payment:
    /// - A merkle preflight separates chunks that are already stored.
    /// - A merkle receipt cached under the canonicalized source path is
    ///   reused whenever it covers the chunks that still need storing, so a
    ///   retried upload does not pay again.
    /// - If the preflight itself fails, the cache is used when it covers
    ///   every chunk.
    /// - Chunks left after the preflight that fall below the merkle threshold
    ///   use single-node payment unless the cache covers them.
    ///
    /// In [`PaymentMode::Auto`], [`Error::InsufficientPeers`] from the
    /// preflight or from merkle payment falls back to single-node
    /// (wave-batch) payment.
    ///
    /// Public `DataMap` bundling is not done here:
    /// [`FileUploadResult::data_map_address`] is always `None`.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - there is insufficient disk space or the file cannot be read;
    /// - encryption fails;
    /// - the merkle preflight or merkle payment fails, except for
    ///   [`Error::InsufficientPeers`] in Auto mode (that triggers the
    ///   fallback above);
    /// - any chunk cannot be stored.
    #[allow(clippy::too_many_lines)]
    pub async fn file_upload_with_progress(
        &self,
        path: &Path,
        mode: PaymentMode,
        progress: Option<mpsc::Sender<UploadEvent>>,
    ) -> Result<FileUploadResult> {
        debug!(
            "Streaming file upload with mode {mode:?}: {}",
            path.display()
        );

        // Pre-flight: verify enough temp disk space for the chunk spill.
        let file_size = std::fs::metadata(path)?.len();
        check_disk_space_for_spill(file_size)?;

        // Phase 1: Encrypt file and spill chunks to temp directory.
        // Only 32-byte addresses stay in memory — chunk data lives on disk.
        let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;

        let chunk_count = spill.len();
        info!(
            "Encrypted {} into {chunk_count} chunks (spilled to disk)",
            path.display()
        );
        if let Some(ref tx) = progress {
            let _ = tx
                .send(UploadEvent::Encrypted {
                    total_chunks: chunk_count,
                })
                .await;
        }

        // Phase 2: Decide payment mode and upload in waves from disk.
        //
        // For the merkle path, attempt to resume from a cached
        // receipt before paying again. The cache is keyed by the
        // CANONICAL source path so `./foo`, `/abs/foo`, and any
        // symlink alias all resolve to the same cache entry — a
        // crash-and-retry from a different cwd or via a different
        // alias still hits the receipt. Canonicalize may fail (the
        // file could have been moved between phase 1 and here); we
        // fall back to the display string in that case, which
        // preserves pre-fix behaviour rather than dropping cache
        // resume entirely.
        let file_path_key = std::fs::canonicalize(path)
            .map(|p| p.display().to_string())
            .unwrap_or_else(|_| path.display().to_string());
        let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self
            .should_use_merkle(chunk_count, mode)
        {
            info!("Using merkle batch payment for {chunk_count} file chunks");

            // Receipt from a previous attempt on this file, if any. Only the
            // receipt is kept; its on-disk cache path is discarded.
            let cached_merkle =
                crate::data::client::cached_merkle::try_load_for_file(&file_path_key)
                    .map(|(_cache_path, cached)| cached);

            let merkle_plan = match self
                .plan_merkle_upload(spill.chunk_entries()?, DATA_TYPE_CHUNK, progress.as_ref())
                .await
            {
                Ok(plan) => plan,
                Err(e) => {
                    // The preflight could not tell us what is already stored.
                    // A cache that covers every chunk still lets us finish
                    // without paying again; nothing is treated as already stored.
                    if let Some(cached) = cached_merkle
                        .as_ref()
                        .filter(|cached| cached_merkle_covers_addresses(cached, &spill.addresses))
                    {
                        info!(
                            "Merkle preflight failed ({e}); \
                             resuming with cached merkle proofs"
                        );
                        let (stored, sc, gc, stats) = self
                            .upload_waves_merkle(
                                &spill,
                                &spill.addresses,
                                cached,
                                &[],
                                progress.as_ref(),
                            )
                            .await?;
                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                        return Ok(FileUploadResult {
                            data_map,
                            chunks_stored: stored,
                            chunks_failed: 0,
                            total_chunks: chunk_count,
                            payment_mode_used: PaymentMode::Merkle,
                            storage_cost_atto: sc,
                            gas_cost_wei: gc,
                            data_map_address: None,
                            chunk_attempts_total: stats.chunk_attempts_total,
                            store_durations_ms: stats.store_durations_ms,
                            retries_histogram: stats.retries_histogram,
                        });
                    }
                    match &e {
                        // Only Auto may silently switch payment strategy; an
                        // explicit Merkle request surfaces the error.
                        Error::InsufficientPeers(msg) if mode == PaymentMode::Auto => {
                            info!(
                                "Merkle preflight needs more peers ({msg}), \
                                 falling back to wave-batch"
                            );
                            let (stored, sc, gc, fb_stats) = self
                                .upload_waves_single(
                                    &spill,
                                    progress.as_ref(),
                                    Some(&file_path_key),
                                )
                                .await?;
                            crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                            return Ok(FileUploadResult {
                                data_map,
                                chunks_stored: stored,
                                chunks_failed: 0,
                                total_chunks: chunk_count,
                                payment_mode_used: PaymentMode::Single,
                                storage_cost_atto: sc,
                                gas_cost_wei: gc,
                                data_map_address: None,
                                chunk_attempts_total: fb_stats.chunk_attempts_total,
                                store_durations_ms: fb_stats.store_durations_ms,
                                retries_histogram: fb_stats.retries_histogram,
                            });
                        }
                        _ => return Err(e),
                    }
                }
            };

            if merkle_plan.to_upload.is_empty() {
                // Nothing left to pay for or store; clear both receipt caches.
                info!("All {chunk_count} merkle chunks already stored; skipping payment");
                crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                (
                    chunk_count,
                    PaymentMode::Merkle,
                    "0".to_string(),
                    0,
                    WaveAggregateStats::default(),
                )
            } else if !self.should_use_merkle(merkle_plan.to_upload.len(), mode) {
                // The remainder is too small for a fresh merkle payment, but
                // proofs already paid for in a previous attempt are still usable.
                let remaining_chunks = merkle_plan.to_upload.len();
                if let Some(cached) = cached_merkle
                    .as_ref()
                    .filter(|cached| cached_merkle_covers_addresses(cached, &merkle_plan.to_upload))
                {
                    info!(
                        "{remaining_chunks} chunks remain below merkle threshold; \
                         reusing cached merkle proofs"
                    );
                    let (stored, sc, gc, stats) = self
                        .upload_waves_merkle(
                            &spill,
                            &merkle_plan.to_upload,
                            cached,
                            &merkle_plan.already_stored,
                            progress.as_ref(),
                        )
                        .await?;
                    crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                    (stored, PaymentMode::Merkle, sc, gc, stats)
                } else {
                    if cached_merkle.is_some() {
                        info!(
                            "{remaining_chunks} chunks remain below merkle threshold, \
                             and the cached merkle receipt does not cover them. \
                             Discarding cache and using single-node payment."
                        );
                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                    } else {
                        info!(
                            "{remaining_chunks} chunks need upload after merkle preflight; \
                             using single-node payment"
                        );
                    }
                    // The already-stored count and full chunk total are
                    // passed alongside the remaining addresses.
                    let (stored, sc, gc, stats) = self
                        .upload_spill_addresses_single(
                            &spill,
                            &merkle_plan.to_upload,
                            progress.as_ref(),
                            merkle_plan.already_stored.len(),
                            chunk_count,
                            Some(&file_path_key),
                        )
                        .await?;
                    crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                    (stored, PaymentMode::Single, sc, gc, stats)
                }
            } else {
                let batch_result = if let Some(cached) = cached_merkle.as_ref() {
                    // Validate the cache against the chunks that still need
                    // storage. Extra proofs are harmless: a previous attempt
                    // may have paid for chunks that are now already stored.
                    if cached_merkle_covers_addresses(cached, &merkle_plan.to_upload) {
                        info!(
                            "Skipping merkle payment phase; resuming with \
                             cached proofs for {} remaining chunks",
                            merkle_plan.to_upload.len()
                        );
                        Ok(cached.clone())
                    } else {
                        info!(
                            "Cached merkle receipt does not cover the current \
                             remaining chunks (cached={}, remaining={}). \
                             Discarding cache and paying fresh.",
                            cached.proofs.len(),
                            merkle_plan.to_upload.len()
                        );
                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                        self.pay_for_merkle_batch(
                            &merkle_plan.to_upload,
                            DATA_TYPE_CHUNK,
                            merkle_plan.to_upload_avg_size(),
                        )
                        .await
                        .inspect(|result| {
                            crate::data::client::cached_merkle::try_save(&file_path_key, result);
                        })
                    }
                } else {
                    self.pay_for_merkle_batch(
                        &merkle_plan.to_upload,
                        DATA_TYPE_CHUNK,
                        merkle_plan.to_upload_avg_size(),
                    )
                    .await
                    .inspect(|result| {
                        // Save BEFORE the store phase so a crash
                        // mid-upload leaves a resumable receipt.
                        crate::data::client::cached_merkle::try_save(&file_path_key, result);
                    })
                };

                let batch_result = match batch_result {
                    Ok(result) => result,
                    // Auto mode only: pay the remaining chunks per node instead.
                    Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
                        info!("Merkle needs more peers ({msg}), falling back to wave-batch");
                        let (stored, sc, gc, fb_stats) = self
                            .upload_spill_addresses_single(
                                &spill,
                                &merkle_plan.to_upload,
                                progress.as_ref(),
                                merkle_plan.already_stored.len(),
                                chunk_count,
                                Some(&file_path_key),
                            )
                            .await?;
                        crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                        return Ok(FileUploadResult {
                            data_map,
                            chunks_stored: stored,
                            chunks_failed: 0,
                            total_chunks: chunk_count,
                            payment_mode_used: PaymentMode::Single,
                            storage_cost_atto: sc,
                            gas_cost_wei: gc,
                            data_map_address: None,
                            chunk_attempts_total: fb_stats.chunk_attempts_total,
                            store_durations_ms: fb_stats.store_durations_ms,
                            retries_histogram: fb_stats.retries_histogram,
                        });
                    }
                    Err(e) => return Err(e),
                };

                let (stored, sc, gc, stats) = self
                    .upload_waves_merkle(
                        &spill,
                        &merkle_plan.to_upload,
                        &batch_result,
                        &merkle_plan.already_stored,
                        progress.as_ref(),
                    )
                    .await?;
                // Upload succeeded end-to-end; the cached receipt is
                // no longer needed.
                crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                (stored, PaymentMode::Merkle, sc, gc, stats)
            }
        } else {
            // Single-node payment for the whole file; a cached single-node
            // receipt keyed by the same path may be used by the wave uploader.
            let (stored, sc, gc, stats) = self
                .upload_waves_single(&spill, progress.as_ref(), Some(&file_path_key))
                .await?;
            // Full file success: drop any cached single-node receipt.
            crate::data::client::cached_single::try_delete_for_file(&file_path_key);
            (stored, PaymentMode::Single, sc, gc, stats)
        };

        info!(
            "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
            path.display()
        );

        Ok(FileUploadResult {
            data_map,
            chunks_stored,
            chunks_failed: 0,
            total_chunks: chunk_count,
            payment_mode_used: actual_mode,
            storage_cost_atto,
            gas_cost_wei,
            data_map_address: None,
            chunk_attempts_total: stats.chunk_attempts_total,
            store_durations_ms: stats.store_durations_ms,
            retries_histogram: stats.retries_histogram,
        })
    }
1805
1806 /// Encrypt a file and spill chunks to a temp directory.
1807 ///
1808 /// Logs progress every 100 chunks so users get feedback during
1809 /// multi-GB encryptions.
1810 ///
1811 /// Returns the spill buffer (addresses on disk) and the `DataMap`.
1812 async fn encrypt_file_to_spill(
1813 &self,
1814 path: &Path,
1815 progress: Option<&mpsc::Sender<UploadEvent>>,
1816 ) -> Result<(ChunkSpill, DataMap)> {
1817 let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
1818
1819 let mut spill = ChunkSpill::new()?;
1820 while let Some(content) = chunk_rx.recv().await {
1821 spill.push(&content)?;
1822 let chunks_done = spill.len();
1823 if let Some(tx) = progress {
1824 if chunks_done.is_multiple_of(10) {
1825 let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
1826 }
1827 }
1828 if chunks_done % 100 == 0 {
1829 let mb = spill.total_bytes() / (1024 * 1024);
1830 info!(
1831 "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
1832 path.display()
1833 );
1834 }
1835 }
1836
1837 // Await encryption completion to catch errors before paying.
1838 handle
1839 .await
1840 .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
1841 .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
1842
1843 let data_map = datamap_rx
1844 .await
1845 .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
1846
1847 Ok((spill, data_map))
1848 }
1849
1850 /// Upload chunks from a spill using wave-based per-chunk (single) payments.
1851 ///
1852 /// Reads one wave at a time from disk, prepares quotes, pays, and stores.
1853 /// Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE` (~256 MB).
1854 ///
1855 /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
1856 async fn upload_waves_single(
1857 &self,
1858 spill: &ChunkSpill,
1859 progress: Option<&mpsc::Sender<UploadEvent>>,
1860 resume_key: Option<&str>,
1861 ) -> Result<(usize, String, u128, WaveAggregateStats)> {
1862 self.upload_spill_addresses_single(
1863 spill,
1864 &spill.addresses,
1865 progress,
1866 0,
1867 spill.len(),
1868 resume_key,
1869 )
1870 .await
1871 }
1872
1873 async fn upload_spill_addresses_single(
1874 &self,
1875 spill: &ChunkSpill,
1876 addresses: &[[u8; 32]],
1877 progress: Option<&mpsc::Sender<UploadEvent>>,
1878 stored_offset: usize,
1879 total_chunks: usize,
1880 resume_key: Option<&str>,
1881 ) -> Result<(usize, String, u128, WaveAggregateStats)> {
1882 let mut total_stored = stored_offset;
1883 let mut total_storage = Amount::ZERO;
1884 let mut total_gas: u128 = 0;
1885 let mut agg_stats = WaveAggregateStats::default();
1886 let waves: Vec<&[[u8; 32]]> = addresses.chunks(UPLOAD_WAVE_SIZE).collect();
1887 let wave_count = waves.len();
1888
1889 for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
1890 let wave_num = wave_idx + 1;
1891 let wave_data: Vec<Bytes> = wave_addrs
1892 .iter()
1893 .map(|addr| spill.read_chunk(addr))
1894 .collect::<Result<Vec<_>>>()?;
1895
1896 info!(
1897 "Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
1898 wave_data.len()
1899 );
1900 if let Some(tx) = progress {
1901 let _ = tx
1902 .send(UploadEvent::QuotingChunks {
1903 wave: wave_num,
1904 total_waves: wave_count,
1905 chunks_in_wave: wave_data.len(),
1906 })
1907 .await;
1908 }
1909 let (addresses, wave_storage, wave_gas, wave_stats) = self
1910 .batch_upload_chunks_with_events(
1911 wave_data,
1912 progress,
1913 total_stored,
1914 total_chunks,
1915 resume_key,
1916 )
1917 .await?;
1918 total_stored += addresses.len();
1919 if let Ok(cost) = wave_storage.parse::<Amount>() {
1920 total_storage += cost;
1921 }
1922 total_gas = total_gas.saturating_add(wave_gas);
1923 // Merge per-call stats (each call already aggregates across the
1924 // waves it ran internally, so a simple sum/extend is correct).
1925 agg_stats.chunk_attempts_total = agg_stats
1926 .chunk_attempts_total
1927 .saturating_add(wave_stats.chunk_attempts_total);
1928 agg_stats
1929 .store_durations_ms
1930 .extend(wave_stats.store_durations_ms);
1931 for (slot, count) in agg_stats
1932 .retries_histogram
1933 .iter_mut()
1934 .zip(wave_stats.retries_histogram.iter())
1935 {
1936 *slot = slot.saturating_add(*count);
1937 }
1938 if let Some(tx) = progress {
1939 let _ = tx
1940 .send(UploadEvent::WaveComplete {
1941 wave: wave_num,
1942 total_waves: wave_count,
1943 stored_so_far: total_stored,
1944 total: total_chunks,
1945 })
1946 .await;
1947 }
1948 }
1949
1950 Ok((
1951 total_stored,
1952 total_storage.to_string(),
1953 total_gas,
1954 agg_stats,
1955 ))
1956 }
1957
1958 /// Upload chunks from a spill using pre-computed merkle proofs.
1959 ///
1960 /// Reads one wave at a time from disk, pairs each chunk with its proof,
1961 /// and uploads concurrently. Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE`.
1962 ///
1963 /// A chunk that is transiently short of quorum (`InsufficientPeers`) does
1964 /// **not** abort the file: each wave is driven through
1965 /// [`merkle_store_with_retry`], which collects such chunks and retries them
1966 /// — re-collecting their close group and reusing the same proof — for up to
1967 /// [`MERKLE_STORE_MAX_ATTEMPTS`] rounds with a [`MERKLE_RETRY_BACKOFF`] wait
1968 /// between rounds. Retry is per-wave to preserve the streaming memory bound.
1969 /// Non-quorum errors (e.g. a missing proof) stay fatal and abort immediately.
1970 ///
1971 /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)` on success.
1972 /// Costs come from the `batch_result` which was populated during payment.
1973 ///
1974 /// # Errors
1975 ///
1976 /// Returns [`Error::PartialUpload`] if any chunk is still short of quorum
1977 /// after all retries across every wave (other chunks remain stored), or the
1978 /// underlying error for a non-quorum failure.
1979 async fn upload_waves_merkle(
1980 &self,
1981 spill: &ChunkSpill,
1982 addresses: &[[u8; 32]],
1983 batch_result: &MerkleBatchPaymentResult,
1984 already_stored_addresses: &[[u8; 32]],
1985 progress: Option<&mpsc::Sender<UploadEvent>>,
1986 ) -> Result<(usize, String, u128, WaveAggregateStats)> {
1987 let mut total_stored = already_stored_addresses.len();
1988 let total_chunks = total_stored + addresses.len();
1989 let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
1990 let mut failed: Vec<([u8; 32], String)> = Vec::new();
1991 let mut agg_stats = WaveAggregateStats::default();
1992
1993 // Chunks without a merkle proof were never paid for: a partial
1994 // `pay_for_merkle_multi_batch` result carries proofs only for the
1995 // sub-batches whose on-chain payment succeeded. Such a chunk cannot be
1996 // stored, so record it as failed (surfaced via `PartialUpload` once the
1997 // storable chunks have been attempted) rather than letting its
1998 // "missing proof" error abort the whole file and discard every other
1999 // chunk's progress.
2000 let (to_store, missing_proof) =
2001 partition_addresses_by_proof(addresses, &batch_result.proofs);
2002 if !missing_proof.is_empty() {
2003 warn!(
2004 "{} chunk(s) lack a merkle proof (partial payment); reporting them as failed",
2005 missing_proof.len()
2006 );
2007 for addr in &missing_proof {
2008 failed.push((
2009 *addr,
2010 format!("Missing merkle proof for chunk {}", hex::encode(addr)),
2011 ));
2012 }
2013 }
2014
2015 let waves: Vec<&[[u8; 32]]> = to_store.chunks(UPLOAD_WAVE_SIZE).collect();
2016 let wave_count = waves.len();
2017
2018 let store_limiter = self.controller().store.clone();
2019
2020 // Store one chunk to its (freshly re-collected) close group, reusing the
2021 // chunk's merkle proof. Shared across every retry round so a converged
2022 // routing table yields a fresh group. Only `InsufficientPeers` is
2023 // recoverable; a missing proof stays fatal. Mirrors the external-signer
2024 // path's closure in `merkle_upload_chunks`.
2025 let store_one = |addr: [u8; 32], content: Bytes| {
2026 let limiter = store_limiter.clone();
2027 let proof_bytes = batch_result.proofs.get(&addr).cloned();
2028 async move {
2029 let started = std::time::Instant::now();
2030 let proof = proof_bytes.ok_or_else(|| {
2031 Error::Payment(format!(
2032 "Missing merkle proof for chunk {}",
2033 hex::encode(addr)
2034 ))
2035 })?;
2036 let peers = self.close_group_peers(&addr).await?;
2037 observe_op(
2038 &limiter,
2039 || async move { self.chunk_put_to_close_group(content, proof, &peers).await },
2040 classify_error,
2041 )
2042 .await
2043 .map(|_| started)
2044 }
2045 };
2046
2047 for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
2048 let wave_num = wave_idx + 1;
2049 let wave = spill.read_wave(wave_addrs)?;
2050
2051 info!(
2052 "Wave {wave_num}/{wave_count}: storing {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
2053 wave.len()
2054 );
2055
2056 // Clamp fan-out to wave size — partial last wave should
2057 // not pay for extra slots (see PERF-RESULTS.md).
2058 let store_concurrency = store_limiter.current().min(wave.len().max(1));
2059 let chunks: Vec<([u8; 32], Bytes)> = wave
2060 .into_iter()
2061 .map(|(content, addr)| (addr, content))
2062 .collect();
2063
2064 // Retry quorum-short chunks instead of aborting on the first miss.
2065 // `stored_offset` is the running cumulative count so the progress
2066 // events the driver emits stay correctly numbered across waves.
2067 let outcome = match merkle_store_with_retry(
2068 chunks,
2069 store_concurrency,
2070 MERKLE_STORE_MAX_ATTEMPTS,
2071 MERKLE_RETRY_BACKOFF,
2072 progress,
2073 total_stored,
2074 total_chunks,
2075 &store_one,
2076 )
2077 .await
2078 {
2079 Ok(outcome) => outcome,
2080 Err(e) => {
2081 // A non-quorum store error is fatal for the retry helper
2082 // (missing proofs were filtered out above, so this is a
2083 // genuine network/store failure, e.g. a DHT lookup error).
2084 // Surface it at the file boundary as `PartialUpload` so the
2085 // chunks already stored in earlier waves — and any
2086 // missing-proof chunks already recorded — are preserved for
2087 // resume, rather than discarded with a generic error.
2088 warn!("merkle wave {wave_num}/{wave_count} aborted: {e}");
2089 let failed_count = failed.len();
2090 return Err(Error::PartialUpload {
2091 stored: stored_addresses,
2092 stored_count: total_stored,
2093 failed,
2094 failed_count,
2095 total_chunks,
2096 reason: format!("merkle chunk store aborted: {e}"),
2097 });
2098 }
2099 };
2100
2101 // Record which of this wave's chunks landed and which exhausted
2102 // their retries, so a permanently-failed chunk can surface as
2103 // `PartialUpload` once the whole file has been attempted.
2104 let wave_failed: HashSet<[u8; 32]> = outcome
2105 .failed_addresses
2106 .iter()
2107 .map(|(addr, _)| *addr)
2108 .collect();
2109 for addr in wave_addrs {
2110 if !wave_failed.contains(addr) {
2111 stored_addresses.push(*addr);
2112 }
2113 }
2114 failed.extend(outcome.failed_addresses);
2115 total_stored = outcome.stored;
2116
2117 // Merge per-wave stats (durations, attempts, per-round histogram).
2118 agg_stats.chunk_attempts_total = agg_stats
2119 .chunk_attempts_total
2120 .saturating_add(outcome.stats.chunk_attempts_total);
2121 agg_stats
2122 .store_durations_ms
2123 .extend(outcome.stats.store_durations_ms);
2124 for (slot, count) in agg_stats
2125 .retries_histogram
2126 .iter_mut()
2127 .zip(outcome.stats.retries_histogram.iter())
2128 {
2129 *slot = slot.saturating_add(*count);
2130 }
2131
2132 if let Some(tx) = progress {
2133 let _ = tx
2134 .send(UploadEvent::WaveComplete {
2135 wave: wave_num,
2136 total_waves: wave_count,
2137 stored_so_far: total_stored,
2138 total: total_chunks,
2139 })
2140 .await;
2141 }
2142 }
2143
2144 // A file with any permanently-failed chunk is not fully stored — surface
2145 // it as `PartialUpload`, but only after retries across all waves are
2146 // exhausted (never silently succeed with missing chunks).
2147 if !failed.is_empty() {
2148 let failed_count = failed.len();
2149 warn!(
2150 "merkle upload incomplete: {failed_count}/{total_chunks} chunks short of quorum after retries"
2151 );
2152 return Err(Error::PartialUpload {
2153 stored: stored_addresses,
2154 stored_count: total_stored,
2155 failed,
2156 failed_count,
2157 total_chunks,
2158 reason: format!(
2159 "{failed_count} chunk(s) short of quorum after {MERKLE_STORE_MAX_ATTEMPTS} attempts"
2160 ),
2161 });
2162 }
2163
2164 Ok((
2165 total_stored,
2166 batch_result.storage_cost_atto.clone(),
2167 batch_result.gas_cost_wei,
2168 agg_stats,
2169 ))
2170 }
2171
2172 /// Download and decrypt a file from the network, writing it to disk.
2173 ///
2174 /// Uses `streaming_decrypt` so that only one batch of chunks lives in
2175 /// memory at a time, avoiding OOM on large files. Chunks are fetched
2176 /// concurrently within each batch, then decrypted data is written to
2177 /// disk incrementally.
2178 ///
2179 /// Returns the number of bytes written.
2180 ///
2181 /// # Panics
2182 ///
2183 /// Requires a multi-threaded Tokio runtime (`flavor = "multi_thread"`).
2184 /// Will panic if called from a `current_thread` runtime because
2185 /// `streaming_decrypt` takes a synchronous callback that must bridge
2186 /// back to async via `block_in_place`.
2187 ///
2188 /// # Errors
2189 ///
2190 /// Returns an error if any chunk cannot be retrieved, decryption fails,
2191 /// or the file cannot be written.
2192 #[allow(clippy::unused_async)]
2193 pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
2194 self.file_download_with_progress(data_map, output, None)
2195 .await
2196 }
2197
    /// Download and decrypt a file with progress events.
    ///
    /// Same as [`Client::file_download`] but sends [`DownloadEvent`]s for UI feedback.
    ///
    /// Progress reporting:
    /// 1. If `data_map` is a child (hierarchical) map, it is first resolved to
    ///    the root level. One `ResolvingDataMap { total_map_chunks }` event is
    ///    sent, then one `MapChunkFetched { fetched }` per map chunk retrieved.
    /// 2. Once the root DataMap is known, `DataMapResolved { total_chunks }` is
    ///    sent with the accurate data-chunk count.
    /// 3. Data chunks are fetched, with one `ChunksFetched { fetched, total }`
    ///    event per chunk retrieved.
    ///
    /// Every event is sent with `try_send`, so an event is dropped (not
    /// awaited) if the channel is full or closed.
    ///
    /// Fetch retries: a data chunk whose first fetch returns `Ok(None)` or an
    /// error is deferred. Deferred chunks are retried in up to three concurrent
    /// rounds, delayed 0 s, 15 s and 45 s. A chunk still missing after the
    /// last round fails the download.
    ///
    /// Decrypted bytes are written to a hidden temp file in `output`'s parent
    /// directory. The temp file is renamed over `output` only if the whole
    /// stream is written; otherwise it is removed (best-effort, failures only
    /// logged).
    ///
    /// Returns the number of bytes written.
    ///
    /// # Panics
    ///
    /// Requires a multi-threaded Tokio runtime: the fetch callbacks run inside
    /// `tokio::task::block_in_place`, which panics on a `current_thread` runtime.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Encryption`] if DataMap resolution, chunk retrieval or
    /// decryption fails. Returns an I/O-derived error if the temp file cannot
    /// be created, written or renamed.
    // The function body itself never `.await`s (all async work runs through
    // `handle.block_on` inside `block_in_place`), which trips
    // `clippy::unused_async`.
    #[allow(clippy::unused_async)]
    pub async fn file_download_with_progress(
        &self,
        data_map: &DataMap,
        output: &Path,
        progress: Option<mpsc::Sender<DownloadEvent>>,
    ) -> Result<u64> {
        debug!("Downloading file to {}", output.display());

        // Captured so the synchronous self_encryption callbacks below can
        // drive async fetches via `handle.block_on`.
        let handle = Handle::current();

        // Phase 1: Resolve hierarchical DataMap to root level.
        // This fetches child DataMap chunks (typically 3) to discover the real chunk count.
        let root_map = if data_map.is_child() {
            let dm_chunks = data_map.len();
            if let Some(ref tx) = progress {
                let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
                    total_map_chunks: dm_chunks,
                });
            }

            let resolve_progress = progress.clone();
            // Counts map chunks fetched across all resolution batches.
            let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));

            let resolved = tokio::task::block_in_place(|| {
                let counter_ref = resolve_counter.clone();
                let progress_ref = resolve_progress.clone();
                let fetch_limiter = self.controller().fetch.clone();
                let fetch = |batch: &[(usize, XorName)]| {
                    let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
                    let counter = counter_ref.clone();
                    let prog = progress_ref.clone();
                    let limiter = fetch_limiter.clone();
                    handle.block_on(async {
                        // Use rebucketed_unordered so the in-flight cap
                        // is re-read from the limiter as each slot frees.
                        // `buffer_unordered` snapshots the cap once at
                        // pipeline build, which means observe_op
                        // signals from inside chunk_get cannot reduce
                        // concurrency on the current batch — exactly
                        // the case where load-shedding is needed.
                        let mut results = rebucketed_unordered(
                            &limiter,
                            batch_owned,
                            |(idx, hash): (usize, XorName)| {
                                let counter = counter.clone();
                                let prog = prog.clone();
                                async move {
                                    let addr = hash.0;
                                    // chunk_get_observed feeds the
                                    // adaptive fetch limiter once per
                                    // call via chunk_get_outcome
                                    // (Ok(None) -> Timeout is the
                                    // load-shedding signal for
                                    // sustained close-group exhaustion).
                                    // Unlike the data-chunk phase below,
                                    // a missing or failed map chunk is
                                    // not deferred: it fails resolution.
                                    let chunk = self
                                        .chunk_get_observed(&addr)
                                        .await
                                        .map_err(|e| {
                                            self_encryption::Error::Generic(format!(
                                                "DataMap resolution failed: {e}"
                                            ))
                                        })?
                                        .ok_or_else(|| {
                                            self_encryption::Error::Generic(format!(
                                                "DataMap chunk not found: {}",
                                                hex::encode(addr)
                                            ))
                                        })?;
                                    let fetched = counter
                                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
                                        + 1;
                                    if let Some(ref tx) = prog {
                                        let _ =
                                            tx.try_send(DownloadEvent::MapChunkFetched { fetched });
                                    }
                                    Ok::<_, self_encryption::Error>((idx, chunk.content))
                                }
                            },
                        )
                        .await?;
                        // CRITICAL: self_encryption::get_root_data_map_parallel
                        // pairs the returned Vec POSITIONALLY with the input
                        // hashes via .zip() and discards our idx field.
                        // rebucketed_unordered preserves first-completion
                        // order, so sort by idx to restore input order
                        // before returning.
                        results.sort_by_key(|(idx, _)| *idx);
                        Ok(results)
                    })
                };
                get_root_data_map_parallel(data_map.clone(), &fetch)
            })
            .map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;

            info!(
                "Resolved hierarchical DataMap: {} data chunks",
                resolved.len()
            );
            resolved
        } else {
            data_map.clone()
        };

        // Phase 2: Now we know the real chunk count.
        let total_chunks = root_map.len();
        if let Some(ref tx) = progress {
            let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
        }

        // Phase 3: Fetch and decrypt data chunks with accurate progress.
        let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let fetched_for_closure = fetched_counter.clone();
        let progress_for_closure = progress.clone();

        // Size decrypt batches from the current fetch cap, the configured
        // floor, and the available memory (see
        // `adaptive_stream_decrypt_batch_size`).
        let fetch_limiter_outer = self.controller().fetch.clone();
        let usable_memory = usable_memory_bytes();
        let configured_batch_floor = stream_decrypt_batch_size();
        let fetch_cap = fetch_limiter_outer.current();
        let decrypt_batch_size = adaptive_stream_decrypt_batch_size(
            total_chunks,
            fetch_cap,
            configured_batch_floor,
            usable_memory,
        );
        info!(
            total_chunks,
            fetch_cap,
            configured_batch_floor,
            ?usable_memory,
            decrypt_batch_size,
            "Selected adaptive stream decrypt batch size"
        );

        // The closure is called synchronously by self_encryption once per
        // batch of (index, address) pairs and must return the batch's
        // contents (sorted by index, see below).
        let stream = streaming_decrypt_with_batch_size(
            &root_map,
            |batch: &[(usize, XorName)]| {
                let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
                let fetched_ref = fetched_for_closure.clone();
                let progress_ref = progress_for_closure.clone();
                let fetch_limiter = fetch_limiter_outer.clone();

                tokio::task::block_in_place(|| {
                    handle.block_on(async {
                        // First pass: try every chunk in the batch via
                        // chunk_get_observed (which already does its own
                        // first-attempt + retry sweep). A chunk that
                        // returns Ok(None) here is NOT a fatal failure
                        // — it's a candidate for a deferred retry below.
                        // We carry the chunk's XorName through so the
                        // retry pass can re-fetch by address.
                        //
                        // The closure ONLY returns Err on a true
                        // protocol/network error from chunk_get (the
                        // Err variant). Ok(None) is encoded as
                        // `Err(addr)` in the inner Result so the outer
                        // rebucketed pass doesn't early-abort on it.
                        type BatchEntry =
                            (usize, std::result::Result<bytes::Bytes, XorName>);
                        let raw: Vec<BatchEntry> = rebucketed_unordered(
                            &fetch_limiter,
                            batch_owned,
                            |(idx, hash): (usize, XorName)| {
                                let fetched_ref = fetched_ref.clone();
                                let progress_ref = progress_ref.clone();
                                async move {
                                    let addr = hash.0;
                                    let addr_hex = hex::encode(addr);
                                    match self.chunk_get_observed(&addr).await {
                                        Ok(Some(chunk)) => {
                                            let fetched = fetched_ref.fetch_add(
                                                1,
                                                std::sync::atomic::Ordering::Relaxed,
                                            ) + 1;
                                            info!("Downloaded {fetched}/{total_chunks}");
                                            if let Some(ref tx) = progress_ref {
                                                let _ = tx.try_send(
                                                    DownloadEvent::ChunksFetched {
                                                        fetched,
                                                        total: total_chunks,
                                                    },
                                                );
                                            }
                                            Ok::<BatchEntry, self_encryption::Error>((
                                                idx,
                                                Ok(chunk.content),
                                            ))
                                        }
                                        // chunk_get returned Ok(None): defer
                                        // this chunk for a later retry rather
                                        // than aborting the whole batch.
                                        Ok(None) => Ok((idx, Err(hash))),
                                        // A transient error for one chunk
                                        // (e.g. its close-group DHT walk
                                        // erroring on this pass) must not
                                        // abort a multi-hundred-chunk
                                        // download. Defer it to the retry
                                        // rounds, same as Ok(None); only a
                                        // chunk that survives all deferred
                                        // rounds is fatal.
                                        Err(e) => {
                                            info!(
                                                "First-pass fetch error for {addr_hex}: {e}; deferring"
                                            );
                                            Ok((idx, Err(hash)))
                                        }
                                    }
                                }
                            },
                        )
                        .await?;

                        // Partition: things we already have vs the
                        // deferred set we need to retry.
                        let mut results: Vec<(usize, bytes::Bytes)> = Vec::new();
                        let mut deferred: Vec<(usize, XorName)> = Vec::new();
                        for (idx, inner) in raw {
                            match inner {
                                Ok(bytes) => results.push((idx, bytes)),
                                Err(hash) => deferred.push((idx, hash)),
                            }
                        }

                        // Deferred retry pass: retry the deferred chunks
                        // in CONCURRENT rounds (reusing the fetch
                        // limiter's cap), not serially. The first round
                        // fires immediately — most deferrals on a
                        // healthy-but-lossy link are peer-side noise
                        // that clears in well under a second, and
                        // serializing them behind mandatory multi-second
                        // sleeps was the single biggest throughput sink
                        // on such links (a batch deferring ~20 chunks
                        // burned minutes of near-zero throughput even
                        // though every chunk succeeded on its first
                        // retry). Only chunks that survive a round get a
                        // longer back-off before the next, so genuine
                        // saturation still gets time to settle.
                        if !deferred.is_empty() {
                            // Round delays in seconds. Round 0 is
                            // immediate; later rounds back off to ride
                            // out sustained saturation.
                            const DEFERRED_ROUND_DELAYS_SECS: [u64; 3] = [0, 15, 45];
                            info!(
                                "Deferring {} chunk(s) for concurrent retry after batch settles",
                                deferred.len()
                            );
                            let mut remaining = deferred;
                            for (round, &delay_secs) in DEFERRED_ROUND_DELAYS_SECS
                                .iter()
                                .enumerate()
                            {
                                if remaining.is_empty() {
                                    break;
                                }
                                if delay_secs > 0 {
                                    tokio::time::sleep(std::time::Duration::from_secs(
                                        delay_secs,
                                    ))
                                    .await;
                                }
                                info!(
                                    "Deferred retry round {}/{}: {} chunk(s)",
                                    round + 1,
                                    DEFERRED_ROUND_DELAYS_SECS.len(),
                                    remaining.len(),
                                );
                                // Move this round's inputs out; chunks that
                                // fail again are pushed back into `remaining`.
                                let round_input = std::mem::take(&mut remaining);
                                let round_results: Vec<BatchEntry> = rebucketed_unordered(
                                    &fetch_limiter,
                                    round_input,
                                    |(idx, hash): (usize, XorName)| {
                                        let fetched_ref = fetched_ref.clone();
                                        let progress_ref = progress_ref.clone();
                                        async move {
                                            let addr = hash.0;
                                            // Both Ok(None) and a transient
                                            // Err re-defer the chunk to the
                                            // next round rather than
                                            // aborting; only the final
                                            // round's leftovers are fatal.
                                            match self.chunk_get_observed(&addr).await {
                                                Ok(Some(chunk)) => {
                                                    let fetched = fetched_ref.fetch_add(
                                                        1,
                                                        std::sync::atomic::Ordering::Relaxed,
                                                    ) + 1;
                                                    info!(
                                                        "Downloaded {fetched}/{total_chunks} (deferred retry)"
                                                    );
                                                    if let Some(ref tx) = progress_ref {
                                                        let _ = tx.try_send(
                                                            DownloadEvent::ChunksFetched {
                                                                fetched,
                                                                total: total_chunks,
                                                            },
                                                        );
                                                    }
                                                    Ok::<BatchEntry, self_encryption::Error>((
                                                        idx,
                                                        Ok(chunk.content),
                                                    ))
                                                }
                                                Ok(None) => Ok((idx, Err(hash))),
                                                Err(e) => {
                                                    info!(
                                                        "Deferred retry for {} hit transient error: {e}; re-deferring",
                                                        hex::encode(addr)
                                                    );
                                                    Ok((idx, Err(hash)))
                                                }
                                            }
                                        }
                                    },
                                )
                                .await?;
                                for (idx, inner) in round_results {
                                    match inner {
                                        Ok(bytes) => results.push((idx, bytes)),
                                        Err(hash) => remaining.push((idx, hash)),
                                    }
                                }
                            }
                            // Any chunk still unresolved after every round
                            // fails the batch; only the first one is named.
                            if let Some((_, hash)) = remaining.first() {
                                return Err(self_encryption::Error::Generic(format!(
                                    "Chunk not found after {} deferred retry rounds: {}",
                                    DEFERRED_ROUND_DELAYS_SECS.len(),
                                    hex::encode(hash.0),
                                )));
                            }
                        }

                        // streaming_decrypt itself sort_by_keys before
                        // zipping, but the same closure is also passed
                        // through get_root_data_map_parallel internally
                        // (see self_encryption::stream_decrypt.rs::new), and
                        // THAT path zips positionally without sorting. Sort
                        // here so both consumers see input order.
                        results.sort_by_key(|(idx, _)| *idx);
                        Ok(results)
                    })
                })
            },
            decrypt_batch_size,
        )
        .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;

        // Write decrypted chunks to a temp file, then rename atomically.
        // The pid plus a random suffix keeps concurrent downloads into the
        // same directory from colliding on the temp name.
        let parent = output.parent().unwrap_or_else(|| Path::new("."));
        let unique: u64 = rand::random();
        let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));

        // Iterating `stream` is what drives the fetch/decrypt closure above,
        // one batch at a time.
        let write_result = (|| -> Result<u64> {
            let mut file = std::fs::File::create(&tmp_path)?;
            let mut bytes_written = 0u64;
            for chunk_result in stream {
                let chunk_bytes = chunk_result
                    .map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
                file.write_all(&chunk_bytes)?;
                bytes_written += chunk_bytes.len() as u64;
            }
            // NOTE(review): `File::flush` does not fsync. If the renamed file
            // must survive a crash, consider `file.sync_all()` before the
            // rename; confirm the latency cost is acceptable for large files.
            file.flush()?;
            Ok(bytes_written)
        })();

        // Rename on success; on any failure, best-effort removal of the temp
        // file (a cleanup failure is only logged, and the original error wins).
        match write_result {
            Ok(bytes_written) => match std::fs::rename(&tmp_path, output) {
                Ok(()) => {
                    info!(
                        "File downloaded: {bytes_written} bytes written to {}",
                        output.display()
                    );
                    Ok(bytes_written)
                }
                Err(rename_err) => {
                    if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
                        warn!(
                            "Failed to remove temp download file {}: {cleanup_err}",
                            tmp_path.display()
                        );
                    }
                    Err(rename_err.into())
                }
            },
            Err(e) => {
                if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
                    warn!(
                        "Failed to remove temp download file {}: {cleanup_err}",
                        tmp_path.display()
                    );
                }
                Err(e)
            }
        }
    }
2601}
2602
2603#[cfg(test)]
2604#[allow(clippy::unwrap_used)]
2605mod tests {
2606 use super::*;
2607
2608 #[test]
2609 fn disk_space_check_passes_for_small_file() {
2610 // A 1 KB file should always pass the disk space check
2611 check_disk_space_for_spill(1024).unwrap();
2612 }
2613
2614 #[test]
2615 fn disk_space_check_fails_for_absurd_size() {
2616 // Requesting space for a 1 exabyte file should fail on any real system
2617 let result = check_disk_space_for_spill(u64::MAX / 2);
2618 assert!(result.is_err());
2619 let err = result.unwrap_err();
2620 assert!(
2621 matches!(err, Error::InsufficientDiskSpace(_)),
2622 "expected InsufficientDiskSpace, got: {err}"
2623 );
2624 }
2625
2626 #[test]
2627 fn adaptive_stream_decrypt_batch_size_tracks_fetch_headroom() {
2628 let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(u64::MAX));
2629
2630 assert_eq!(batch_size, 64 * DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
2631 }
2632
2633 #[test]
2634 fn adaptive_stream_decrypt_batch_size_caps_to_total_chunks() {
2635 let batch_size = adaptive_stream_decrypt_batch_size(12, 64, 10, Some(u64::MAX));
2636
2637 assert_eq!(batch_size, 12);
2638 }
2639
2640 #[test]
2641 fn adaptive_stream_decrypt_batch_size_honours_configured_floor() {
2642 let batch_size = adaptive_stream_decrypt_batch_size(1_000, 1, 32, None);
2643
2644 assert_eq!(batch_size, 32);
2645 }
2646
2647 #[test]
2648 fn adaptive_stream_decrypt_batch_size_does_not_expand_without_memory_reading() {
2649 let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, None);
2650
2651 assert_eq!(batch_size, 10);
2652 }
2653
2654 #[test]
2655 fn adaptive_stream_decrypt_batch_size_caps_to_memory_budget() {
2656 let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
2657 .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
2658 .max(1);
2659 let usable_memory = estimated_bytes_per_chunk
2660 .saturating_mul(16)
2661 .saturating_mul(DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR);
2662 let batch_size = adaptive_stream_decrypt_batch_size(1_000, 256, 10, Some(usable_memory));
2663
2664 assert_eq!(batch_size, 16);
2665 }
2666
2667 #[test]
2668 fn adaptive_stream_decrypt_batch_size_keeps_one_chunk_when_memory_is_tight() {
2669 let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(1));
2670
2671 assert_eq!(batch_size, 1);
2672 }
2673
2674 #[test]
2675 fn cached_merkle_covers_only_when_all_addresses_have_proofs() {
2676 let covered = compute_address(&Bytes::from_static(b"covered"));
2677 let extra = compute_address(&Bytes::from_static(b"extra"));
2678 let missing = compute_address(&Bytes::from_static(b"missing"));
2679 let cached = MerkleBatchPaymentResult {
2680 proofs: HashMap::from([(covered, vec![1]), (extra, vec![2])]),
2681 chunk_count: 2,
2682 storage_cost_atto: "0".to_string(),
2683 gas_cost_wei: 0,
2684 merkle_payment_timestamp: 0,
2685 };
2686
2687 assert!(cached_merkle_covers_addresses(&cached, &[covered]));
2688 assert!(cached_merkle_covers_addresses(&cached, &[covered, extra]));
2689 assert!(!cached_merkle_covers_addresses(
2690 &cached,
2691 &[covered, missing]
2692 ));
2693 }
2694
2695 /// A partial merkle payment leaves some addresses without a proof. Those
2696 /// must be split out so `upload_waves_merkle` reports them as failed
2697 /// (`PartialUpload`) instead of aborting the whole file — preserving the
2698 /// addresses' original order in each group.
2699 #[test]
2700 fn partition_addresses_by_proof_splits_paid_and_unpaid() {
2701 let paid_a = [1u8; 32];
2702 let unpaid_b = [2u8; 32];
2703 let paid_c = [3u8; 32];
2704 let unpaid_d = [4u8; 32];
2705 let proofs: HashMap<[u8; 32], Vec<u8>> =
2706 HashMap::from([(paid_a, vec![0xaa]), (paid_c, vec![0xcc])]);
2707
2708 let (to_store, missing) =
2709 partition_addresses_by_proof(&[paid_a, unpaid_b, paid_c, unpaid_d], &proofs);
2710
2711 assert_eq!(to_store, vec![paid_a, paid_c]);
2712 assert_eq!(missing, vec![unpaid_b, unpaid_d]);
2713 }
2714
2715 #[test]
2716 fn partition_addresses_by_proof_handles_all_or_nothing() {
2717 let a = [5u8; 32];
2718 let b = [6u8; 32];
2719
2720 // No proofs at all → every address is missing.
2721 let empty: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
2722 let (to_store, missing) = partition_addresses_by_proof(&[a, b], &empty);
2723 assert!(to_store.is_empty());
2724 assert_eq!(missing, vec![a, b]);
2725
2726 // All proofs present → nothing missing.
2727 let full: HashMap<[u8; 32], Vec<u8>> = HashMap::from([(a, vec![1]), (b, vec![2])]);
2728 let (to_store, missing) = partition_addresses_by_proof(&[a, b], &full);
2729 assert_eq!(to_store, vec![a, b]);
2730 assert!(missing.is_empty());
2731 }
2732
2733 #[test]
2734 fn chunk_spill_round_trip() {
2735 let mut spill = ChunkSpill::new().unwrap();
2736 let data1 = vec![0xAA; 1024];
2737 let data2 = vec![0xBB; 2048];
2738
2739 spill.push(&data1).unwrap();
2740 spill.push(&data2).unwrap();
2741
2742 assert_eq!(spill.len(), 2);
2743 assert_eq!(spill.total_bytes(), 1024 + 2048);
2744 let chunk_entries = spill.chunk_entries().unwrap();
2745 let entry_total: u64 = chunk_entries.iter().map(|(_, size)| *size).sum();
2746 assert_eq!(entry_total, 1024 + 2048);
2747
2748 // Read back and verify
2749 let chunk1 = spill.read_chunk(spill.addresses.first().unwrap()).unwrap();
2750 assert_eq!(&chunk1[..], &data1[..]);
2751
2752 let chunk2 = spill.read_chunk(spill.addresses.get(1).unwrap()).unwrap();
2753 assert_eq!(&chunk2[..], &data2[..]);
2754
2755 // Verify waves with 1-chunk wave size
2756 let waves: Vec<_> = spill.addresses.chunks(1).collect();
2757 assert_eq!(waves.len(), 2);
2758 }
2759
2760 #[test]
2761 fn chunk_spill_cleanup_on_drop() {
2762 let dir;
2763 {
2764 let spill = ChunkSpill::new().unwrap();
2765 dir = spill.dir.clone();
2766 assert!(dir.exists());
2767 }
2768 // After drop, the directory should be cleaned up
2769 assert!(!dir.exists(), "spill dir should be removed on drop");
2770 }
2771
2772 #[test]
2773 fn chunk_spill_deduplicates_identical_content() {
2774 let mut spill = ChunkSpill::new().unwrap();
2775 let data = vec![0xCC; 512];
2776
2777 spill.push(&data).unwrap();
2778 spill.push(&data).unwrap(); // same content, should be skipped
2779 spill.push(&data).unwrap(); // again
2780
2781 assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
2782 assert_eq!(
2783 spill.total_bytes(),
2784 512,
2785 "total_bytes should count unique only"
2786 );
2787
2788 // Different content should still be added
2789 let data2 = vec![0xDD; 256];
2790 spill.push(&data2).unwrap();
2791 assert_eq!(spill.len(), 2);
2792 assert_eq!(spill.total_bytes(), 512 + 256);
2793 }
2794}
2795
/// Compile-time checks that the futures returned by `Client`'s file methods are `Send`.
///
/// None of these functions is ever called or awaited; each one only has to type-check.
/// Handing a reference to a freshly built future to `_assert_send` makes the compiler
/// prove the `Send` bound, so the test build fails if one of these futures stops being
/// `Send` (for example, if a non-`Send` value starts being held across an `.await`).
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Accepts a reference to any `Send` value; exists purely for its trait bound.
    fn _assert_send<T: Send>(_: &T) {}

    // All of these functions are never called (`dead_code`). The download check uses
    // `todo!()` as a stand-in `DataMap`, so the code after it is unreachable yet still
    // type-checked — hence the extra lint allowances on that function.

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_is_send(client: &Client) {
        _assert_send(&client.file_upload(Path::new("/dev/null")));
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_with_mode_is_send(client: &Client) {
        _assert_send(&client.file_upload_with_mode(Path::new("/dev/null"), PaymentMode::Auto));
    }

    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _file_download_is_send(client: &Client) {
        let dm: DataMap = todo!();
        _assert_send(&client.file_download(&dm, Path::new("/dev/null")));
    }
}