ant_core/data/client/file.rs
//! File operations using streaming self-encryption.
//!
//! Upload files directly from disk without loading them entirely into memory.
//! Uses `stream_encrypt` to process files in 8KB chunks, encrypting each
//! piece as it is read.
//!
//! Encrypted chunks are spilled to a temporary directory during encryption
//! and uploaded one wave at a time, so that peak memory usage is bounded to
//! one wave (~256 MB for 64 × 4 MB chunks) regardless of file size.
//!
//! For in-memory data uploads, see the `data` module.
12
13use crate::data::client::adaptive::{observe_op, rebucketed_unordered};
14use crate::data::client::batch::{
15 finalize_batch_payment, PaymentIntent, PreparedChunk, WaveAggregateStats,
16};
17use crate::data::client::classify_error;
18use crate::data::client::merkle::{
19 chunk_contents_for_upload_addresses, finalize_merkle_batch, merkle_deferred_retry,
20 merkle_store_with_retry, should_use_merkle, MerkleBatchPaymentResult, PaymentMode,
21 PreparedMerkleBatch, DEFERRED_ROUND_DELAYS_SECS,
22};
23use crate::data::client::Client;
24use crate::data::error::{Error, PartialUploadSpend, Result};
25use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
26use ant_protocol::transport::{MultiAddr, PeerId};
27use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
28use bytes::Bytes;
29use fs2::FileExt;
30use futures::stream::{self, StreamExt};
31use self_encryption::{
32 get_root_data_map_parallel, stream_decrypt_batch_size, stream_encrypt,
33 streaming_decrypt_with_batch_size, DataMap,
34};
35use std::collections::{HashMap, HashSet};
36use std::io::Write;
37use std::num::NonZeroUsize;
38use std::path::{Path, PathBuf};
39use std::sync::{Arc, Mutex};
40use tokio::runtime::Handle;
41use tokio::sync::mpsc;
42use tracing::{debug, info, warn};
43use xor_name::XorName;
44
/// Progress notifications raised while a file is being uploaded, so a UI can
/// show what the upload is currently doing.
#[derive(Debug, Clone)]
pub enum UploadEvent {
    /// Another chunk has been encrypted and spilled to disk.
    Encrypting { chunks_done: usize },
    /// Encryption of the file has finished.
    Encrypted { total_chunks: usize },
    /// Quote collection is about to start for a wave.
    QuotingChunks {
        wave: usize,
        total_waves: usize,
        chunks_in_wave: usize,
    },
    /// A quote has been obtained for a chunk (peer discovery done, price
    /// received). Quoting is the slow phase: every quote costs network
    /// round-trips.
    ChunkQuoted { quoted: usize, total: usize },
    /// A chunk has been stored on the network.
    ChunkStored { stored: usize, total: usize },
    /// A wave has finished.
    WaveComplete {
        wave: usize,
        total_waves: usize,
        stored_so_far: usize,
        total: usize,
    },
}
71
/// Progress notifications raised while a file is being downloaded, so a UI
/// can show what the download is currently doing.
#[derive(Debug, Clone)]
pub enum DownloadEvent {
    /// The hierarchical DataMap is being resolved to find the real chunk
    /// count.
    ResolvingDataMap { total_map_chunks: usize },
    /// One DataMap chunk was fetched as part of that resolution.
    MapChunkFetched { fetched: usize },
    /// The DataMap is resolved; the total number of data chunks is now known.
    DataMapResolved { total_chunks: usize },
    /// Data chunks are being fetched from the network.
    ChunksFetched { fetched: usize, total: usize },
}
84
85/// One entry in the per-chunk quote list returned by
86/// [`Client::get_store_quotes`]: the responding peer, its addresses, the
87/// signed quote it returned, and the payment amount it is demanding.
88type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);
89
/// How many chunks make up one upload wave (kept equal to batch.rs
/// `PAYMENT_WAVE_SIZE`).
const UPLOAD_WAVE_SIZE: usize = 64;

/// Factor by which a stream-decrypt batch exceeds the fetch fan-out.
///
/// Batches larger than the fan-out let the rolling fetch scheduler keep
/// launching new chunk GETs as earlier ones complete, rather than stalling at
/// every self-encryption batch boundary.
const DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER: usize = 4;

/// Divisor applied to the currently usable RAM to obtain the memory budget of
/// one decrypt batch, i.e. a batch may use at most `1 / divisor` of it.
const DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR: u64 = 4;

/// Per-chunk memory multiplier used when sizing a decrypt batch.
///
/// A batch briefly holds the encrypted chunk bytes, the decrypted chunk bytes
/// and Vec/Bytes overhead, so a conservative multiplier is used instead of
/// assuming payload bytes alone.
const DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER: u64 = 3;

/// Upper bound on the number of distinct chunk addresses sampled when
/// [`Client::estimate_upload_cost`] probes for a representative quote.
///
/// Kept small so the `AlreadyStored` retry path never costs more than a
/// couple of round-trips; that path only matters when many leading chunks of
/// a file are already on the network.
const ESTIMATE_SAMPLE_CAP: usize = 5;
113
/// Pick up to `cap` chunk indices spread evenly across `[0, total)`, always
/// including the first and last chunk.
///
/// Sampling the *first* N chunks biases the probe: a file sharing a leading
/// prefix with a prior upload (compressed archives, similar headers) reports
/// those chunks as `AlreadyStored` even when the tail is new, so a positional
/// sample looks in the worst possible place. Spreading the sample means a
/// single new chunk anywhere in the file yields a real price.
///
/// # Returns
///
/// - an empty vector when `total == 0`;
/// - `[0]` when only one index can be sampled (`total == 1` or `cap <= 1`;
///   note that `cap == 0` still yields `[0]`);
/// - every index when `total <= cap`, so [`Client::estimate_upload_cost`] can
///   still detect the "whole file sampled" case;
/// - otherwise `cap` strictly increasing indices from `0` to `total - 1`.
///
/// The scaling is done in `u128`, so the result is correct for any `usize`
/// `total` instead of overflowing on `i * (total - 1)`.
fn distributed_sample_indices(total: usize, cap: usize) -> Vec<usize> {
    if total == 0 {
        return Vec::new();
    }
    let sample_limit = total.min(cap);
    if sample_limit <= 1 {
        return vec![0];
    }

    let last = total - 1;
    let steps = sample_limit - 1;
    let mut indices: Vec<usize> = (0..sample_limit)
        .map(|i| {
            // Widening casts are lossless; `i <= steps`, so the quotient is at
            // most `last` and always fits back into `usize`.
            let scaled = (i as u128) * (last as u128) / (steps as u128);
            usize::try_from(scaled).unwrap_or(last)
        })
        .collect();
    // Defensive: `last >= steps` here, so the sequence is already strictly
    // increasing and this removes nothing.
    indices.dedup();
    indices
}
140
/// Gas budgeted for one `pay_for_quotes` transaction packing up to
/// `UPLOAD_WAVE_SIZE` (quote_hash, rewards_address, amount) entries.
///
/// `batch_pay` in `batch.rs` flattens every chunk's close-group quotes into a
/// single EVM call, so the cost is dominated by one SSTORE per entry plus the
/// base transaction overhead. On Arbitrum that comes to roughly
/// `21_000 + 64 × (20_000 + small)` ≈ 1.3M, rounded up to 1.5M as a
/// conservative per-wave upper bound.
const GAS_PER_WAVE_TX: u128 = 1_500_000;

/// Gas budgeted for one merkle batch payment transaction.
///
/// There is one on-chain transaction per merkle sub-batch, but each one
/// verifies a merkle tree and posts a pool commitment, so the budget is higher
/// than for a plain transfer.
const GAS_PER_MERKLE_TX: u128 = 500_000;

/// Advisory gas price (wei/gas) that converts the gas estimate into an ETH
/// figure when no live gas oracle is consulted.
///
/// Arbitrum One typically settles around 0.1 gwei on quiet blocks, so that is
/// the default; it lets the CLI print a sensible order-of-magnitude number.
/// The reported gas cost is an estimate, not a commitment — real gas is bid
/// at submission time.
const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;

/// Extra free-space headroom, in percent, demanded by the disk space check.
///
/// Encrypted chunks are slightly larger than the source data because of
/// padding and self-encryption overhead, so the temp directory must have
/// file_size + 10% free to account for this.
const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;
172
/// Grace period (in seconds) before a spill dir is eligible for stale cleanup.
///
/// This is a small TOCTOU guard covering the sub-millisecond window inside
/// [`ChunkSpill::new`] between `create_dir` and `try_lock_exclusive`. Once a
/// dir is older than this and its lockfile is releasable, the owning process
/// is gone and the dir is safe to reap — regardless of how old it is.
///
/// The previous policy waited 24 h before reaping any orphan, which meant
/// that any non-graceful exit (SIGKILL, kernel OOM, panic abort) leaked its
/// spill dir until the next day's upload — and on a host being restart-looped
/// by systemd, orphans could fill the disk well within that window.
const SPILL_STALE_GRACE_SECS: u64 = 30;

/// Prefix for spill directory names to distinguish from user files.
const SPILL_DIR_PREFIX: &str = "spill_";

/// Lockfile name inside each spill dir to signal active use.
const SPILL_LOCK_NAME: &str = ".lock";

/// Temporary on-disk buffer for encrypted chunks.
///
/// During file encryption, chunks are written to a temp directory so that
/// only their 32-byte addresses stay in memory. At upload time chunks are
/// read back one wave at a time, keeping peak RAM at ~`UPLOAD_WAVE_SIZE × 4 MB`.
struct ChunkSpill {
    /// Directory holding spilled chunk files (named by hex address).
    dir: PathBuf,
    /// Lockfile held for the lifetime of this spill (prevents stale cleanup).
    _lock: std::fs::File,
    /// Deduplicated list of chunk addresses.
    addresses: Vec<[u8; 32]>,
    /// Tracks seen addresses for deduplication.
    seen: HashSet<[u8; 32]>,
    /// Byte size per spilled chunk address.
    sizes: HashMap<[u8; 32], u64>,
    /// Running total of unique chunk byte sizes (for average-size calculation).
    total_bytes: u64,
}
211
212impl ChunkSpill {
213 /// Return the parent directory for all spill dirs: `<data_dir>/spill/`.
214 fn spill_root() -> Result<PathBuf> {
215 use crate::config;
216 let root = config::data_dir()
217 .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
218 .join("spill");
219 Ok(root)
220 }
221
222 /// Create a new spill directory under `<data_dir>/spill/`.
223 ///
224 /// Directory name is `spill_<timestamp>_<random>` so orphans can be
225 /// identified by prefix and cleaned up by age. A lockfile inside the
226 /// dir prevents concurrent cleanup from deleting an active spill.
227 fn new() -> Result<Self> {
228 let root = Self::spill_root()?;
229 std::fs::create_dir_all(&root)?;
230
231 // Clean up stale spill dirs from previous crashed runs.
232 Self::cleanup_stale(&root);
233
234 let now = std::time::SystemTime::now()
235 .duration_since(std::time::UNIX_EPOCH)
236 .unwrap_or_default()
237 .as_secs();
238 let unique: u64 = rand::random();
239 let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
240 std::fs::create_dir(&dir)?;
241
242 // Create and hold a lockfile for the lifetime of this spill.
243 // cleanup_stale() will skip dirs with locked files.
244 let lock_path = dir.join(SPILL_LOCK_NAME);
245 let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
246 Error::Io(std::io::Error::new(
247 e.kind(),
248 format!("failed to create spill lockfile: {e}"),
249 ))
250 })?;
251 lock_file.try_lock_exclusive().map_err(|e| {
252 Error::Io(std::io::Error::new(
253 e.kind(),
254 format!("failed to lock spill lockfile: {e}"),
255 ))
256 })?;
257
258 Ok(Self {
259 dir,
260 _lock: lock_file,
261 addresses: Vec::new(),
262 seen: HashSet::new(),
263 sizes: HashMap::new(),
264 total_bytes: 0,
265 })
266 }
267
268 /// Clean up stale spill directories. Best-effort, errors are logged.
269 ///
270 /// A spill dir is reaped when:
271 /// 1. Its name starts with `SPILL_DIR_PREFIX` (ignores unrelated files)
272 /// 2. It is an actual directory, not a symlink (prevents symlink attacks)
273 /// 3. Its timestamp is older than `SPILL_STALE_GRACE_SECS` (TOCTOU guard)
274 /// 4. Its lockfile is releasable — i.e. no live process holds it
275 ///
276 /// The lockfile is the primary correctness gate: a releasable lock means
277 /// the owning `ChunkSpill` has been dropped or the process is gone, so
278 /// the dir is fair game. The grace period covers only the brief window
279 /// inside [`Self::new`] between `create_dir` and `try_lock_exclusive`.
280 ///
281 /// Safe to call concurrently from multiple processes.
282 fn cleanup_stale(root: &Path) {
283 let now = std::time::SystemTime::now()
284 .duration_since(std::time::UNIX_EPOCH)
285 .unwrap_or_default()
286 .as_secs();
287
288 if now == 0 {
289 // Clock is broken (before Unix epoch). Skip cleanup to avoid
290 // misidentifying dirs as stale.
291 warn!("System clock before Unix epoch, skipping spill cleanup");
292 return;
293 }
294
295 let entries = match std::fs::read_dir(root) {
296 Ok(entries) => entries,
297 Err(_) => return,
298 };
299
300 for entry in entries.flatten() {
301 let name = entry.file_name();
302 let name_str = name.to_string_lossy();
303
304 // Only process dirs with our prefix.
305 let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
306 Some(s) => s,
307 None => continue,
308 };
309
310 // Parse timestamp: "spill_<timestamp>_<random>"
311 let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
312 Some(ts) => ts,
313 None => continue,
314 };
315
316 if now.saturating_sub(timestamp) < SPILL_STALE_GRACE_SECS {
317 continue;
318 }
319
320 // Safety: only delete actual directories, not symlinks.
321 let file_type = match entry.file_type() {
322 Ok(ft) => ft,
323 Err(_) => continue,
324 };
325 if !file_type.is_dir() {
326 continue;
327 }
328
329 let path = entry.path();
330
331 // Check lockfile: if locked, the dir is in active use -- skip it.
332 let lock_path = path.join(SPILL_LOCK_NAME);
333 if let Ok(lock_file) = std::fs::File::open(&lock_path) {
334 use fs2::FileExt;
335 if lock_file.try_lock_exclusive().is_err() {
336 // Lock held by another process -- dir is active.
337 debug!("Skipping active spill dir: {}", path.display());
338 continue;
339 }
340 // We acquired the lock, so no one else holds it.
341 // Drop it before deleting.
342 drop(lock_file);
343 }
344
345 info!("Cleaning up stale spill dir: {}", path.display());
346 if let Err(e) = std::fs::remove_dir_all(&path) {
347 warn!("Failed to clean up stale spill dir {}: {e}", path.display());
348 }
349 }
350 }
351
352 /// Run stale spill cleanup. Call at client startup or periodically.
353 #[allow(dead_code)]
354 pub(crate) fn run_cleanup() {
355 if let Ok(root) = Self::spill_root() {
356 Self::cleanup_stale(&root);
357 }
358 }
359
360 /// Write one encrypted chunk to disk and record its address.
361 ///
362 /// Deduplicates by content address: if the same chunk was already
363 /// spilled, the write and accounting are skipped. This prevents
364 /// double-uploads and inflated quoting metrics.
365 fn push(&mut self, content: &[u8]) -> Result<()> {
366 let address = compute_address(content);
367 if !self.seen.insert(address) {
368 return Ok(());
369 }
370 let path = self.dir.join(hex::encode(address));
371 std::fs::write(&path, content)?;
372 let content_len = content.len() as u64;
373 self.sizes.insert(address, content_len);
374 self.total_bytes += content_len;
375 self.addresses.push(address);
376 Ok(())
377 }
378
379 /// Number of chunks stored.
380 fn len(&self) -> usize {
381 self.addresses.len()
382 }
383
384 /// Total bytes of all spilled chunks.
385 fn total_bytes(&self) -> u64 {
386 self.total_bytes
387 }
388
389 /// Address and byte-size pairs for all spilled chunks.
390 fn chunk_entries(&self) -> Result<Vec<([u8; 32], u64)>> {
391 self.addresses
392 .iter()
393 .map(|address| {
394 self.sizes
395 .get(address)
396 .copied()
397 .map(|size| (*address, size))
398 .ok_or_else(|| {
399 Error::Storage(format!(
400 "missing size for spilled chunk {}",
401 hex::encode(address)
402 ))
403 })
404 })
405 .collect()
406 }
407
408 /// Read a single chunk back from disk by address.
409 fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
410 let path = self.dir.join(hex::encode(address));
411 let data = std::fs::read(&path).map_err(|e| {
412 Error::Io(std::io::Error::new(
413 e.kind(),
414 format!("reading spilled chunk {}: {e}", hex::encode(address)),
415 ))
416 })?;
417 Ok(Bytes::from(data))
418 }
419
420 /// Read a wave of chunks from disk.
421 fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
422 let mut out = Vec::with_capacity(wave_addrs.len());
423 for addr in wave_addrs {
424 let content = self.read_chunk(addr)?;
425 out.push((content, *addr));
426 }
427 Ok(out)
428 }
429
430 /// Clean up the spill directory.
431 fn cleanup(&self) {
432 if let Err(e) = std::fs::remove_dir_all(&self.dir) {
433 warn!(
434 "Failed to clean up chunk spill dir {}: {e}",
435 self.dir.display()
436 );
437 }
438 }
439}
440
441impl Drop for ChunkSpill {
442 fn drop(&mut self) {
443 self.cleanup();
444 }
445}
446
447fn cached_merkle_covers_addresses(
448 cached: &MerkleBatchPaymentResult,
449 addresses: &[[u8; 32]],
450) -> bool {
451 addresses
452 .iter()
453 .all(|addr| cached.proofs.contains_key(addr))
454}
455
/// Divide `addresses` into `(to_store, missing_proof)`: the entries that have
/// a merkle proof in `proofs`, followed by the entries that do not.
///
/// A partial [`MerkleBatchPaymentResult`] (from a `pay_for_merkle_multi_batch`
/// where a later sub-batch's payment failed) only carries proofs for the
/// sub-batches that were already paid, so unpaid chunks arrive at the upload
/// path without a proof. `upload_waves_merkle` reports those as failed via
/// [`Error::PartialUpload`] instead of aborting the whole file.
///
/// Both output vectors keep the relative order of `addresses`, and duplicate
/// input addresses are kept as duplicates.
fn partition_addresses_by_proof(
    addresses: &[[u8; 32]],
    proofs: &HashMap<[u8; 32], Vec<u8>>,
) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
    let mut to_store: Vec<[u8; 32]> = Vec::new();
    let mut missing_proof: Vec<[u8; 32]> = Vec::new();
    for addr in addresses {
        if proofs.contains_key(addr) {
            to_store.push(*addr);
        } else {
            missing_proof.push(*addr);
        }
    }
    (to_store, missing_proof)
}
474
475/// Build a `PartialUpload` after a fatal merkle store error, with accurate
476/// counts.
477///
478/// A fatal abort can leave chunks in three states: confirmed stored (in
479/// `stored_addresses`), known-failed (in `known_failed` — missing proofs, the
480/// quorum shortfalls and the fatal chunk seen so far), and "in flight when the
481/// abort hit" (neither). Rather than trust the helpers to enumerate the last
482/// group, this derives the failed set authoritatively as *every* `addresses`
483/// entry not in `stored_addresses`, preferring a known per-chunk message and
484/// falling back to the fatal `reason`. That guarantees
485/// `stored_count + failed_count` accounts for the whole file — fixing the
486/// under-reporting where a fatal wave could surface `failed_count = 0` and omit
487/// same-pass successes.
488fn partial_upload_after_fatal(
489 addresses: &[[u8; 32]],
490 stored_addresses: Vec<[u8; 32]>,
491 stored_count: usize,
492 total_chunks: usize,
493 known_failed: Vec<([u8; 32], String)>,
494 spend: PartialUploadSpend,
495 reason: String,
496) -> Error {
497 let stored_set: HashSet<[u8; 32]> = stored_addresses.iter().copied().collect();
498 let mut failed_map: HashMap<[u8; 32], String> = HashMap::new();
499 for (addr, msg) in known_failed {
500 if !stored_set.contains(&addr) {
501 failed_map.entry(addr).or_insert(msg);
502 }
503 }
504 for addr in addresses {
505 if !stored_set.contains(addr) {
506 failed_map.entry(*addr).or_insert_with(|| reason.clone());
507 }
508 }
509 let failed: Vec<([u8; 32], String)> = failed_map.into_iter().collect();
510 let failed_count = failed.len();
511 Error::PartialUpload {
512 stored: stored_addresses,
513 stored_count,
514 failed,
515 failed_count,
516 total_chunks,
517 spend: Box::new(spend),
518 reason,
519 }
520}
521
522/// One wave's contribution to a single-node upload, distilled from its
523/// `batch_upload_chunks_with_events` result.
524#[derive(Debug)]
525struct SingleWaveOutcome {
526 /// Addresses confirmed stored in this wave.
527 stored: Vec<[u8; 32]>,
528 /// Chunks that failed after retries in this wave.
529 failed: Vec<([u8; 32], String)>,
530 /// Storage cost paid on-chain for this wave, in atto-tokens.
531 storage_atto: Amount,
532 /// Gas paid on-chain for this wave, in wei.
533 gas_wei: u128,
534 /// Per-wave store/retry statistics. Empty for a quorum-short wave, whose
535 /// `PartialUpload` carries no stats.
536 stats: WaveAggregateStats,
537}
538
539/// Fold one wave's batch-upload result for the single-node path.
540///
541/// A `PartialUpload` (chunks short of quorum after retries) is **recoverable**:
542/// its stored/failed chunks and on-chain spend are returned so the caller
543/// records them and continues to the next wave, making the file make maximum
544/// progress exactly like `upload_waves_merkle`. Every other error is **fatal**
545/// (wallet/payment-infrastructure failures, missing proofs, spill reads) and is
546/// returned via `Err` to abort the file. Because `UPLOAD_WAVE_SIZE ==
547/// PAYMENT_WAVE_SIZE`, each batch call is exactly one payment wave, so folding a
548/// `PartialUpload` leaves nothing un-attempted within the wave.
549fn fold_single_wave(
550 result: Result<(Vec<[u8; 32]>, String, u128, WaveAggregateStats)>,
551) -> Result<SingleWaveOutcome> {
552 match result {
553 Ok((stored, storage, gas, stats)) => Ok(SingleWaveOutcome {
554 stored,
555 failed: Vec::new(),
556 storage_atto: storage.parse().unwrap_or(Amount::ZERO),
557 gas_wei: gas,
558 stats,
559 }),
560 Err(Error::PartialUpload {
561 stored,
562 failed,
563 spend,
564 ..
565 }) => Ok(SingleWaveOutcome {
566 stored,
567 failed,
568 storage_atto: spend.storage_cost_atto.parse().unwrap_or(Amount::ZERO),
569 gas_wei: spend.gas_cost_wei,
570 stats: WaveAggregateStats::default(),
571 }),
572 Err(e) => Err(e),
573 }
574}
575
576/// Check that the spill directory has enough free space for the spilled chunks.
577///
578/// `file_size` is the source file's byte count. We require
579/// `file_size + 10%` free space to account for self-encryption overhead.
580fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
581 let spill_root = ChunkSpill::spill_root()?;
582
583 // Ensure the root exists so fs2 can query it.
584 std::fs::create_dir_all(&spill_root)?;
585
586 let available = fs2::available_space(&spill_root).map_err(|e| {
587 Error::Io(std::io::Error::new(
588 e.kind(),
589 format!(
590 "failed to query disk space on {}: {e}",
591 spill_root.display()
592 ),
593 ))
594 })?;
595
596 // Use integer arithmetic to avoid f64 precision loss on large file sizes.
597 let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
598 let required = file_size.saturating_add(headroom);
599
600 if available < required {
601 let avail_mb = available / (1024 * 1024);
602 let req_mb = required / (1024 * 1024);
603 return Err(Error::InsufficientDiskSpace(format!(
604 "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
605 spill_root.display()
606 )));
607 }
608
609 debug!(
610 "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
611 spill_root.display()
612 );
613 Ok(())
614}
615
616fn usable_memory_bytes() -> Option<u64> {
617 let mut system = sysinfo::System::new();
618 system.refresh_memory();
619
620 let available_memory = system.available_memory();
621 let free_memory = system.free_memory();
622 let used_memory = system.used_memory();
623 let total_memory = system.total_memory();
624 let unused_memory = total_memory.saturating_sub(used_memory);
625
626 let mut usable = [available_memory, free_memory, unused_memory]
627 .into_iter()
628 .filter(|bytes| *bytes > 0)
629 .max();
630
631 let cgroup_free_memory = system
632 .cgroup_limits()
633 .filter(|limits| limits.total_memory > 0)
634 .map(|limits| limits.free_memory);
635 if let Some(cgroup_free_memory) = cgroup_free_memory {
636 usable = Some(usable.unwrap_or(u64::MAX).min(cgroup_free_memory));
637 }
638
639 debug!(
640 available_memory,
641 free_memory,
642 used_memory,
643 total_memory,
644 cgroup_free_memory,
645 usable_memory = ?usable,
646 "Detected usable memory for stream decrypt batch sizing"
647 );
648
649 usable
650}
651
652fn stream_decrypt_batch_memory_cap(usable_memory_bytes: u64) -> usize {
653 let budget = usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR;
654 let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
655 .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
656 .max(1);
657 let cap = (budget / estimated_bytes_per_chunk).max(1);
658
659 usize::try_from(cap).unwrap_or(usize::MAX)
660}
661
662fn adaptive_stream_decrypt_batch_size(
663 total_chunks: usize,
664 fetch_cap: usize,
665 configured_batch_floor: usize,
666 usable_memory_bytes: Option<u64>,
667) -> usize {
668 let fetch_target = fetch_cap
669 .max(1)
670 .saturating_mul(DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
671 let requested = match usable_memory_bytes {
672 Some(bytes) => {
673 let memory_cap = stream_decrypt_batch_memory_cap(bytes);
674 configured_batch_floor
675 .max(fetch_target)
676 .max(1)
677 .min(memory_cap)
678 }
679 None => configured_batch_floor.max(1),
680 };
681
682 requested.min(total_chunks.max(1)).max(1)
683}
684
/// Controls whether the data map is published to the network so the file can
/// be retrieved by address.
///
/// A private upload stores only the data chunks and hands the `DataMap` back
/// to the caller; the file can be reconstructed only by someone holding that
/// `DataMap`. A public upload additionally stores the serialized `DataMap` as
/// a chunk on the network, which yields a single chunk address anyone can use
/// to fetch the `DataMap` (via [`Client::data_map_fetch`]) and then the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Visibility {
    /// The data map stays local; only its holder can retrieve the file.
    /// This is the default.
    #[default]
    Private,
    /// The data map is published as a network chunk, so anyone with the
    /// returned address can retrieve and decrypt the file.
    Public,
}
701
702/// Confidence attached to an [`UploadCostEstimate`]'s `storage_cost_atto`.
703///
704/// `estimate_upload_cost` prices a file by sampling a few of its chunk
705/// addresses and extrapolating. When every sampled chunk is already stored
706/// there is no live price to extrapolate from, so a `"0"` cost can mean either
707/// "provably free" (the whole file was sampled) or only "probably free" (the
708/// tail was unsampled). This lets callers tell those apart instead of treating
709/// every `"0"` as unconditionally free.
710#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
711#[serde(rename_all = "snake_case")]
712pub enum CostEstimateConfidence {
713 /// At least one sampled chunk returned a live quote; `storage_cost_atto`
714 /// is extrapolated from a real per-chunk price. The normal case.
715 #[default]
716 PricedSample,
717 /// Every chunk in the file was sampled and every one was already stored.
718 /// `storage_cost_atto` is exactly `"0"` — the upload is genuinely free.
719 VerifiedAllAlreadyStored,
720 /// Every *sampled* chunk was already stored, but not all chunks were
721 /// sampled. `storage_cost_atto` is `"0"` as a best-effort guess; the real
722 /// upload reconciles the true cost at payment time. Render this as "likely
723 /// already stored", not a guaranteed-free price.
724 AllSamplesAlreadyStoredIncomplete,
725}
726
727/// Estimated cost of uploading a file, returned by
728/// [`Client::estimate_upload_cost`].
729///
730/// Marked `#[non_exhaustive]` so adding a field later is not a breaking change
731/// for downstream consumers that construct or pattern-match on this struct.
732#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
733#[non_exhaustive]
734pub struct UploadCostEstimate {
735 /// Original file size in bytes.
736 pub file_size: u64,
737 /// Number of chunks the file would be split into (data chunks only,
738 /// does not include the DataMap chunk added during public uploads).
739 pub chunk_count: usize,
740 /// Estimated total storage cost in atto (token smallest unit).
741 pub storage_cost_atto: String,
742 /// Estimated gas cost in wei as a string. This is a rough heuristic
743 /// based on chunk count and payment mode, NOT a live gas price query.
744 pub estimated_gas_cost_wei: String,
745 /// Payment mode that would be used.
746 pub payment_mode: PaymentMode,
747 /// How much to trust `storage_cost_atto`. See [`CostEstimateConfidence`].
748 #[serde(default)]
749 pub confidence: CostEstimateConfidence,
750}
751
752/// Result of a file upload: the `DataMap` needed to retrieve the file.
753///
754/// Marked `#[non_exhaustive]` so adding a new field in future is not a
755/// breaking change for downstream consumers that construct or pattern-match
756/// on this struct.
757#[derive(Debug, Clone)]
758#[non_exhaustive]
759pub struct FileUploadResult {
760 /// The data map containing chunk metadata for reconstruction.
761 pub data_map: DataMap,
762 /// Number of chunks stored on the network.
763 pub chunks_stored: usize,
764 /// Number of chunks that failed to store. Always 0 for a successful
765 /// upload — partial-failure information is conveyed via
766 /// [`crate::data::Error::PartialUpload`] instead.
767 pub chunks_failed: usize,
768 /// Total number of chunks in the upload, including chunks that were
769 /// already stored and skipped. On full success this equals `chunks_stored`.
770 pub total_chunks: usize,
771 /// Which payment mode was actually used (not just requested).
772 pub payment_mode_used: PaymentMode,
773 /// Total storage cost paid in token units (atto). "0" if all chunks already existed.
774 pub storage_cost_atto: String,
775 /// Total gas cost in wei. 0 if no on-chain transactions were made.
776 pub gas_cost_wei: u128,
777 /// Chunk address of the serialized `DataMap`, set only for
778 /// [`Visibility::Public`] uploads. **`Some` means this address is
779 /// retrievable from the network (via [`Client::data_map_fetch`])**, not
780 /// necessarily that *this* upload paid to store it — if the serialized
781 /// `DataMap` hashed to a chunk that was already on the network (same
782 /// file uploaded before; deterministic via self-encryption), the address
783 /// is still returned but no storage payment was made for it.
784 pub data_map_address: Option<[u8; 32]>,
785 /// Sum of chunk-store RPC attempts across the upload
786 /// (`>= chunks_stored` on full success; more if any chunk retried).
787 /// `0` for paths that don't run the wave store loop.
788 pub chunk_attempts_total: usize,
789 /// Per-chunk store wall-clock in ms (length == `chunks_stored` on full
790 /// success, empty for paths that don't run the wave store loop).
791 pub store_durations_ms: Vec<u64>,
792 /// Count of stored chunks that succeeded on each retry round
793 /// (index 0 = first attempt, 1 = first retry, etc.). All zeros for
794 /// paths that don't run the wave store loop.
795 pub retries_histogram: [usize; 4],
796}
797
798/// Payment information for external signing — either wave-batch or merkle.
799#[derive(Debug)]
800pub enum ExternalPaymentInfo {
801 /// Wave-batch: individual (quote_hash, rewards_address, amount) tuples.
802 WaveBatch {
803 /// Chunks ready for payment (needed for finalize).
804 prepared_chunks: Vec<PreparedChunk>,
805 /// Payment intent for external signing.
806 payment_intent: PaymentIntent,
807 },
808 /// Merkle: single on-chain call with depth, pool commitments, timestamp.
809 Merkle {
810 /// The prepared merkle batch (public fields sent to frontend, private fields stay in Rust).
811 prepared_batch: PreparedMerkleBatch,
812 /// Raw chunk contents that still need upload after the preflight check.
813 chunk_contents: Vec<Bytes>,
814 /// Chunk addresses that still need upload after the preflight check.
815 chunk_addresses: Vec<[u8; 32]>,
816 },
817}
818
819/// Prepared upload ready for external payment.
820///
821/// Contains everything needed to construct the on-chain payment transaction
822/// externally (e.g. via WalletConnect in a desktop app) and then finalize
823/// the upload without a Rust-side wallet.
824///
825/// Note: This struct stays in Rust memory — only the public fields of
826/// `payment_info` are sent to the frontend. `PreparedChunk` contains
827/// non-serializable network types, so the full struct cannot derive `Serialize`.
828///
829/// Marked `#[non_exhaustive]` so adding a new field in future is not a
830/// breaking change for downstream consumers.
831#[derive(Debug)]
832#[non_exhaustive]
833pub struct PreparedUpload {
834 /// The data map for later retrieval.
835 pub data_map: DataMap,
836 /// Payment information for chunks that still need payment after the
837 /// already-stored preflight. This may be wave-batch even when the original
838 /// chunk count was merkle-eligible if the remaining count is below the
839 /// merkle threshold.
840 pub payment_info: ExternalPaymentInfo,
841 /// Chunk address of the serialized `DataMap` when this upload was
842 /// prepared with [`Visibility::Public`]. `Some` means the address is
843 /// retrievable on the network after finalization — either because this
844 /// upload paid to store the chunk in `payment_info`, or because the
845 /// chunk was already on the network (deterministic self-encryption).
846 /// Carried through to [`FileUploadResult::data_map_address`].
847 pub data_map_address: Option<[u8; 32]>,
848 /// Chunk addresses already present on the network when this upload was
849 /// prepared. These do not require payment or PUT during finalization.
850 pub already_stored_addresses: Vec<[u8; 32]>,
851 /// Total chunk count for the upload, including already-stored chunks.
852 pub total_chunks: usize,
853}
854
855/// Return type for [`spawn_file_encryption`]: chunk receiver, `DataMap` oneshot, join handle.
856type EncryptionChannels = (
857 tokio::sync::mpsc::Receiver<Bytes>,
858 tokio::sync::oneshot::Receiver<DataMap>,
859 tokio::task::JoinHandle<Result<()>>,
860);
861
862/// Spawn a blocking task that streams file encryption through a channel.
863fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
864 let metadata = std::fs::metadata(&path)?;
865 let data_size = usize::try_from(metadata.len())
866 .map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;
867
868 let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
869 let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();
870
871 let handle = tokio::task::spawn_blocking(move || {
872 let file = std::fs::File::open(&path)?;
873 let mut reader = std::io::BufReader::new(file);
874
875 let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
876 let read_error_clone = Arc::clone(&read_error);
877
878 let data_iter = std::iter::from_fn(move || {
879 let mut buffer = vec![0u8; 8192];
880 match std::io::Read::read(&mut reader, &mut buffer) {
881 Ok(0) => None,
882 Ok(n) => {
883 buffer.truncate(n);
884 Some(Bytes::from(buffer))
885 }
886 Err(e) => {
887 let mut guard = read_error_clone
888 .lock()
889 .unwrap_or_else(|poisoned| poisoned.into_inner());
890 *guard = Some(e);
891 None
892 }
893 }
894 });
895
896 let mut stream = stream_encrypt(data_size, data_iter)
897 .map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;
898
899 for chunk_result in stream.chunks() {
900 // Check for captured read errors immediately after each chunk.
901 // stream_encrypt sees None (EOF) when a read fails, so it stops
902 // producing chunks. We must detect this before sending the
903 // partial results to avoid uploading a truncated DataMap.
904 {
905 let guard = read_error
906 .lock()
907 .unwrap_or_else(|poisoned| poisoned.into_inner());
908 if let Some(ref e) = *guard {
909 return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
910 }
911 }
912
913 let (_hash, content) = chunk_result
914 .map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
915 if chunk_tx.blocking_send(content).is_err() {
916 return Err(Error::Encryption("upload receiver dropped".to_string()));
917 }
918 }
919
920 // Final check: read error after last chunk (stream saw EOF).
921 {
922 let guard = read_error
923 .lock()
924 .unwrap_or_else(|poisoned| poisoned.into_inner());
925 if let Some(ref e) = *guard {
926 return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
927 }
928 }
929
930 let datamap = stream
931 .into_datamap()
932 .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
933 if datamap_tx.send(datamap).is_err() {
934 warn!("DataMap receiver dropped — upload may have been cancelled");
935 }
936 Ok(())
937 });
938
939 Ok((chunk_rx, datamap_rx, handle))
940}
941
/// Guard that owns the staging temp file used during a disk download.
///
/// Dropping the guard deletes the staging file — this also covers a panic
/// unwinding out of the `block_in_place` decrypt loop — unless
/// [`commit`](Self::commit) has already moved the file to its final path.
/// Keeping the cleanup here replaces the removal calls the individual error
/// arms used to repeat.
struct TempDownload {
    /// `Some` for as long as the staging file may still need removing;
    /// `None` after a successful commit.
    path: Option<PathBuf>,
}

impl TempDownload {
    /// Create an armed guard for the staging file at `path`.
    fn new(path: PathBuf) -> Self {
        Self { path: Some(path) }
    }

    /// Location of the staging file (only valid before `commit`).
    fn path(&self) -> &Path {
        self.path
            .as_ref()
            .map(PathBuf::as_path)
            .expect("TempDownload::path called after commit")
    }

    /// Move the staged file to `dest`.
    ///
    /// A successful rename disarms the guard, making `Drop` a no-op. A failed
    /// rename leaves the guard armed so `Drop` deletes the orphaned temp file.
    fn commit(mut self, dest: &Path) -> std::io::Result<()> {
        let renamed = std::fs::rename(self.path(), dest);
        if renamed.is_ok() {
            // Nothing left to clean up once the file lives at `dest`.
            self.path = None;
        }
        renamed
    }
}
974
975impl Drop for TempDownload {
976 fn drop(&mut self) {
977 if let Some(path) = self.path.take() {
978 if let Err(e) = std::fs::remove_file(&path) {
979 // Absent file is fine (never created / already gone).
980 if e.kind() != std::io::ErrorKind::NotFound {
981 warn!(
982 "Failed to remove temp download file {}: {e}",
983 path.display()
984 );
985 }
986 }
987 }
988 }
989}
990
991impl Client {
992 /// Upload a file to the network using streaming self-encryption.
993 ///
994 /// Automatically selects merkle batch payment for files that produce
995 /// 64+ chunks (saves gas). Encrypted chunks are spilled to a temp
996 /// directory so peak memory stays at ~256 MB regardless of file size.
997 ///
998 /// # Errors
999 ///
1000 /// Returns an error if the file cannot be read, encryption fails,
1001 /// or any chunk cannot be stored.
1002 pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
1003 self.file_upload_with_mode(path, PaymentMode::Auto).await
1004 }
1005
1006 /// Estimate the cost of uploading a file without actually uploading.
1007 ///
1008 /// Encrypts the file to determine chunk count and sizes, then requests
1009 /// a single quote from the network for a representative chunk. The
1010 /// per-chunk price is extrapolated to the total chunk count.
1011 ///
1012 /// The estimate is fast (~2-5s) and does not require a wallet. Spilled
1013 /// chunks are cleaned up automatically when the function returns.
1014 ///
1015 /// Gas cost is an advisory heuristic, not a live gas-oracle query. It is
1016 /// derived from realistic per-transaction budgets (`GAS_PER_WAVE_TX`,
1017 /// `GAS_PER_MERKLE_TX`) priced at `ARBITRUM_GAS_PRICE_WEI`. Real gas
1018 /// varies with network conditions.
1019 ///
1020 /// Sampled chunk addresses are spread across the whole file (not the first
1021 /// N) so a shared leading prefix doesn't bias the sample. When a sample
1022 /// returns a live quote the per-chunk price is extrapolated and the result
1023 /// is tagged [`CostEstimateConfidence::PricedSample`].
1024 ///
1025 /// When every sampled chunk is already stored the result is still `Ok`
1026 /// with `storage_cost_atto: "0"`, tagged either
1027 /// [`CostEstimateConfidence::VerifiedAllAlreadyStored`] when the whole file
1028 /// was sampled (exactly free) or
1029 /// [`CostEstimateConfidence::AllSamplesAlreadyStoredIncomplete`] when the
1030 /// tail was unsampled (a best-effort guess that payment reconciles).
1031 ///
1032 /// # Errors
1033 ///
1034 /// Returns an error if the file cannot be read, encryption fails, or the
1035 /// network cannot provide a quote.
1036 pub async fn estimate_upload_cost(
1037 &self,
1038 path: &Path,
1039 mode: PaymentMode,
1040 progress: Option<mpsc::Sender<UploadEvent>>,
1041 ) -> Result<UploadCostEstimate> {
1042 let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
1043
1044 if file_size < 3 {
1045 return Err(Error::InvalidData(
1046 "File too small: self-encryption requires at least 3 bytes".into(),
1047 ));
1048 }
1049
1050 check_disk_space_for_spill(file_size)?;
1051
1052 info!(
1053 "Estimating upload cost for {} ({file_size} bytes)",
1054 path.display()
1055 );
1056
1057 let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1058 let chunk_count = spill.len();
1059
1060 if let Some(ref tx) = progress {
1061 let _ = tx
1062 .send(UploadEvent::Encrypted {
1063 total_chunks: chunk_count,
1064 })
1065 .await;
1066 }
1067
1068 info!("Encrypted into {chunk_count} chunks, requesting quote");
1069 let uses_merkle = should_use_merkle(chunk_count, mode);
1070
1071 // Sample chunk addresses spread evenly across the file (see
1072 // `distributed_sample_indices`) rather than the first N. A single
1073 // AlreadyStored result says nothing about the rest of the file, and a
1074 // positional sample lands on a shared leading prefix in the worst case,
1075 // so we spread the probe and only treat the whole file as "fully
1076 // stored" when every sample comes back stored.
1077 let sample_indices = distributed_sample_indices(spill.addresses.len(), ESTIMATE_SAMPLE_CAP);
1078 let mut sampled = 0usize;
1079 let mut all_already_stored = true;
1080 let mut quotes_opt: Option<Vec<QuoteEntry>> = None;
1081
1082 for &idx in &sample_indices {
1083 let addr = &spill.addresses[idx];
1084 sampled += 1;
1085 let chunk_bytes = spill.read_chunk(addr)?;
1086 let data_size = u64::try_from(chunk_bytes.len())
1087 .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
1088 let result = if uses_merkle {
1089 self.get_store_quotes_with_fault_tolerance(addr, data_size, DATA_TYPE_CHUNK)
1090 .await
1091 } else {
1092 self.get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
1093 .await
1094 };
1095 match result {
1096 Ok(q) => {
1097 quotes_opt = Some(q);
1098 all_already_stored = false;
1099 break;
1100 }
1101 Err(Error::AlreadyStored) => {
1102 debug!(
1103 "Sample chunk {} already stored; trying next address ({sampled}/{})",
1104 hex::encode(addr),
1105 sample_indices.len()
1106 );
1107 continue;
1108 }
1109 Err(e) => return Err(e),
1110 }
1111 }
1112
1113 let quotes = match quotes_opt {
1114 Some(q) => q,
1115 None if all_already_stored && sampled == chunk_count => {
1116 // Every address in the file was sampled and every one is
1117 // already on the network — a zero-cost estimate is exact here.
1118 info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
1119 return Ok(UploadCostEstimate {
1120 file_size,
1121 chunk_count,
1122 storage_cost_atto: "0".into(),
1123 estimated_gas_cost_wei: "0".into(),
1124 payment_mode: if uses_merkle {
1125 PaymentMode::Merkle
1126 } else {
1127 PaymentMode::Single
1128 },
1129 confidence: CostEstimateConfidence::VerifiedAllAlreadyStored,
1130 });
1131 }
1132 None => {
1133 // Every sampled chunk was already stored but the tail was not
1134 // sampled, so there is no live price to extrapolate. The
1135 // estimate is display-only and payment reconciles the true
1136 // cost, so return an optimistic zero flagged as incomplete
1137 // rather than erroring — callers still get a value to show.
1138 info!(
1139 "All {sampled}/{chunk_count} sampled chunks already stored; \
1140 returning incomplete zero-cost estimate"
1141 );
1142 return Ok(UploadCostEstimate {
1143 file_size,
1144 chunk_count,
1145 storage_cost_atto: "0".into(),
1146 estimated_gas_cost_wei: "0".into(),
1147 payment_mode: if uses_merkle {
1148 PaymentMode::Merkle
1149 } else {
1150 PaymentMode::Single
1151 },
1152 confidence: CostEstimateConfidence::AllSamplesAlreadyStoredIncomplete,
1153 });
1154 }
1155 };
1156
1157 // Use the median price × 3 (matches SingleNodePayment::from_quotes
1158 // which pays 3x the median to incentivize reliable storage).
1159 let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
1160 prices.sort();
1161 let median_price = prices
1162 .get(prices.len() / 2)
1163 .copied()
1164 .unwrap_or(Amount::ZERO);
1165 let per_chunk_cost = median_price * Amount::from(3u64);
1166
1167 let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
1168 let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);
1169
1170 // Estimate gas cost from realistic per-transaction budgets rather
1171 // than a flat per-chunk or per-wave number.
1172 //
1173 // - Single mode: `batch_pay` packs up to UPLOAD_WAVE_SIZE chunks'
1174 // close-group quotes into one `pay_for_quotes` call on Arbitrum.
1175 // The dominant cost is one SSTORE per entry plus base tx overhead,
1176 // so we use GAS_PER_WAVE_TX (≈1.5M) as a conservative upper bound
1177 // on a full wave and multiply by the number of waves. The previous
1178 // per-wave figure of 150k was closer to a single-entry transfer
1179 // and understated cost by 5–10x for full waves.
1180 // - Merkle mode: one tx per sub-batch that verifies a merkle tree
1181 // and posts a pool commitment (GAS_PER_MERKLE_TX ≈ 500k each).
1182 //
1183 // Gas is priced at ARBITRUM_GAS_PRICE_WEI (~0.1 gwei, a typical
1184 // Arbitrum baseline). Treat the result as advisory, not a commitment.
1185 let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
1186 let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
1187 let estimated_gas: u128 = if uses_merkle {
1188 merkle_batches
1189 .saturating_mul(GAS_PER_MERKLE_TX)
1190 .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
1191 } else {
1192 waves
1193 .saturating_mul(GAS_PER_WAVE_TX)
1194 .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
1195 };
1196
1197 info!(
1198 "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
1199 );
1200
1201 Ok(UploadCostEstimate {
1202 file_size,
1203 chunk_count,
1204 storage_cost_atto: total_storage.to_string(),
1205 estimated_gas_cost_wei: estimated_gas.to_string(),
1206 payment_mode: if uses_merkle {
1207 PaymentMode::Merkle
1208 } else {
1209 PaymentMode::Single
1210 },
1211 confidence: CostEstimateConfidence::PricedSample,
1212 })
1213 }
1214
1215 /// Phase 1 of external-signer upload: encrypt file and prepare chunks.
1216 ///
1217 /// Equivalent to [`Client::file_prepare_upload_with_visibility`] with
1218 /// [`Visibility::Private`] — see that method for details.
1219 pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
1220 self.file_prepare_upload_with_progress(path, Visibility::Private, None)
1221 .await
1222 }
1223
1224 /// Phase 1 of external-signer upload with explicit [`Visibility`] control.
1225 ///
1226 /// Equivalent to [`Client::file_prepare_upload_with_progress`] with
1227 /// `progress: None` — see that method for details.
1228 pub async fn file_prepare_upload_with_visibility(
1229 &self,
1230 path: &Path,
1231 visibility: Visibility,
1232 ) -> Result<PreparedUpload> {
1233 self.file_prepare_upload_with_progress(path, visibility, None)
1234 .await
1235 }
1236
1237 /// Phase 1 of external-signer upload with progress events.
1238 ///
1239 /// Requires an EVM network (for contract price queries) but NOT a wallet.
1240 /// Returns a [`PreparedUpload`] containing the data map, prepared chunks,
1241 /// and a [`PaymentIntent`] that the external signer uses to construct
1242 /// and submit the on-chain payment transaction.
1243 ///
1244 /// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
1245 /// is bundled into the payment batch as an additional chunk and its
1246 /// address is recorded on the returned [`PreparedUpload`]. After
1247 /// [`Client::finalize_upload`] (or `_merkle`) succeeds, that address is
1248 /// surfaced via [`FileUploadResult::data_map_address`] so the uploader
1249 /// can share a single address from which anyone can retrieve the file.
1250 ///
1251 /// When `progress` is `Some`, [`UploadEvent`]s are emitted on the channel
1252 /// during encryption ([`UploadEvent::Encrypting`] / [`UploadEvent::Encrypted`])
1253 /// and per-chunk quoting ([`UploadEvent::ChunkQuoted`]). Storage events are
1254 /// emitted later by [`Client::finalize_upload_with_progress`] /
1255 /// [`Client::finalize_upload_merkle_with_progress`].
1256 ///
1257 /// **Memory note:** Encryption uses disk spilling for bounded memory, but
1258 /// the returned [`PreparedUpload`] holds all chunk content in memory (each
1259 /// [`PreparedChunk`] contains a `Bytes` with the full chunk data). This is
1260 /// inherent to the two-phase external-signer protocol — the chunks must
1261 /// stay in memory until [`Client::finalize_upload`] stores them. For very
1262 /// large files, prefer [`Client::file_upload`] which streams directly.
1263 ///
1264 /// # Errors
1265 ///
1266 /// Returns an error if there is insufficient disk space, the file cannot
1267 /// be read, encryption fails, or quote collection fails.
1268 pub async fn file_prepare_upload_with_progress(
1269 &self,
1270 path: &Path,
1271 visibility: Visibility,
1272 progress: Option<mpsc::Sender<UploadEvent>>,
1273 ) -> Result<PreparedUpload> {
1274 debug!(
1275 "Preparing file upload for external signing (visibility={visibility:?}): {}",
1276 path.display()
1277 );
1278
1279 let file_size = std::fs::metadata(path)?.len();
1280 check_disk_space_for_spill(file_size)?;
1281
1282 let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1283
1284 info!(
1285 "Encrypted {} into {} chunks for external signing (spilled to disk)",
1286 path.display(),
1287 spill.len()
1288 );
1289
1290 // Read each chunk from disk and collect quotes concurrently.
1291 // Note: all PreparedChunks accumulate in memory because the external-signer
1292 // protocol requires them for finalize_upload. NOT memory-bounded for large files.
1293 let mut chunk_data: Vec<Bytes> = spill
1294 .addresses
1295 .iter()
1296 .map(|addr| spill.read_chunk(addr))
1297 .collect::<std::result::Result<Vec<_>, _>>()?;
1298
1299 // For public uploads, bundle the serialized DataMap as an extra chunk
1300 // in the same payment batch. This lets the external signer pay for
1301 // the data chunks and the DataMap chunk in one flow, and lets the
1302 // finalize step return the DataMap's chunk address as the shareable
1303 // retrieval address.
1304 let data_map_address = match visibility {
1305 Visibility::Private => None,
1306 Visibility::Public => {
1307 let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
1308 Error::Serialization(format!("Failed to serialize DataMap: {e}"))
1309 })?;
1310 let bytes = Bytes::from(serialized);
1311 let address = compute_address(&bytes);
1312 info!(
1313 "Public upload: bundling DataMap chunk ({} bytes) at address {}",
1314 bytes.len(),
1315 hex::encode(address)
1316 );
1317 chunk_data.push(bytes);
1318 Some(address)
1319 }
1320 };
1321
1322 let chunk_count = chunk_data.len();
1323
1324 if let Some(ref tx) = progress {
1325 let _ = tx
1326 .send(UploadEvent::Encrypted {
1327 total_chunks: chunk_count,
1328 })
1329 .await;
1330 }
1331
1332 let (payment_info, already_stored_addresses) = if should_use_merkle(
1333 chunk_count,
1334 PaymentMode::Auto,
1335 ) {
1336 // Merkle path: build tree, collect candidate pools, return for external payment.
1337 info!("Using merkle batch preparation for {chunk_count} file chunks");
1338
1339 let chunk_entries: Vec<([u8; 32], u64)> = chunk_data
1340 .iter()
1341 .map(|chunk| {
1342 let size = u64::try_from(chunk.len())
1343 .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
1344 Ok((compute_address(chunk), size))
1345 })
1346 .collect::<Result<Vec<_>>>()?;
1347
1348 let merkle_plan = self
1349 .plan_merkle_upload(chunk_entries, DATA_TYPE_CHUNK, progress.as_ref())
1350 .await?;
1351
1352 if merkle_plan.to_upload.is_empty() {
1353 info!("All {chunk_count} file chunks already stored; no external payment needed");
1354 (
1355 ExternalPaymentInfo::WaveBatch {
1356 prepared_chunks: Vec::new(),
1357 payment_intent: PaymentIntent::from_prepared_chunks(&[]),
1358 },
1359 merkle_plan.already_stored,
1360 )
1361 } else {
1362 let chunk_data =
1363 chunk_contents_for_upload_addresses(chunk_data, &merkle_plan.to_upload)?;
1364
1365 if !should_use_merkle(merkle_plan.to_upload.len(), PaymentMode::Auto) {
1366 info!(
1367 "{} file chunks need upload after merkle preflight; preparing wave-batch payment",
1368 merkle_plan.to_upload.len()
1369 );
1370 let (payment_info, mut wave_already_stored) = self
1371 .prepare_wave_batch_external_chunks(
1372 chunk_data,
1373 progress.as_ref(),
1374 chunk_count,
1375 )
1376 .await?;
1377 let mut already_stored = merkle_plan.already_stored;
1378 already_stored.append(&mut wave_already_stored);
1379 (payment_info, already_stored)
1380 } else {
1381 match self
1382 .prepare_merkle_batch_external(
1383 &merkle_plan.to_upload,
1384 DATA_TYPE_CHUNK,
1385 merkle_plan.to_upload_avg_size(),
1386 )
1387 .await
1388 {
1389 Ok(prepared_batch) => {
1390 info!(
1391 "File prepared for external merkle signing: {} chunks, depth={} ({})",
1392 merkle_plan.to_upload.len(),
1393 prepared_batch.depth,
1394 path.display()
1395 );
1396
1397 (
1398 ExternalPaymentInfo::Merkle {
1399 prepared_batch,
1400 chunk_contents: chunk_data,
1401 chunk_addresses: merkle_plan.to_upload,
1402 },
1403 merkle_plan.already_stored,
1404 )
1405 }
1406 Err(Error::InsufficientPeers(ref msg)) => {
1407 info!(
1408 "External merkle preparation needs more peers ({msg}); preparing wave-batch payment"
1409 );
1410 let (payment_info, mut wave_already_stored) = self
1411 .prepare_wave_batch_external_chunks(
1412 chunk_data,
1413 progress.as_ref(),
1414 chunk_count,
1415 )
1416 .await?;
1417 let mut already_stored = merkle_plan.already_stored;
1418 already_stored.append(&mut wave_already_stored);
1419 (payment_info, already_stored)
1420 }
1421 Err(e) => return Err(e),
1422 }
1423 }
1424 }
1425 } else {
1426 self.prepare_wave_batch_external_chunks(chunk_data, progress.as_ref(), chunk_count)
1427 .await?
1428 };
1429
1430 // Surface the "DataMap chunk was already on the network" case
1431 // so debugging "why is data_map_address set but no storage cost
1432 // appears for it?" doesn't require reading the source. See the
1433 // `data_map_address` doc comment for why this is still a valid
1434 // `Some(addr)` outcome.
1435 if let Some(addr) = data_map_address {
1436 let data_map_needs_payment = match &payment_info {
1437 ExternalPaymentInfo::WaveBatch {
1438 prepared_chunks, ..
1439 } => prepared_chunks.iter().any(|c| c.address == addr),
1440 ExternalPaymentInfo::Merkle {
1441 chunk_addresses, ..
1442 } => chunk_addresses.contains(&addr),
1443 };
1444 if !data_map_needs_payment {
1445 info!(
1446 "Public upload: DataMap chunk {} was already stored \
1447 on the network — address is retrievable without a \
1448 new payment",
1449 hex::encode(addr)
1450 );
1451 }
1452 }
1453
1454 Ok(PreparedUpload {
1455 data_map,
1456 payment_info,
1457 data_map_address,
1458 already_stored_addresses,
1459 total_chunks: chunk_count,
1460 })
1461 }
1462
1463 async fn prepare_wave_batch_external_chunks(
1464 &self,
1465 chunk_data: Vec<Bytes>,
1466 progress: Option<&mpsc::Sender<UploadEvent>>,
1467 progress_total: usize,
1468 ) -> Result<(ExternalPaymentInfo, Vec<[u8; 32]>)> {
1469 let chunk_count = chunk_data.len();
1470 let chunks_with_addr: Vec<(Bytes, [u8; 32])> = chunk_data
1471 .into_iter()
1472 .map(|content| {
1473 let address = compute_address(&content);
1474 (content, address)
1475 })
1476 .collect();
1477
1478 // Wave-batch path: collect quotes per chunk concurrently, emitting
1479 // a `ChunkQuoted` event after each completion so callers can drive
1480 // a progress bar through the slow quote phase.
1481 let quote_limiter = self.controller().quote.clone();
1482 let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
1483 let mut quote_stream = stream::iter(chunks_with_addr)
1484 .map(|(content, address)| {
1485 let limiter = quote_limiter.clone();
1486 async move {
1487 let result = observe_op(
1488 &limiter,
1489 || async move { self.prepare_chunk_payment(content).await },
1490 classify_error,
1491 )
1492 .await;
1493 (address, result)
1494 }
1495 })
1496 .buffer_unordered(quote_concurrency);
1497
1498 let mut prepared_chunks = Vec::with_capacity(chunk_count);
1499 let mut already_stored = Vec::new();
1500 let mut quoted = 0usize;
1501 while let Some((address, result)) = quote_stream.next().await {
1502 match result? {
1503 Some(prepared) => prepared_chunks.push(prepared),
1504 None => already_stored.push(address),
1505 }
1506 quoted += 1;
1507 if let Some(tx) = progress {
1508 let _ = tx.try_send(UploadEvent::ChunkQuoted {
1509 quoted,
1510 total: progress_total,
1511 });
1512 }
1513 }
1514
1515 let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
1516 info!(
1517 "Prepared external wave-batch payment: {} chunks, {} already stored, total {} atto",
1518 prepared_chunks.len(),
1519 already_stored.len(),
1520 payment_intent.total_amount,
1521 );
1522
1523 Ok((
1524 ExternalPaymentInfo::WaveBatch {
1525 prepared_chunks,
1526 payment_intent,
1527 },
1528 already_stored,
1529 ))
1530 }
1531
1532 /// Phase 2 of external-signer upload (wave-batch): finalize with externally-signed tx hashes.
1533 ///
1534 /// Takes a [`PreparedUpload`] that used wave-batch payment and a map
1535 /// of `quote_hash -> tx_hash` provided by the external signer after on-chain
1536 /// payment. Builds payment proofs and stores chunks on the network.
1537 ///
1538 /// # Errors
1539 ///
1540 /// Returns an error if the prepared upload used merkle payment (use
1541 /// [`Client::finalize_upload_merkle`] instead), proof construction fails,
1542 /// or any chunk cannot be stored.
1543 pub async fn finalize_upload(
1544 &self,
1545 prepared: PreparedUpload,
1546 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1547 ) -> Result<FileUploadResult> {
1548 self.finalize_upload_with_progress(prepared, tx_hash_map, None)
1549 .await
1550 }
1551
1552 /// Phase 2 of external-signer upload (wave-batch) with progress events.
1553 ///
1554 /// Same as [`Client::finalize_upload`] but emits [`UploadEvent::ChunkStored`]
1555 /// on the provided channel as each chunk is successfully stored.
1556 ///
1557 /// # Errors
1558 ///
1559 /// Same as [`Client::finalize_upload`].
1560 pub async fn finalize_upload_with_progress(
1561 &self,
1562 prepared: PreparedUpload,
1563 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1564 progress: Option<mpsc::Sender<UploadEvent>>,
1565 ) -> Result<FileUploadResult> {
1566 let data_map_address = prepared.data_map_address;
1567 let already_stored_addresses = prepared.already_stored_addresses;
1568 let already_stored_count = already_stored_addresses.len();
1569 let total_chunks = prepared.total_chunks;
1570 match prepared.payment_info {
1571 ExternalPaymentInfo::WaveBatch {
1572 prepared_chunks,
1573 payment_intent,
1574 } => {
1575 let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
1576 let wave_result = self
1577 .store_paid_chunks_with_events(
1578 paid_chunks,
1579 progress.as_ref(),
1580 already_stored_count,
1581 total_chunks,
1582 )
1583 .await;
1584 if !wave_result.failed.is_empty() {
1585 let failed_count = wave_result.failed.len();
1586 let stored_count = already_stored_count + wave_result.stored.len();
1587 let mut stored = already_stored_addresses;
1588 stored.extend(wave_result.stored);
1589 return Err(Error::PartialUpload {
1590 stored,
1591 stored_count,
1592 failed: wave_result.failed,
1593 failed_count,
1594 total_chunks,
1595 // Report the storage spend known from the payment intent
1596 // the external signer was handed. Gas is paid by the
1597 // signer out-of-band, so it stays unknown (0).
1598 spend: Box::new(PartialUploadSpend {
1599 storage_cost_atto: payment_intent.total_amount.to_string(),
1600 gas_cost_wei: 0,
1601 }),
1602 reason: "finalize_upload: chunk storage failed after retries".into(),
1603 });
1604 }
1605 let chunks_stored = already_stored_count + wave_result.stored.len();
1606
1607 info!("External-signer upload finalized: {chunks_stored} chunks stored");
1608
1609 let mut stats = WaveAggregateStats::default();
1610 stats.absorb(&wave_result);
1611
1612 Ok(FileUploadResult {
1613 data_map: prepared.data_map,
1614 chunks_stored,
1615 chunks_failed: 0,
1616 total_chunks,
1617 payment_mode_used: PaymentMode::Single,
1618 // Storage spend is known from the payment intent; gas is
1619 // paid by the external signer out-of-band (unknown here).
1620 storage_cost_atto: payment_intent.total_amount.to_string(),
1621 gas_cost_wei: 0,
1622 data_map_address,
1623 chunk_attempts_total: stats.chunk_attempts_total,
1624 store_durations_ms: stats.store_durations_ms,
1625 retries_histogram: stats.retries_histogram,
1626 })
1627 }
1628 ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
1629 "Cannot finalize merkle upload with wave-batch tx hashes. \
1630 Use finalize_upload_merkle() instead."
1631 .to_string(),
1632 )),
1633 }
1634 }
1635
1636 /// Phase 2 of external-signer upload (merkle): finalize with winner pool hash.
1637 ///
1638 /// Takes a [`PreparedUpload`] that used merkle payment and the `winner_pool_hash`
1639 /// returned by the on-chain merkle payment transaction. Generates proofs and
1640 /// stores chunks on the network.
1641 ///
1642 /// # Errors
1643 ///
1644 /// Returns an error if the prepared upload used wave-batch payment (use
1645 /// [`Client::finalize_upload`] instead), proof generation fails,
1646 /// or any chunk cannot be stored.
1647 pub async fn finalize_upload_merkle(
1648 &self,
1649 prepared: PreparedUpload,
1650 winner_pool_hash: [u8; 32],
1651 ) -> Result<FileUploadResult> {
1652 self.finalize_upload_merkle_with_progress(prepared, winner_pool_hash, None)
1653 .await
1654 }
1655
1656 /// Phase 2 of external-signer upload (merkle) with progress events.
1657 ///
1658 /// Same as [`Client::finalize_upload_merkle`] but emits [`UploadEvent::ChunkStored`]
1659 /// on the provided channel as each chunk is successfully stored.
1660 ///
1661 /// # Errors
1662 ///
1663 /// Same as [`Client::finalize_upload_merkle`].
1664 pub async fn finalize_upload_merkle_with_progress(
1665 &self,
1666 prepared: PreparedUpload,
1667 winner_pool_hash: [u8; 32],
1668 progress: Option<mpsc::Sender<UploadEvent>>,
1669 ) -> Result<FileUploadResult> {
1670 let data_map_address = prepared.data_map_address;
1671 let already_stored_count = prepared.already_stored_addresses.len();
1672 let total_chunks = prepared.total_chunks;
1673 match prepared.payment_info {
1674 ExternalPaymentInfo::Merkle {
1675 prepared_batch,
1676 chunk_contents,
1677 chunk_addresses,
1678 } => {
1679 let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
1680 let outcome = self
1681 .merkle_upload_chunks(
1682 chunk_contents,
1683 chunk_addresses,
1684 &batch_result,
1685 progress.as_ref(),
1686 already_stored_count,
1687 total_chunks,
1688 )
1689 .await?;
1690
1691 info!(
1692 "External-signer merkle upload finalized: {} chunks stored, {} failed",
1693 outcome.stored, outcome.failed
1694 );
1695
1696 Ok(FileUploadResult {
1697 data_map: prepared.data_map,
1698 chunks_stored: outcome.stored,
1699 chunks_failed: outcome.failed,
1700 total_chunks,
1701 payment_mode_used: PaymentMode::Merkle,
1702 storage_cost_atto: "0".into(),
1703 gas_cost_wei: 0,
1704 data_map_address,
1705 chunk_attempts_total: outcome.stats.chunk_attempts_total,
1706 store_durations_ms: outcome.stats.store_durations_ms,
1707 retries_histogram: outcome.stats.retries_histogram,
1708 })
1709 }
1710 ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
1711 "Cannot finalize wave-batch upload with merkle winner hash. \
1712 Use finalize_upload() instead."
1713 .to_string(),
1714 )),
1715 }
1716 }
1717
1718 /// Upload a file with a specific payment mode.
1719 ///
1720 /// Before encryption, checks that the temp directory has enough free
1721 /// disk space for the spilled chunks (~1.1× source file size).
1722 ///
1723 /// Encrypted chunks are spilled to a temp directory during encryption
1724 /// so that only their 32-byte addresses stay in memory. At upload time,
1725 /// chunks are read back one wave at a time (~64 × 4 MB ≈ 256 MB peak).
1726 ///
1727 /// # Errors
1728 ///
1729 /// Returns an error if there is insufficient disk space, the file cannot
1730 /// be read, encryption fails, or any chunk cannot be stored.
1731 #[allow(clippy::too_many_lines)]
1732 pub async fn file_upload_with_mode(
1733 &self,
1734 path: &Path,
1735 mode: PaymentMode,
1736 ) -> Result<FileUploadResult> {
1737 self.file_upload_with_progress(path, mode, None).await
1738 }
1739
1740 /// Upload a file publicly, storing the serialized [`DataMap`] as part of
1741 /// the same upload payment batch.
1742 ///
1743 /// The returned [`FileUploadResult::data_map_address`] can be shared for
1744 /// public downloads via [`Client::data_map_fetch`].
1745 #[allow(clippy::too_many_lines)]
1746 pub async fn file_upload_public_with_mode(
1747 &self,
1748 path: &Path,
1749 mode: PaymentMode,
1750 ) -> Result<FileUploadResult> {
1751 self.file_upload_with_visibility_and_progress(path, mode, Visibility::Public, None)
1752 .await
1753 }
1754
1755 /// Upload a file with progress events sent to the given channel.
1756 ///
1757 /// Same as [`Client::file_upload_with_mode`] but sends [`UploadEvent`]s to the
1758 /// provided channel for UI progress feedback.
1759 #[allow(clippy::too_many_lines)]
1760 pub async fn file_upload_with_progress(
1761 &self,
1762 path: &Path,
1763 mode: PaymentMode,
1764 progress: Option<mpsc::Sender<UploadEvent>>,
1765 ) -> Result<FileUploadResult> {
1766 self.file_upload_with_visibility_and_progress(path, mode, Visibility::Private, progress)
1767 .await
1768 }
1769
1770 /// Public file upload with progress events.
1771 ///
1772 /// Same as [`Client::file_upload_public_with_mode`] but sends
1773 /// [`UploadEvent`]s to the provided channel for UI progress feedback.
1774 #[allow(clippy::too_many_lines)]
1775 pub async fn file_upload_public_with_progress(
1776 &self,
1777 path: &Path,
1778 mode: PaymentMode,
1779 progress: Option<mpsc::Sender<UploadEvent>>,
1780 ) -> Result<FileUploadResult> {
1781 self.file_upload_with_visibility_and_progress(path, mode, Visibility::Public, progress)
1782 .await
1783 }
1784
    /// Shared implementation behind the public `file_upload_*` entry points.
    ///
    /// Steps, in order:
    ///
    /// 1. Read the source file's size and run the temp-disk-space pre-flight.
    /// 2. Encrypt the file into a chunk spill. For [`Visibility::Public`] the
    ///    serialized `DataMap` is pushed into the same spill and its address
    ///    is returned in the result; for [`Visibility::Private`] the address
    ///    is `None`.
    /// 3. Pick the payment path from `should_use_merkle(chunk_count, mode)`:
    ///    - **Single-node**: upload every spilled chunk in waves.
    ///    - **Merkle**: load any cached merkle receipt for this file, run the
    ///      merkle preflight plan, then either skip payment (nothing left to
    ///      upload), reuse the cached proofs, pay for a fresh merkle batch
    ///      (saving the receipt before storing), or fall back to single-node
    ///      payment. The fallback happens when the remaining chunk count is
    ///      below the merkle threshold, or on `InsufficientPeers` when `mode`
    ///      is `PaymentMode::Auto`.
    /// 4. On success, delete the receipt cache entry that was used.
    ///
    /// `progress`, when present, receives [`UploadEvent`]s; send failures are
    /// ignored.
    ///
    /// Every successful return reports `chunks_failed: 0`; incomplete uploads
    /// are surfaced as errors by the wave upload helpers instead.
    ///
    /// # Errors
    ///
    /// Propagates failures from reading file metadata, the disk-space check,
    /// encryption, `DataMap` serialization, the merkle preflight or payment
    /// (unless one of the fallbacks above applies), and the wave uploads.
    #[allow(clippy::too_many_lines)]
    async fn file_upload_with_visibility_and_progress(
        &self,
        path: &Path,
        mode: PaymentMode,
        visibility: Visibility,
        progress: Option<mpsc::Sender<UploadEvent>>,
    ) -> Result<FileUploadResult> {
        debug!(
            "Streaming file upload with mode {mode:?}, visibility {visibility:?}: {}",
            path.display()
        );

        // Pre-flight: verify enough temp disk space for the chunk spill.
        // NOTE(review): `std::fs::metadata` is a synchronous call made from an
        // async fn — presumably acceptable for a single stat; confirm.
        let file_size = std::fs::metadata(path)?.len();
        check_disk_space_for_spill(file_size)?;

        // Phase 1: Encrypt file and spill chunks to temp directory.
        // Only 32-byte addresses stay in memory — chunk data lives on disk.
        let (mut spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;

        // Public uploads append the serialized DataMap to the spill so it is
        // paid for and stored together with the file's chunks.
        let data_map_address = match visibility {
            Visibility::Private => None,
            Visibility::Public => {
                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
                })?;
                let address = compute_address(&serialized);
                info!(
                    "Public upload: adding DataMap chunk ({} bytes) at address {} to payment batch",
                    serialized.len(),
                    hex::encode(address)
                );
                spill.push(&serialized)?;
                Some(address)
            }
        };

        // Includes the DataMap chunk when the upload is public.
        let chunk_count = spill.len();
        info!(
            "Encrypted {} into {chunk_count} chunks (spilled to disk)",
            path.display()
        );
        if let Some(ref tx) = progress {
            // Best-effort: a closed progress channel must not fail the upload.
            let _ = tx
                .send(UploadEvent::Encrypted {
                    total_chunks: chunk_count,
                })
                .await;
        }

        // Phase 2: Decide payment mode and upload in waves from disk.
        //
        // For the merkle path, attempt to resume from a cached
        // receipt before paying again. The cache is keyed by the
        // CANONICAL source path so `./foo`, `/abs/foo`, and any
        // symlink alias all resolve to the same cache entry — a
        // crash-and-retry from a different cwd or via a different
        // alias still hits the receipt. Canonicalize may fail (the
        // file could have been moved between phase 1 and here); we
        // fall back to the display string in that case, which
        // preserves pre-fix behaviour rather than dropping cache
        // resume entirely.
        let file_path_key = std::fs::canonicalize(path)
            .map(|p| p.display().to_string())
            .unwrap_or_else(|_| path.display().to_string());
        let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self
            .should_use_merkle(chunk_count, mode)
        {
            info!("Using merkle batch payment for {chunk_count} file chunks");

            // Only the cached receipt is kept; the cache file path is dropped.
            let cached_merkle =
                crate::data::client::cached_merkle::try_load_for_file(&file_path_key)
                    .map(|(_cache_path, cached)| cached);

            let merkle_plan = match self
                .plan_merkle_upload(spill.chunk_entries()?, DATA_TYPE_CHUNK, progress.as_ref())
                .await
            {
                Ok(plan) => plan,
                Err(e) => {
                    // Preflight failed. First choice: a cached receipt that
                    // covers every spilled address lets the upload resume
                    // without a plan (nothing is known to be already stored,
                    // hence the empty slice).
                    if let Some(cached) = cached_merkle
                        .as_ref()
                        .filter(|cached| cached_merkle_covers_addresses(cached, &spill.addresses))
                    {
                        info!(
                            "Merkle preflight failed ({e}); \
                             resuming with cached merkle proofs"
                        );
                        let (stored, sc, gc, stats) = self
                            .upload_waves_merkle(
                                &spill,
                                &spill.addresses,
                                cached,
                                &[],
                                progress.as_ref(),
                            )
                            .await?;
                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                        return Ok(FileUploadResult {
                            data_map,
                            chunks_stored: stored,
                            chunks_failed: 0,
                            total_chunks: chunk_count,
                            payment_mode_used: PaymentMode::Merkle,
                            storage_cost_atto: sc,
                            gas_cost_wei: gc,
                            data_map_address,
                            chunk_attempts_total: stats.chunk_attempts_total,
                            store_durations_ms: stats.store_durations_ms,
                            retries_histogram: stats.retries_histogram,
                        });
                    }
                    // Second choice: in Auto mode a peer shortage downgrades
                    // the whole file to single-node payment. Any other error,
                    // or an explicitly requested merkle mode, is returned.
                    match &e {
                        Error::InsufficientPeers(msg) if mode == PaymentMode::Auto => {
                            info!(
                                "Merkle preflight needs more peers ({msg}), \
                                 falling back to wave-batch"
                            );
                            let (stored, sc, gc, fb_stats) = self
                                .upload_waves_single(
                                    &spill,
                                    progress.as_ref(),
                                    Some(&file_path_key),
                                )
                                .await?;
                            crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                            return Ok(FileUploadResult {
                                data_map,
                                chunks_stored: stored,
                                chunks_failed: 0,
                                total_chunks: chunk_count,
                                payment_mode_used: PaymentMode::Single,
                                storage_cost_atto: sc,
                                gas_cost_wei: gc,
                                data_map_address,
                                chunk_attempts_total: fb_stats.chunk_attempts_total,
                                store_durations_ms: fb_stats.store_durations_ms,
                                retries_histogram: fb_stats.retries_histogram,
                            });
                        }
                        _ => return Err(e),
                    }
                }
            };

            if merkle_plan.to_upload.is_empty() {
                // Nothing left to pay for or store: both receipt caches are
                // stale, and the result reports zero cost.
                info!("All {chunk_count} merkle chunks already stored; skipping payment");
                crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                (
                    chunk_count,
                    PaymentMode::Merkle,
                    "0".to_string(),
                    0,
                    WaveAggregateStats::default(),
                )
            } else if !self.should_use_merkle(merkle_plan.to_upload.len(), mode) {
                // The remaining chunks alone no longer qualify for merkle
                // payment. Cached proofs are still reused when they cover the
                // remainder; otherwise the remainder is paid per chunk.
                let remaining_chunks = merkle_plan.to_upload.len();
                if let Some(cached) = cached_merkle
                    .as_ref()
                    .filter(|cached| cached_merkle_covers_addresses(cached, &merkle_plan.to_upload))
                {
                    info!(
                        "{remaining_chunks} chunks remain below merkle threshold; \
                         reusing cached merkle proofs"
                    );
                    let (stored, sc, gc, stats) = self
                        .upload_waves_merkle(
                            &spill,
                            &merkle_plan.to_upload,
                            cached,
                            &merkle_plan.already_stored,
                            progress.as_ref(),
                        )
                        .await?;
                    crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                    (stored, PaymentMode::Merkle, sc, gc, stats)
                } else {
                    if cached_merkle.is_some() {
                        info!(
                            "{remaining_chunks} chunks remain below merkle threshold, \
                             and the cached merkle receipt does not cover them. \
                             Discarding cache and using single-node payment."
                        );
                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                    } else {
                        info!(
                            "{remaining_chunks} chunks need upload after merkle preflight; \
                             using single-node payment"
                        );
                    }
                    let (stored, sc, gc, stats) = self
                        .upload_spill_addresses_single(
                            &spill,
                            &merkle_plan.to_upload,
                            progress.as_ref(),
                            &merkle_plan.already_stored,
                            chunk_count,
                            Some(&file_path_key),
                        )
                        .await?;
                    crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                    (stored, PaymentMode::Single, sc, gc, stats)
                }
            } else {
                let batch_result = if let Some(cached) = cached_merkle.as_ref() {
                    // Validate the cache against the chunks that still need
                    // storage. Extra proofs are harmless: a previous attempt
                    // may have paid for chunks that are now already stored.
                    if cached_merkle_covers_addresses(cached, &merkle_plan.to_upload) {
                        info!(
                            "Skipping merkle payment phase; resuming with \
                             cached proofs for {} remaining chunks",
                            merkle_plan.to_upload.len()
                        );
                        Ok(cached.clone())
                    } else {
                        info!(
                            "Cached merkle receipt does not cover the current \
                             remaining chunks (cached={}, remaining={}). \
                             Discarding cache and paying fresh.",
                            cached.proofs.len(),
                            merkle_plan.to_upload.len()
                        );
                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                        self.pay_for_merkle_batch(
                            &merkle_plan.to_upload,
                            DATA_TYPE_CHUNK,
                            merkle_plan.to_upload_avg_size(),
                        )
                        .await
                        // Same save-before-store rule as the no-cache branch:
                        // persist the fresh receipt as soon as payment returns.
                        .inspect(|result| {
                            crate::data::client::cached_merkle::try_save(&file_path_key, result);
                        })
                    }
                } else {
                    self.pay_for_merkle_batch(
                        &merkle_plan.to_upload,
                        DATA_TYPE_CHUNK,
                        merkle_plan.to_upload_avg_size(),
                    )
                    .await
                    .inspect(|result| {
                        // Save BEFORE the store phase so a crash
                        // mid-upload leaves a resumable receipt.
                        crate::data::client::cached_merkle::try_save(&file_path_key, result);
                    })
                };

                let batch_result = match batch_result {
                    Ok(result) => result,
                    // Only reachable from a fresh payment attempt: the
                    // cached-receipt branch above always yields `Ok`.
                    Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
                        info!("Merkle needs more peers ({msg}), falling back to wave-batch");
                        let (stored, sc, gc, fb_stats) = self
                            .upload_spill_addresses_single(
                                &spill,
                                &merkle_plan.to_upload,
                                progress.as_ref(),
                                &merkle_plan.already_stored,
                                chunk_count,
                                Some(&file_path_key),
                            )
                            .await?;
                        crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                        return Ok(FileUploadResult {
                            data_map,
                            chunks_stored: stored,
                            chunks_failed: 0,
                            total_chunks: chunk_count,
                            payment_mode_used: PaymentMode::Single,
                            storage_cost_atto: sc,
                            gas_cost_wei: gc,
                            data_map_address,
                            chunk_attempts_total: fb_stats.chunk_attempts_total,
                            store_durations_ms: fb_stats.store_durations_ms,
                            retries_histogram: fb_stats.retries_histogram,
                        });
                    }
                    Err(e) => return Err(e),
                };

                let (stored, sc, gc, stats) = self
                    .upload_waves_merkle(
                        &spill,
                        &merkle_plan.to_upload,
                        &batch_result,
                        &merkle_plan.already_stored,
                        progress.as_ref(),
                    )
                    .await?;
                // Upload succeeded end-to-end; the cached receipt is
                // no longer needed.
                crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                (stored, PaymentMode::Merkle, sc, gc, stats)
            }
        } else {
            let (stored, sc, gc, stats) = self
                .upload_waves_single(&spill, progress.as_ref(), Some(&file_path_key))
                .await?;
            // Full file success: drop any cached single-node receipt.
            crate::data::client::cached_single::try_delete_for_file(&file_path_key);
            (stored, PaymentMode::Single, sc, gc, stats)
        };

        info!(
            "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
            path.display()
        );

        Ok(FileUploadResult {
            data_map,
            chunks_stored,
            chunks_failed: 0,
            total_chunks: chunk_count,
            payment_mode_used: actual_mode,
            storage_cost_atto,
            gas_cost_wei,
            data_map_address,
            chunk_attempts_total: stats.chunk_attempts_total,
            store_durations_ms: stats.store_durations_ms,
            retries_histogram: stats.retries_histogram,
        })
    }
2109
2110 /// Encrypt a file and spill chunks to a temp directory.
2111 ///
2112 /// Logs progress every 100 chunks so users get feedback during
2113 /// multi-GB encryptions.
2114 ///
2115 /// Returns the spill buffer (addresses on disk) and the `DataMap`.
2116 async fn encrypt_file_to_spill(
2117 &self,
2118 path: &Path,
2119 progress: Option<&mpsc::Sender<UploadEvent>>,
2120 ) -> Result<(ChunkSpill, DataMap)> {
2121 let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
2122
2123 let mut spill = ChunkSpill::new()?;
2124 while let Some(content) = chunk_rx.recv().await {
2125 spill.push(&content)?;
2126 let chunks_done = spill.len();
2127 if let Some(tx) = progress {
2128 if chunks_done.is_multiple_of(10) {
2129 let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
2130 }
2131 }
2132 if chunks_done % 100 == 0 {
2133 let mb = spill.total_bytes() / (1024 * 1024);
2134 info!(
2135 "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
2136 path.display()
2137 );
2138 }
2139 }
2140
2141 // Await encryption completion to catch errors before paying.
2142 handle
2143 .await
2144 .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
2145 .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
2146
2147 let data_map = datamap_rx
2148 .await
2149 .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
2150
2151 Ok((spill, data_map))
2152 }
2153
2154 /// Upload chunks from a spill using wave-based per-chunk (single) payments.
2155 ///
2156 /// Reads one wave at a time from disk, prepares quotes, pays, and stores.
2157 /// Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE` (~256 MB).
2158 ///
2159 /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
2160 async fn upload_waves_single(
2161 &self,
2162 spill: &ChunkSpill,
2163 progress: Option<&mpsc::Sender<UploadEvent>>,
2164 resume_key: Option<&str>,
2165 ) -> Result<(usize, String, u128, WaveAggregateStats)> {
2166 self.upload_spill_addresses_single(
2167 spill,
2168 &spill.addresses,
2169 progress,
2170 &[],
2171 spill.len(),
2172 resume_key,
2173 )
2174 .await
2175 }
2176
2177 async fn upload_spill_addresses_single(
2178 &self,
2179 spill: &ChunkSpill,
2180 addresses: &[[u8; 32]],
2181 progress: Option<&mpsc::Sender<UploadEvent>>,
2182 already_stored_addresses: &[[u8; 32]],
2183 total_chunks: usize,
2184 resume_key: Option<&str>,
2185 ) -> Result<(usize, String, u128, WaveAggregateStats)> {
2186 let mut total_stored = already_stored_addresses.len();
2187 let mut total_storage = Amount::ZERO;
2188 let mut total_gas: u128 = 0;
2189 let mut agg_stats = WaveAggregateStats::default();
2190 // A wave whose chunks fall short of quorum after retries must not abort
2191 // the file: its failures are accumulated here and surfaced as a single
2192 // `PartialUpload` only after every wave has been attempted, mirroring
2193 // `upload_waves_merkle`. Aborting on the first failed wave (the old `?`)
2194 // discarded all later waves' progress — already self-encrypted, spilled,
2195 // and in some cases already paid for — converting high per-chunk success
2196 // into 0% per-file success.
2197 // Seed with the addresses a preflight already confirmed stored (e.g.
2198 // the merkle-fallback path passes `merkle_plan.already_stored`), so a
2199 // returned `PartialUpload.stored` lists every stored chunk and
2200 // `stored_count == stored.len()` holds for programmatic callers.
2201 let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
2202 let mut failed: Vec<([u8; 32], String)> = Vec::new();
2203 let waves: Vec<&[[u8; 32]]> = addresses.chunks(UPLOAD_WAVE_SIZE).collect();
2204 let wave_count = waves.len();
2205
2206 // Unconditional breadcrumb: lets a clean run confirm the continue-on-
2207 // partial single-node path is in effect (the old path aborted the file
2208 // on the first failed wave instead of continuing across all waves).
2209 info!(
2210 "single-node upload: {} chunk(s) in {wave_count} wave(s) (continue-on-partial)",
2211 addresses.len()
2212 );
2213
2214 for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
2215 let wave_num = wave_idx + 1;
2216 let wave_data: Vec<Bytes> = wave_addrs
2217 .iter()
2218 .map(|addr| spill.read_chunk(addr))
2219 .collect::<Result<Vec<_>>>()?;
2220
2221 info!(
2222 "Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
2223 wave_data.len()
2224 );
2225 if let Some(tx) = progress {
2226 let _ = tx
2227 .send(UploadEvent::QuotingChunks {
2228 wave: wave_num,
2229 total_waves: wave_count,
2230 chunks_in_wave: wave_data.len(),
2231 })
2232 .await;
2233 }
2234 // Fold this wave's result. A quorum shortfall (`PartialUpload`) is
2235 // recoverable and its parts are returned to be recorded here;
2236 // genuinely fatal errors propagate via `?` and abort the file, as in
2237 // `upload_waves_merkle`.
2238 let outcome = fold_single_wave(
2239 self.batch_upload_chunks_with_events(
2240 wave_data,
2241 progress,
2242 total_stored,
2243 total_chunks,
2244 resume_key,
2245 )
2246 .await,
2247 )?;
2248
2249 if !outcome.failed.is_empty() {
2250 warn!(
2251 "Wave {wave_num}/{wave_count}: {} chunk(s) failed to store after retries; \
2252 continuing with remaining waves",
2253 outcome.failed.len()
2254 );
2255 }
2256
2257 total_stored += outcome.stored.len();
2258 stored_addresses.extend(outcome.stored);
2259 failed.extend(outcome.failed);
2260 total_storage += outcome.storage_atto;
2261 total_gas = total_gas.saturating_add(outcome.gas_wei);
2262 // Merge per-wave stats (a quorum-short wave contributes none, since
2263 // `PartialUpload` carries no stats).
2264 agg_stats.chunk_attempts_total = agg_stats
2265 .chunk_attempts_total
2266 .saturating_add(outcome.stats.chunk_attempts_total);
2267 agg_stats
2268 .store_durations_ms
2269 .extend(outcome.stats.store_durations_ms);
2270 for (slot, count) in agg_stats
2271 .retries_histogram
2272 .iter_mut()
2273 .zip(outcome.stats.retries_histogram.iter())
2274 {
2275 *slot = slot.saturating_add(*count);
2276 }
2277
2278 if let Some(tx) = progress {
2279 let _ = tx
2280 .send(UploadEvent::WaveComplete {
2281 wave: wave_num,
2282 total_waves: wave_count,
2283 stored_so_far: total_stored,
2284 total: total_chunks,
2285 })
2286 .await;
2287 }
2288 }
2289
2290 // Any chunk still failed after every wave was attempted means the file
2291 // is not fully stored — surface it as `PartialUpload` (never silently
2292 // succeed with missing chunks), carrying the real on-chain spend.
2293 if !failed.is_empty() {
2294 let failed_count = failed.len();
2295 warn!(
2296 "single-node upload incomplete: {failed_count}/{total_chunks} chunks failed after retries"
2297 );
2298 return Err(Error::PartialUpload {
2299 stored: stored_addresses,
2300 stored_count: total_stored,
2301 failed,
2302 failed_count,
2303 total_chunks,
2304 spend: Box::new(PartialUploadSpend {
2305 storage_cost_atto: total_storage.to_string(),
2306 gas_cost_wei: total_gas,
2307 }),
2308 reason: format!("{failed_count} chunk(s) failed to store after retries"),
2309 });
2310 }
2311
2312 Ok((
2313 total_stored,
2314 total_storage.to_string(),
2315 total_gas,
2316 agg_stats,
2317 ))
2318 }
2319
    /// Upload chunks from a spill using pre-computed merkle proofs.
    ///
    /// Reads one wave at a time from disk, pairs each chunk with its proof,
    /// and uploads concurrently. Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE`.
    ///
    /// A chunk that is transiently short of quorum (`InsufficientPeers`) does
    /// **not** abort the file, nor does it block the pipeline: each wave is
    /// stored in a **single pass** (no in-wave backoff barrier), and chunks
    /// short of quorum are collected into a file-level deferred set rather than
    /// retried in place. After the last wave, [`merkle_deferred_retry`] retries
    /// the whole deferred set in concurrent rounds ([`DEFERRED_ROUND_DELAYS_SECS`]
    /// delays), re-reading each chunk's body from the spill and reusing its
    /// proof. This keeps every wave running at full fan-out instead of parking
    /// idle slots behind one slow chunk's backoff, while peak memory stays
    /// bounded (bodies are re-read from disk, never pinned).
    ///
    /// Addresses that have no proof in `batch_result.proofs` are never
    /// attempted: they are recorded as failed up front and reported through
    /// the final `PartialUpload`. A fatal error reported by the store helper
    /// (`outcome.fatal` in a wave, `dr.fatal` in a deferred round) stops the
    /// upload at once; the error returned is built by
    /// `partial_upload_after_fatal` from everything stored and failed so far.
    ///
    /// `already_stored_addresses` seeds the stored count and stored list;
    /// `total_chunks` is computed here as
    /// `already_stored_addresses.len() + addresses.len()`.
    ///
    /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei, stats)` on
    /// success. Costs come from the `batch_result` which was populated during
    /// payment; `stats` aggregates the per-wave and deferred-retry store
    /// statistics.
    ///
    /// # Errors
    ///
    /// Returns [`Error::PartialUpload`] if any chunk lacks a proof or is still
    /// failed after all retries across every wave (other chunks remain
    /// stored), the error built by `partial_upload_after_fatal` for a fatal
    /// store failure, or an error propagated from reading the spill or from
    /// the store/retry helpers themselves.
    async fn upload_waves_merkle(
        &self,
        spill: &ChunkSpill,
        addresses: &[[u8; 32]],
        batch_result: &MerkleBatchPaymentResult,
        already_stored_addresses: &[[u8; 32]],
        progress: Option<&mpsc::Sender<UploadEvent>>,
    ) -> Result<(usize, String, u128, WaveAggregateStats)> {
        let mut total_stored = already_stored_addresses.len();
        let total_chunks = total_stored + addresses.len();
        let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
        let mut failed: Vec<([u8; 32], String)> = Vec::new();
        // Chunks short of quorum on their single wave pass are collected here and
        // retried after the last wave (see `merkle_deferred_retry`), so a slow
        // chunk never holds its wave's other slots idle behind a backoff.
        let mut deferred: Vec<([u8; 32], String)> = Vec::new();
        let mut agg_stats = WaveAggregateStats::default();

        // Chunks without a merkle proof were never paid for: a partial
        // `pay_for_merkle_multi_batch` result carries proofs only for the
        // sub-batches whose on-chain payment succeeded. Such a chunk cannot be
        // stored, so record it as failed (surfaced via `PartialUpload` once the
        // storable chunks have been attempted) rather than letting its
        // "missing proof" error abort the whole file and discard every other
        // chunk's progress.
        let (to_store, missing_proof) =
            partition_addresses_by_proof(addresses, &batch_result.proofs);
        if !missing_proof.is_empty() {
            warn!(
                "{} chunk(s) lack a merkle proof (partial payment); reporting them as failed",
                missing_proof.len()
            );
            for addr in &missing_proof {
                failed.push((
                    *addr,
                    format!("Missing merkle proof for chunk {}", hex::encode(addr)),
                ));
            }
        }

        // Only the addresses that do have a proof are split into waves.
        let waves: Vec<&[[u8; 32]]> = to_store.chunks(UPLOAD_WAVE_SIZE).collect();
        let wave_count = waves.len();

        let store_limiter = self.controller().store.clone();

        // Store one chunk to its (freshly re-collected) close group, reusing the
        // chunk's merkle proof. Shared across every retry round so a converged
        // routing table yields a fresh group. The missing-proof branch inside
        // is a defensive check: addresses without a proof are expected to have
        // been split off by `partition_addresses_by_proof` above. Mirrors the
        // external-signer path's closure in `merkle_upload_chunks`.
        // NOTE(review): which errors the store helpers treat as recoverable
        // versus fatal is decided inside `merkle_store_with_retry` /
        // `merkle_deferred_retry`, not here — presumably only
        // `InsufficientPeers` is recoverable; confirm there.
        let store_one = |addr: [u8; 32], content: Bytes| {
            let limiter = store_limiter.clone();
            let proof_bytes = batch_result.proofs.get(&addr).cloned();
            async move {
                // Taken before the store attempt and returned on success.
                let started = std::time::Instant::now();
                let proof = proof_bytes.ok_or_else(|| {
                    Error::Payment(format!(
                        "Missing merkle proof for chunk {}",
                        hex::encode(addr)
                    ))
                })?;
                let peers = self.close_group_peers(&addr).await?;
                observe_op(
                    &limiter,
                    || async move { self.chunk_put_to_close_group(content, proof, &peers).await },
                    classify_error,
                )
                .await
                .map(|_| started)
            }
        };

        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
            let wave_num = wave_idx + 1;
            let wave = spill.read_wave(wave_addrs)?;

            info!(
                "Wave {wave_num}/{wave_count}: storing {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
                wave.len()
            );

            // Clamp fan-out to wave size — partial last wave should
            // not pay for extra slots (see PERF-RESULTS.md).
            let store_concurrency = store_limiter.current().min(wave.len().max(1));
            // `read_wave` yields `(content, addr)`; the store helper takes
            // `(addr, content)`.
            let chunks: Vec<([u8; 32], Bytes)> = wave
                .into_iter()
                .map(|(content, addr)| (addr, content))
                .collect();

            // Store the wave in a SINGLE pass (`max_attempts = 1`, no backoff):
            // quorum-short chunks are collected and deferred to a post-wave
            // concurrent retry rather than parking this wave's other slots
            // behind a backoff. `stored_offset` is the running cumulative count
            // so the progress events the driver emits stay correctly numbered
            // across waves.
            let outcome = merkle_store_with_retry(
                chunks,
                store_concurrency,
                1,
                std::time::Duration::ZERO,
                progress,
                total_stored,
                total_chunks,
                &store_one,
            )
            .await?;

            // Record this wave's confirmed stores from the explicit set the
            // store helper reports. Using that set (rather than inferring
            // "wave chunks minus failed") keeps `stored_addresses` correct even
            // when a fatal abort leaves some of the wave neither stored nor
            // reported short of quorum.
            stored_addresses.extend(&outcome.stored_addresses);
            // Assigned, not added: `outcome.stored` is used as the new running
            // total (the previous total was passed in as the offset above).
            total_stored = outcome.stored;

            // Merge per-wave stats (durations, attempts, per-round histogram).
            agg_stats.chunk_attempts_total = agg_stats
                .chunk_attempts_total
                .saturating_add(outcome.stats.chunk_attempts_total);
            agg_stats
                .store_durations_ms
                .extend(outcome.stats.store_durations_ms);
            for (slot, count) in agg_stats
                .retries_histogram
                .iter_mut()
                .zip(outcome.stats.retries_histogram.iter())
            {
                *slot = slot.saturating_add(*count);
            }

            if let Some(e) = outcome.fatal {
                // A non-quorum store error is fatal (missing proofs were
                // filtered out above, so this is a genuine network/store
                // failure). Preserve every chunk stored so far — including this
                // wave's same-pass successes — and report every not-stored chunk
                // as failed, so the `PartialUpload` counts are accurate rather
                // than omitting same-wave stores and under-counting failures.
                warn!("merkle wave {wave_num}/{wave_count} aborted: {e}");
                // Best per-chunk messages we already have: missing-proof, this
                // wave's shortfalls, and earlier waves' deferred shortfalls.
                let mut known_failed = failed;
                known_failed.extend(outcome.failed_addresses);
                known_failed.extend(std::mem::take(&mut deferred));
                return Err(partial_upload_after_fatal(
                    addresses,
                    stored_addresses,
                    total_stored,
                    total_chunks,
                    known_failed,
                    PartialUploadSpend {
                        storage_cost_atto: batch_result.storage_cost_atto.clone(),
                        gas_cost_wei: batch_result.gas_cost_wei,
                    },
                    format!("merkle chunk store aborted: {e}"),
                ));
            }

            // Non-fatal: this wave's quorum-short chunks are deferred (not failed
            // yet) for the post-wave concurrent retry. A deferred chunk joins
            // `stored_addresses` only if/when a later round stores it.
            deferred.extend(outcome.failed_addresses);

            if let Some(tx) = progress {
                // Best-effort: a closed progress channel is ignored.
                let _ = tx
                    .send(UploadEvent::WaveComplete {
                        wave: wave_num,
                        total_waves: wave_count,
                        stored_so_far: total_stored,
                        total: total_chunks,
                    })
                    .await;
            }
        }

        // The wave passes never blocked on backoff; now retry the whole
        // file-level deferred set in concurrent rounds. Bodies are re-read from
        // the spill at retry time (peak RAM unchanged) and proofs are re-attached
        // by `store_one`. Chunks still short after the final round become
        // `failed`; a non-quorum error aborts as `PartialUpload`.
        if !deferred.is_empty() {
            info!(
                "Deferring {} merkle chunk(s) short of quorum for concurrent retry after final wave",
                deferred.len()
            );
            let dr = merkle_deferred_retry(
                deferred,
                &DEFERRED_ROUND_DELAYS_SECS,
                // Read and store at most one wave's worth of bodies at a time so
                // the deferred path keeps the wave path's ~256 MB peak-memory
                // bound regardless of how many chunks were deferred file-wide.
                UPLOAD_WAVE_SIZE,
                // Body loader: re-reads the requested chunks from the spill and
                // flips each pair to `(addr, content)`.
                |addrs: &[[u8; 32]]| {
                    spill.read_wave(addrs).map(|wave| {
                        wave.into_iter()
                            .map(|(content, addr)| (addr, content))
                            .collect()
                    })
                },
                // Concurrency picker: same clamp as the wave path.
                |n: usize| store_limiter.current().min(n.max(1)),
                progress,
                total_stored,
                total_chunks,
                &store_one,
            )
            .await?;

            stored_addresses.extend(dr.stored_addresses);
            total_stored = dr.stored;

            // Merge the deferred pass's stats — its histogram is already mapped
            // to the right per-round slots — into the file aggregate.
            agg_stats.chunk_attempts_total = agg_stats
                .chunk_attempts_total
                .saturating_add(dr.stats.chunk_attempts_total);
            agg_stats
                .store_durations_ms
                .extend(dr.stats.store_durations_ms);
            for (slot, count) in agg_stats
                .retries_histogram
                .iter_mut()
                .zip(dr.stats.retries_histogram.iter())
            {
                *slot = slot.saturating_add(*count);
            }

            if let Some(reason) = dr.fatal {
                // A non-quorum store error during a deferred round is fatal, the
                // same as in the wave path: preserve everything stored so far and
                // report every not-stored chunk as failed.
                warn!("merkle deferred retry aborted: {reason}");
                let mut known_failed = failed;
                known_failed.extend(dr.failed_addresses);
                return Err(partial_upload_after_fatal(
                    addresses,
                    stored_addresses,
                    total_stored,
                    total_chunks,
                    known_failed,
                    PartialUploadSpend {
                        storage_cost_atto: batch_result.storage_cost_atto.clone(),
                        gas_cost_wei: batch_result.gas_cost_wei,
                    },
                    format!("merkle chunk store aborted: {reason}"),
                ));
            }
            failed.extend(dr.failed_addresses);
        }

        // A file with any permanently-failed chunk is not fully stored — surface
        // it as `PartialUpload`, but only after the single wave pass and every
        // deferred retry round are exhausted (never silently succeed with
        // missing chunks).
        // NOTE(review): `failed` also holds the missing-proof entries pushed at
        // the top of this function, which were never attempted, so the
        // "short of quorum after N attempts" wording below can be inaccurate
        // for them — consider distinguishing the two causes.
        if !failed.is_empty() {
            let failed_count = failed.len();
            // One wave pass plus one attempt per deferred round.
            let total_attempts = 1 + DEFERRED_ROUND_DELAYS_SECS.len();
            warn!(
                "merkle upload incomplete: {failed_count}/{total_chunks} chunks short of quorum after retries"
            );
            return Err(Error::PartialUpload {
                stored: stored_addresses,
                stored_count: total_stored,
                failed,
                failed_count,
                total_chunks,
                spend: Box::new(PartialUploadSpend {
                    storage_cost_atto: batch_result.storage_cost_atto.clone(),
                    gas_cost_wei: batch_result.gas_cost_wei,
                }),
                reason: format!(
                    "{failed_count} chunk(s) short of quorum after {total_attempts} attempts"
                ),
            });
        }

        Ok((
            total_stored,
            batch_result.storage_cost_atto.clone(),
            batch_result.gas_cost_wei,
            agg_stats,
        ))
    }
2626
2627 /// Download and decrypt a file from the network, writing it to disk.
2628 ///
2629 /// Uses `streaming_decrypt` so that only one batch of chunks lives in
2630 /// memory at a time, avoiding OOM on large files. Chunks are fetched
2631 /// concurrently within each batch, then decrypted data is written to
2632 /// disk incrementally.
2633 ///
2634 /// Returns the number of bytes written.
2635 ///
2636 /// # Panics
2637 ///
2638 /// Requires a multi-threaded Tokio runtime (`flavor = "multi_thread"`).
2639 /// Will panic if called from a `current_thread` runtime because
2640 /// `streaming_decrypt` takes a synchronous callback that must bridge
2641 /// back to async via `block_in_place`.
2642 ///
2643 /// # Errors
2644 ///
2645 /// Returns an error if any chunk cannot be retrieved, decryption fails,
2646 /// or the file cannot be written.
2647 pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
2648 self.file_download_with_progress(data_map, output, None)
2649 .await
2650 }
2651
2652 /// Download and decrypt a file, trying the requested number of
2653 /// closest peers for every chunk fetch.
2654 ///
2655 /// Returns the number of bytes written.
2656 ///
2657 /// # Errors
2658 ///
2659 /// Returns an error if any chunk cannot be retrieved, decryption fails,
2660 /// or the file cannot be written.
2661 pub async fn file_download_from_closest_peers(
2662 &self,
2663 data_map: &DataMap,
2664 output: &Path,
2665 peer_count: NonZeroUsize,
2666 ) -> Result<u64> {
2667 self.file_download_with_progress_using_peer_count(data_map, output, None, peer_count.get())
2668 .await
2669 }
2670
2671 /// Download and decrypt a file with progress events, trying the
2672 /// requested number of closest peers for every chunk fetch.
2673 ///
2674 /// Same as [`Client::file_download_from_closest_peers`] but sends
2675 /// [`DownloadEvent`]s for UI feedback.
2676 ///
2677 /// # Errors
2678 ///
2679 /// Returns an error if any chunk cannot be retrieved, decryption fails,
2680 /// or the file cannot be written.
2681 pub async fn file_download_with_progress_from_closest_peers(
2682 &self,
2683 data_map: &DataMap,
2684 output: &Path,
2685 progress: Option<mpsc::Sender<DownloadEvent>>,
2686 peer_count: NonZeroUsize,
2687 ) -> Result<u64> {
2688 self.file_download_with_progress_using_peer_count(
2689 data_map,
2690 output,
2691 progress,
2692 peer_count.get(),
2693 )
2694 .await
2695 }
2696
    /// Shared download core: resolve the DataMap, then fetch + streaming-decrypt
    /// the file one batch at a time, handing each decrypted plaintext segment
    /// (in order) to `on_chunk`. Constant memory — only one decrypt batch is
    /// resident at a time. Returns the total plaintext bytes produced.
    ///
    /// `on_chunk` is async so a sink can apply backpressure (e.g. a bounded
    /// channel). Driving the decrypt iterator runs the batched chunk fetch via
    /// `block_in_place`, so this requires a multi-threaded Tokio runtime.
    ///
    /// Every chunk fetch tries `peer_count` closest peers.
    ///
    /// Progress reporting (via `progress`). Events are sent with `try_send`
    /// and the send result is discarded, so reporting is best-effort and
    /// never fails the download:
    /// 1. For a child (hierarchical) DataMap, sends `ResolvingDataMap` with
    ///    the length of `data_map`, then one `MapChunkFetched` per map chunk
    ///    fetched while resolving to the root level.
    /// 2. Once the root DataMap is known, sends `DataMapResolved` with the
    ///    accurate `total_chunks`.
    /// 3. Sends `ChunksFetched { fetched, total }` for each data chunk that
    ///    arrives, whether on the first pass or in a deferred retry round.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Encryption`] when DataMap resolution, setting up the
    /// streaming decrypt, or decrypting a segment fails. A data chunk that is
    /// still missing after the last deferred retry round makes the fetch
    /// callback return an error, which presumably surfaces through one of
    /// those same paths. Any error returned by `on_chunk` is propagated
    /// unchanged and stops the download.
    async fn download_decrypted_chunks<F, Fut>(
        &self,
        data_map: &DataMap,
        progress: Option<mpsc::Sender<DownloadEvent>>,
        peer_count: usize,
        mut on_chunk: F,
    ) -> Result<u64>
    where
        F: FnMut(Bytes) -> Fut,
        Fut: std::future::Future<Output = Result<()>>,
    {
        // Captured once so the synchronous fetch callbacks below can drive
        // async work with `handle.block_on`.
        let handle = Handle::current();

        // Phase 1: Resolve hierarchical DataMap to root level.
        // This fetches child DataMap chunks (typically 3) to discover the real chunk count.
        let root_map = if data_map.is_child() {
            let dm_chunks = data_map.len();
            if let Some(ref tx) = progress {
                let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
                    total_map_chunks: dm_chunks,
                });
            }

            let resolve_progress = progress.clone();
            // Running count of map chunks fetched so far, shared by the
            // concurrent fetch tasks.
            let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));

            let resolved = tokio::task::block_in_place(|| {
                let counter_ref = resolve_counter.clone();
                let progress_ref = resolve_progress.clone();
                let fetch_limiter = self.controller().fetch.clone();
                let fetch = |batch: &[(usize, XorName)]| {
                    let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
                    let counter = counter_ref.clone();
                    let prog = progress_ref.clone();
                    let limiter = fetch_limiter.clone();
                    handle.block_on(async {
                        // Use rebucketed_unordered so the in-flight cap
                        // is re-read from the limiter as each slot frees.
                        // `buffer_unordered` snapshots the cap once at
                        // pipeline build, which means observe_op
                        // signals from inside chunk_get cannot reduce
                        // concurrency on the current batch — exactly
                        // the case where load-shedding is needed.
                        let mut results = rebucketed_unordered(
                            &limiter,
                            batch_owned,
                            |(idx, hash): (usize, XorName)| {
                                let counter = counter.clone();
                                let prog = prog.clone();
                                async move {
                                    let addr = hash.0;
                                    // chunk_get_observed feeds the
                                    // adaptive fetch limiter once per
                                    // call via chunk_get_outcome
                                    // (Ok(None) -> Timeout is the
                                    // load-shedding signal for
                                    // sustained close-group exhaustion).
                                    //
                                    // Unlike the data-chunk pass below,
                                    // there is no deferral here: a fetch
                                    // error or a missing map chunk is
                                    // returned as an error from this
                                    // closure.
                                    let chunk = self
                                        .chunk_get_observed_from_closest_peers(&addr, peer_count)
                                        .await
                                        .map_err(|e| {
                                            self_encryption::Error::Generic(format!(
                                                "DataMap resolution failed: {e}"
                                            ))
                                        })?
                                        .ok_or_else(|| {
                                            self_encryption::Error::Generic(format!(
                                                "DataMap chunk not found: {}",
                                                hex::encode(addr)
                                            ))
                                        })?;
                                    let fetched = counter
                                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
                                        + 1;
                                    if let Some(ref tx) = prog {
                                        let _ =
                                            tx.try_send(DownloadEvent::MapChunkFetched { fetched });
                                    }
                                    Ok::<_, self_encryption::Error>((idx, chunk.content))
                                }
                            },
                        )
                        .await?;
                        // CRITICAL: self_encryption::get_root_data_map_parallel
                        // pairs the returned Vec POSITIONALLY with the input
                        // hashes via .zip() and discards our idx field.
                        // rebucketed_unordered preserves first-completion
                        // order, so sort by idx to restore input order
                        // before returning.
                        results.sort_by_key(|(idx, _)| *idx);
                        Ok(results)
                    })
                };
                get_root_data_map_parallel(data_map.clone(), &fetch)
            })
            .map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;

            info!(
                "Resolved hierarchical DataMap: {} data chunks",
                resolved.len()
            );
            resolved
        } else {
            // Already a root-level DataMap: nothing to resolve.
            data_map.clone()
        };

        // Phase 2: Now we know the real chunk count.
        let total_chunks = root_map.len();
        if let Some(ref tx) = progress {
            let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
        }

        // Phase 3: Fetch and decrypt data chunks with accurate progress.
        // The counter is shared across every batch so `fetched` keeps
        // counting up over the whole file rather than restarting per batch.
        let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let fetched_for_closure = fetched_counter.clone();
        let progress_for_closure = progress.clone();

        // Pick the decrypt batch size once, from the chunk count, the fetch
        // limiter's current cap, the configured batch size and the usable
        // memory reading.
        let fetch_limiter_outer = self.controller().fetch.clone();
        let usable_memory = usable_memory_bytes();
        let configured_batch_floor = stream_decrypt_batch_size();
        let fetch_cap = fetch_limiter_outer.current();
        let decrypt_batch_size = adaptive_stream_decrypt_batch_size(
            total_chunks,
            fetch_cap,
            configured_batch_floor,
            usable_memory,
        );
        info!(
            total_chunks,
            fetch_cap,
            configured_batch_floor,
            ?usable_memory,
            decrypt_batch_size,
            "Selected adaptive stream decrypt batch size"
        );

        let stream = streaming_decrypt_with_batch_size(
            &root_map,
            |batch: &[(usize, XorName)]| {
                let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
                let fetched_ref = fetched_for_closure.clone();
                let progress_ref = progress_for_closure.clone();
                let fetch_limiter = fetch_limiter_outer.clone();

                tokio::task::block_in_place(|| {
                    handle.block_on(async {
                        // First pass: try every chunk in the batch via
                        // chunk_get_observed (which already does its own
                        // first-attempt + retry sweep). Neither a missing
                        // chunk (Ok(None)) nor a fetch error (Err) is
                        // fatal here — both are candidates for the
                        // deferred retry rounds below. We carry the
                        // chunk's XorName through so the retry pass can
                        // re-fetch by address.
                        //
                        // Both cases are encoded as `Err(hash)` in the
                        // INNER Result of `BatchEntry`. The closure
                        // itself always returns an outer `Ok`, so it
                        // never asks the rebucketed pass to early-abort
                        // the batch.
                        type BatchEntry =
                            (usize, std::result::Result<bytes::Bytes, XorName>);
                        let raw: Vec<BatchEntry> = rebucketed_unordered(
                            &fetch_limiter,
                            batch_owned,
                            |(idx, hash): (usize, XorName)| {
                                let fetched_ref = fetched_ref.clone();
                                let progress_ref = progress_ref.clone();
                                async move {
                                    let addr = hash.0;
                                    let addr_hex = hex::encode(addr);
                                    match self
                                        .chunk_get_observed_from_closest_peers(&addr, peer_count)
                                        .await
                                    {
                                        Ok(Some(chunk)) => {
                                            let fetched = fetched_ref.fetch_add(
                                                1,
                                                std::sync::atomic::Ordering::Relaxed,
                                            ) + 1;
                                            info!("Downloaded {fetched}/{total_chunks}");
                                            if let Some(ref tx) = progress_ref {
                                                let _ = tx.try_send(
                                                    DownloadEvent::ChunksFetched {
                                                        fetched,
                                                        total: total_chunks,
                                                    },
                                                );
                                            }
                                            Ok::<BatchEntry, self_encryption::Error>((
                                                idx,
                                                Ok(chunk.content),
                                            ))
                                        }
                                        // chunk_get returned Ok(None): defer
                                        // this chunk for a later retry rather
                                        // than aborting the whole batch.
                                        Ok(None) => Ok((idx, Err(hash))),
                                        // A transient error for one chunk
                                        // (e.g. its close-group DHT walk
                                        // erroring on this pass) must not
                                        // abort a multi-hundred-chunk
                                        // download. Defer it to the retry
                                        // rounds, same as Ok(None); only a
                                        // chunk that survives all deferred
                                        // rounds is fatal.
                                        Err(e) => {
                                            info!(
                                                "First-pass fetch error for {addr_hex}: {e}; deferring"
                                            );
                                            Ok((idx, Err(hash)))
                                        }
                                    }
                                }
                            },
                        )
                        .await?;

                        // Partition: things we already have vs the
                        // deferred set we need to retry.
                        let mut results: Vec<(usize, bytes::Bytes)> = Vec::new();
                        let mut deferred: Vec<(usize, XorName)> = Vec::new();
                        for (idx, inner) in raw {
                            match inner {
                                Ok(bytes) => results.push((idx, bytes)),
                                Err(hash) => deferred.push((idx, hash)),
                            }
                        }

                        // Deferred retry pass: retry the deferred chunks
                        // in CONCURRENT rounds (reusing the fetch
                        // limiter's cap), not serially. The first round
                        // fires immediately — most deferrals on a
                        // healthy-but-lossy link are peer-side noise
                        // that clears in well under a second, and
                        // serializing them behind mandatory multi-second
                        // sleeps was the single biggest throughput sink
                        // on such links (a batch deferring ~20 chunks
                        // burned minutes of near-zero throughput even
                        // though every chunk succeeded on its first
                        // retry). Only chunks that survive a round get a
                        // longer back-off before the next, so genuine
                        // saturation still gets time to settle.
                        if !deferred.is_empty() {
                            // Round delays in seconds. Round 0 is
                            // immediate; later rounds back off to ride
                            // out sustained saturation.
                            //
                            // NOTE(review): this block-local const shadows
                            // the `DEFERRED_ROUND_DELAYS_SECS` imported from
                            // the merkle module at the top of this file.
                            // Within this block the download schedule below
                            // applies, not the imported one.
                            const DEFERRED_ROUND_DELAYS_SECS: [u64; 3] = [0, 15, 45];
                            info!(
                                "Deferring {} chunk(s) for concurrent retry after batch settles",
                                deferred.len()
                            );
                            let mut remaining = deferred;
                            for (round, &delay_secs) in DEFERRED_ROUND_DELAYS_SECS
                                .iter()
                                .enumerate()
                            {
                                if remaining.is_empty() {
                                    break;
                                }
                                if delay_secs > 0 {
                                    tokio::time::sleep(std::time::Duration::from_secs(
                                        delay_secs,
                                    ))
                                    .await;
                                }
                                info!(
                                    "Deferred retry round {}/{}: {} chunk(s)",
                                    round + 1,
                                    DEFERRED_ROUND_DELAYS_SECS.len(),
                                    remaining.len(),
                                );
                                // Move this round's work out and leave
                                // `remaining` empty so it can collect the
                                // chunks that fail again.
                                let round_input = std::mem::take(&mut remaining);
                                let round_results: Vec<BatchEntry> = rebucketed_unordered(
                                    &fetch_limiter,
                                    round_input,
                                    |(idx, hash): (usize, XorName)| {
                                        let fetched_ref = fetched_ref.clone();
                                        let progress_ref = progress_ref.clone();
                                        async move {
                                            let addr = hash.0;
                                            // Both Ok(None) and a transient
                                            // Err re-defer the chunk to the
                                            // next round rather than
                                            // aborting; only the final
                                            // round's leftovers are fatal.
                                            match self
                                                .chunk_get_observed_from_closest_peers(
                                                    &addr, peer_count,
                                                )
                                                .await
                                            {
                                                Ok(Some(chunk)) => {
                                                    let fetched = fetched_ref.fetch_add(
                                                        1,
                                                        std::sync::atomic::Ordering::Relaxed,
                                                    ) + 1;
                                                    info!(
                                                        "Downloaded {fetched}/{total_chunks} (deferred retry)"
                                                    );
                                                    if let Some(ref tx) = progress_ref {
                                                        let _ = tx.try_send(
                                                            DownloadEvent::ChunksFetched {
                                                                fetched,
                                                                total: total_chunks,
                                                            },
                                                        );
                                                    }
                                                    Ok::<BatchEntry, self_encryption::Error>((
                                                        idx,
                                                        Ok(chunk.content),
                                                    ))
                                                }
                                                Ok(None) => Ok((idx, Err(hash))),
                                                Err(e) => {
                                                    info!(
                                                        "Deferred retry for {} hit transient error: {e}; re-deferring",
                                                        hex::encode(addr)
                                                    );
                                                    Ok((idx, Err(hash)))
                                                }
                                            }
                                        }
                                    },
                                )
                                .await?;
                                for (idx, inner) in round_results {
                                    match inner {
                                        Ok(bytes) => results.push((idx, bytes)),
                                        Err(hash) => remaining.push((idx, hash)),
                                    }
                                }
                            }
                            // Anything still deferred after the final round
                            // is fatal for this batch. The error names only
                            // the first leftover address, even when several
                            // chunks are missing.
                            if let Some((_, hash)) = remaining.first() {
                                return Err(self_encryption::Error::Generic(format!(
                                    "Chunk not found after {} deferred retry rounds: {}",
                                    DEFERRED_ROUND_DELAYS_SECS.len(),
                                    hex::encode(hash.0),
                                )));
                            }
                        }

                        // streaming_decrypt itself sort_by_keys before
                        // zipping, but the same closure is also passed
                        // through get_root_data_map_parallel internally
                        // (see self_encryption::stream_decrypt.rs::new), and
                        // THAT path zips positionally without sorting. Sort
                        // here so both consumers see input order.
                        results.sort_by_key(|(idx, _)| *idx);
                        Ok(results)
                    })
                })
            },
            decrypt_batch_size,
        )
        .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;

        // Drive the iterator (each `next()` runs the batched fetch via
        // block_in_place) and hand each decrypted segment to the sink in
        // order. Awaiting the sink between items yields back to the runtime so
        // a bounded sink can apply backpressure.
        let mut bytes_total = 0u64;
        for chunk_result in stream {
            let chunk: Bytes =
                chunk_result.map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
            bytes_total += chunk.len() as u64;
            // A sink error ends the download immediately and is returned
            // to the caller as-is.
            on_chunk(chunk).await?;
        }
        Ok(bytes_total)
    }
3082
3083 /// Download and decrypt a file to disk, with optional progress events.
3084 ///
3085 /// Same as [`Client::file_download`] but sends [`DownloadEvent`]s for UI
3086 /// feedback. Streams to a temp file (one decrypt batch resident at a time)
3087 /// and renames atomically on success. A `TempDownload` guard removes the
3088 /// staging file on any error path, including a panic.
3089 pub async fn file_download_with_progress(
3090 &self,
3091 data_map: &DataMap,
3092 output: &Path,
3093 progress: Option<mpsc::Sender<DownloadEvent>>,
3094 ) -> Result<u64> {
3095 self.file_download_with_progress_using_peer_count(
3096 data_map,
3097 output,
3098 progress,
3099 self.config().close_group_size,
3100 )
3101 .await
3102 }
3103
3104 /// Download and decrypt a file to disk with progress events, trying
3105 /// `peer_count` closest peers for every chunk fetch.
3106 ///
3107 /// Streams to a temp file (one decrypt batch resident at a time) and
3108 /// renames atomically on success.
3109 async fn file_download_with_progress_using_peer_count(
3110 &self,
3111 data_map: &DataMap,
3112 output: &Path,
3113 progress: Option<mpsc::Sender<DownloadEvent>>,
3114 peer_count: usize,
3115 ) -> Result<u64> {
3116 debug!("Downloading file to {}", output.display());
3117
3118 let parent = output.parent().unwrap_or_else(|| Path::new("."));
3119 let unique: u64 = rand::random();
3120 let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));
3121
3122 // Guard removes the staging file on any early return OR a panic unwind
3123 // out of the `block_in_place` decrypt loop; defused only by a
3124 // successful commit(). Centralizes what used to be three duplicated
3125 // cleanup arms.
3126 let tmp = TempDownload::new(tmp_path);
3127 let mut file = std::fs::File::create(tmp.path())?;
3128
3129 let bytes_written = self
3130 .download_decrypted_chunks(data_map, progress, peer_count, |bytes| {
3131 let r = file.write_all(&bytes).map_err(Error::from);
3132 std::future::ready(r)
3133 })
3134 .await?;
3135 file.flush()?;
3136 drop(file); // close the handle before rename (Windows won't rename an open file)
3137
3138 tmp.commit(output)?;
3139 info!(
3140 "File downloaded: {bytes_written} bytes written to {}",
3141 output.display()
3142 );
3143 Ok(bytes_written)
3144 }
3145
3146 /// Download and decrypt a file, streaming the plaintext to `sink` instead
3147 /// of writing to disk.
3148 ///
3149 /// Constant memory (one decrypt batch resident at a time); the caller
3150 /// receives bytes progressively as each batch decrypts, suitable for
3151 /// forwarding to an HTTP chunked body or a gRPC response stream. The
3152 /// bounded `sink` applies backpressure. If the receiver is dropped (e.g.
3153 /// the client disconnected) the download stops early and returns
3154 /// [`Error::Cancelled`].
3155 ///
3156 /// The channel item type is `Result<Bytes, Error>`, so the caller sets up:
3157 ///
3158 /// ```ignore
3159 /// let (tx, rx) = tokio::sync::mpsc::channel::<Result<Bytes, Error>>(8);
3160 /// ```
3161 ///
3162 /// Typically the caller `tokio::spawn`s this and converts the matching
3163 /// `Receiver` into its response stream. Requires a multi-threaded Tokio
3164 /// runtime (the decrypt iterator uses `block_in_place`).
3165 pub async fn file_download_to_sender(
3166 &self,
3167 data_map: &DataMap,
3168 sink: mpsc::Sender<std::result::Result<Bytes, Error>>,
3169 progress: Option<mpsc::Sender<DownloadEvent>>,
3170 ) -> Result<u64> {
3171 let peer_count = self.config().close_group_size;
3172 self.download_decrypted_chunks(data_map, progress, peer_count, |bytes| {
3173 let sink = sink.clone();
3174 async move {
3175 sink.send(Ok(bytes))
3176 .await
3177 .map_err(|_| Error::Cancelled("download stream receiver dropped".into()))
3178 }
3179 })
3180 .await
3181 }
3182}
3183
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Sampling 5 of 100 chunks keeps both endpoints and spreads the rest
    /// evenly in between.
    #[test]
    fn distributed_sample_indices_spreads_across_large_file() {
        let sampled = distributed_sample_indices(100, 5);
        assert_eq!(sampled, vec![0, 24, 49, 74, 99]);
    }

    /// When the file has no more chunks than the cap, every index is
    /// returned — this keeps the exact "whole file sampled" detection in
    /// estimate_upload_cost working.
    #[test]
    fn distributed_sample_indices_covers_whole_small_file() {
        let below_cap = distributed_sample_indices(3, 5);
        let at_cap = distributed_sample_indices(5, 5);
        assert_eq!(below_cap, vec![0, 1, 2]);
        assert_eq!(at_cap, vec![0, 1, 2, 3, 4]);
    }

    /// Degenerate sizes behave, and for every size up to 199 the sample
    /// starts at 0, ends at the last index, stays in range and is strictly
    /// increasing.
    #[test]
    fn distributed_sample_indices_is_in_range_and_increasing() {
        assert!(distributed_sample_indices(0, 5).is_empty());
        assert_eq!(distributed_sample_indices(1, 5), vec![0]);
        for total in 1..200usize {
            let sampled = distributed_sample_indices(total, 5);
            let first = *sampled.first().unwrap();
            let last = *sampled.last().unwrap();
            assert_eq!(first, 0);
            assert_eq!(last, total - 1);
            assert!(sampled.iter().all(|&index| index < total));
            assert!(sampled.windows(2).all(|pair| pair[0] < pair[1]));
        }
    }

    /// Spilling a 1 KB file should pass the disk space check on any
    /// realistic machine.
    #[test]
    fn disk_space_check_passes_for_small_file() {
        let one_kib = 1024;
        check_disk_space_for_spill(one_kib).unwrap();
    }

    /// Asking for `u64::MAX / 2` bytes (roughly 8 EiB) should be rejected on
    /// any real system, and with the dedicated error variant.
    #[test]
    fn disk_space_check_fails_for_absurd_size() {
        let absurd_size = u64::MAX / 2;
        let result = check_disk_space_for_spill(absurd_size);
        assert!(result.is_err());
        let err = result.unwrap_err();
        let is_disk_space_error = matches!(err, Error::InsufficientDiskSpace(_));
        assert!(
            is_disk_space_error,
            "expected InsufficientDiskSpace, got: {err}"
        );
    }

    /// With unlimited memory and many chunks, the batch size scales with
    /// the fetch cap.
    #[test]
    fn adaptive_stream_decrypt_batch_size_tracks_fetch_headroom() {
        let expected = 64 * DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER;

        assert_eq!(
            adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(u64::MAX)),
            expected
        );
    }

    /// The batch never exceeds the number of chunks in the file.
    #[test]
    fn adaptive_stream_decrypt_batch_size_caps_to_total_chunks() {
        let total_chunks = 12;

        assert_eq!(
            adaptive_stream_decrypt_batch_size(total_chunks, 64, 10, Some(u64::MAX)),
            12
        );
    }

    /// The configured batch size acts as a floor when the fetch cap is tiny.
    #[test]
    fn adaptive_stream_decrypt_batch_size_honours_configured_floor() {
        let configured_floor = 32;

        assert_eq!(
            adaptive_stream_decrypt_batch_size(1_000, 1, configured_floor, None),
            32
        );
    }

    /// Without a memory reading the batch stays at the configured size
    /// instead of growing with the fetch cap.
    #[test]
    fn adaptive_stream_decrypt_batch_size_does_not_expand_without_memory_reading() {
        let configured_floor = 10;

        assert_eq!(
            adaptive_stream_decrypt_batch_size(1_000, 64, configured_floor, None),
            10
        );
    }

    /// A memory budget sized for exactly 16 chunks caps the batch at 16,
    /// even though the fetch cap would allow more.
    #[test]
    fn adaptive_stream_decrypt_batch_size_caps_to_memory_budget() {
        let per_chunk_bytes = (self_encryption::MAX_CHUNK_SIZE as u64)
            .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
            .max(1);
        let budget_for_sixteen = per_chunk_bytes
            .saturating_mul(16)
            .saturating_mul(DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR);

        assert_eq!(
            adaptive_stream_decrypt_batch_size(1_000, 256, 10, Some(budget_for_sixteen)),
            16
        );
    }

    /// With almost no usable memory the batch shrinks to a single chunk
    /// rather than to zero.
    #[test]
    fn adaptive_stream_decrypt_batch_size_keeps_one_chunk_when_memory_is_tight() {
        let one_byte = Some(1);

        assert_eq!(adaptive_stream_decrypt_batch_size(1_000, 64, 10, one_byte), 1);
    }

    /// A cached merkle payment covers a set of addresses only if every one
    /// of them has a proof; extra cached proofs are harmless.
    #[test]
    fn cached_merkle_covers_only_when_all_addresses_have_proofs() {
        let covered = compute_address(&Bytes::from_static(b"covered"));
        let extra = compute_address(&Bytes::from_static(b"extra"));
        let missing = compute_address(&Bytes::from_static(b"missing"));
        let proofs = HashMap::from([(covered, vec![1]), (extra, vec![2])]);
        let cached = MerkleBatchPaymentResult {
            proofs,
            chunk_count: 2,
            storage_cost_atto: "0".to_string(),
            gas_cost_wei: 0,
            merkle_payment_timestamp: 0,
        };

        let covers_subset = cached_merkle_covers_addresses(&cached, &[covered]);
        let covers_all = cached_merkle_covers_addresses(&cached, &[covered, extra]);
        let covers_unpaid = cached_merkle_covers_addresses(&cached, &[covered, missing]);

        assert!(covers_subset);
        assert!(covers_all);
        assert!(!covers_unpaid);
    }

    /// A partial merkle payment leaves some addresses without a proof. They
    /// must be split out so `upload_waves_merkle` can report them as failed
    /// (`PartialUpload`) instead of aborting the whole file, and each group
    /// must keep the addresses' original order.
    #[test]
    fn partition_addresses_by_proof_splits_paid_and_unpaid() {
        let paid_a = [1u8; 32];
        let unpaid_b = [2u8; 32];
        let paid_c = [3u8; 32];
        let unpaid_d = [4u8; 32];
        let proofs: HashMap<[u8; 32], Vec<u8>> =
            HashMap::from([(paid_a, vec![0xaa]), (paid_c, vec![0xcc])]);
        let interleaved = [paid_a, unpaid_b, paid_c, unpaid_d];

        let (to_store, missing) = partition_addresses_by_proof(&interleaved, &proofs);

        assert_eq!(to_store, vec![paid_a, paid_c]);
        assert_eq!(missing, vec![unpaid_b, unpaid_d]);
    }

    /// An `Ok` wave contributes its stored chunks, parsed cost and stats,
    /// and records nothing as failed.
    #[test]
    fn fold_single_wave_keeps_ok_wave() {
        let stored = vec![[1u8; 32], [2u8; 32]];
        let stats = WaveAggregateStats {
            chunk_attempts_total: 7,
            ..Default::default()
        };
        let wave = Ok((stored.clone(), "100".to_string(), 9, stats));

        let outcome = fold_single_wave(wave).unwrap();

        assert_eq!(outcome.stored, stored);
        assert!(outcome.failed.is_empty());
        assert_eq!(outcome.storage_atto.to_string(), "100");
        assert_eq!(outcome.gas_wei, 9);
        assert_eq!(outcome.stats.chunk_attempts_total, 7);
    }

    /// The core V2-461 semantic: a wave that ends short of quorum
    /// (`PartialUpload`) is recoverable. Its stored chunks, failed chunks
    /// and on-chain spend are folded so the caller can carry on with the
    /// next wave instead of aborting the whole file.
    #[test]
    fn fold_single_wave_folds_partial_upload() {
        let stored = vec![[3u8; 32]];
        let failed = vec![([4u8; 32], "short of quorum".to_string())];
        let spend = PartialUploadSpend {
            storage_cost_atto: "250".to_string(),
            gas_cost_wei: 11,
        };
        let partial = Error::PartialUpload {
            stored: stored.clone(),
            stored_count: 1,
            failed: failed.clone(),
            failed_count: 1,
            total_chunks: 2,
            spend: Box::new(spend),
            reason: "wave store failed after retries".to_string(),
        };

        let outcome = fold_single_wave(Err(partial)).unwrap();

        assert_eq!(outcome.stored, stored);
        assert_eq!(outcome.failed, failed);
        assert_eq!(outcome.storage_atto.to_string(), "250");
        assert_eq!(outcome.gas_wei, 11);
        // `PartialUpload` carries no stats, so the failed wave contributes none.
        assert_eq!(outcome.stats.chunk_attempts_total, 0);
    }

    /// Any error other than `PartialUpload` (here a wallet/payment
    /// infrastructure failure) is fatal: it must abort the file rather than
    /// being folded into the failed set.
    #[test]
    fn fold_single_wave_propagates_fatal_error() {
        let fatal = Error::Payment("wallet unavailable".to_string());

        let result = fold_single_wave(Err(fatal));

        assert!(
            matches!(result, Err(Error::Payment(_))),
            "fatal payment error must propagate, got: {result:?}"
        );
    }

    /// Both extremes of the proof partition: no proofs at all, and a proof
    /// for every address.
    #[test]
    fn partition_addresses_by_proof_handles_all_or_nothing() {
        let a = [5u8; 32];
        let b = [6u8; 32];
        let addresses = [a, b];

        // No proofs at all → every address is missing.
        let no_proofs: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
        let (to_store, missing) = partition_addresses_by_proof(&addresses, &no_proofs);
        assert!(to_store.is_empty());
        assert_eq!(missing, vec![a, b]);

        // All proofs present → nothing missing.
        let all_proofs: HashMap<[u8; 32], Vec<u8>> =
            HashMap::from([(a, vec![1]), (b, vec![2])]);
        let (to_store, missing) = partition_addresses_by_proof(&addresses, &all_proofs);
        assert_eq!(to_store, vec![a, b]);
        assert!(missing.is_empty());
    }

    /// Chunks pushed into a spill can be read back unchanged, and the
    /// bookkeeping (count, byte totals, entries) matches what was pushed.
    #[test]
    fn chunk_spill_round_trip() {
        let first_payload = vec![0xAA; 1024];
        let second_payload = vec![0xBB; 2048];
        let mut spill = ChunkSpill::new().unwrap();

        spill.push(&first_payload).unwrap();
        spill.push(&second_payload).unwrap();

        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 1024 + 2048);
        let chunk_entries = spill.chunk_entries().unwrap();
        let entry_total: u64 = chunk_entries.iter().map(|(_, size)| *size).sum();
        assert_eq!(entry_total, 1024 + 2048);

        // Read both chunks back by address and compare with what went in.
        let first_address = spill.addresses.first().unwrap();
        let first_read = spill.read_chunk(first_address).unwrap();
        assert_eq!(&first_read[..], &first_payload[..]);

        let second_address = spill.addresses.get(1).unwrap();
        let second_read = spill.read_chunk(second_address).unwrap();
        assert_eq!(&second_read[..], &second_payload[..]);

        // With a wave size of one chunk, two chunks make two waves.
        let wave_count = spill.addresses.chunks(1).count();
        assert_eq!(wave_count, 2);
    }

    /// Dropping a spill removes its temporary directory.
    #[test]
    fn chunk_spill_cleanup_on_drop() {
        let spill = ChunkSpill::new().unwrap();
        let dir = spill.dir.clone();
        assert!(dir.exists());

        drop(spill);

        // After drop, the directory should be cleaned up.
        assert!(!dir.exists(), "spill dir should be removed on drop");
    }

    /// Pushing the same content repeatedly stores it once; different
    /// content is still added.
    #[test]
    fn chunk_spill_deduplicates_identical_content() {
        let mut spill = ChunkSpill::new().unwrap();
        let repeated = vec![0xCC; 512];

        // The second and third pushes carry identical content and should
        // be skipped.
        for _ in 0..3 {
            spill.push(&repeated).unwrap();
        }

        assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
        assert_eq!(
            spill.total_bytes(),
            512,
            "total_bytes should count unique only"
        );

        // Different content should still be added.
        let distinct = vec![0xDD; 256];
        spill.push(&distinct).unwrap();
        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 512 + 256);
    }
}
3465
/// Compile-time checks that the futures returned by `Client` file methods
/// are `Send`.
///
/// None of the functions here are `#[test]`s; they only need to type-check.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Type-checks only when `T` is `Send`; the argument itself is unused.
    fn _assert_send<T>(_: &T)
    where
        T: Send,
    {
    }

    /// The `file_upload` future must be `Send`.
    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_is_send(client: &Client) {
        _assert_send(&client.file_upload(Path::new("/dev/null")));
    }

    /// The `file_upload_with_mode` future must be `Send`.
    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_with_mode_is_send(client: &Client) {
        _assert_send(&client.file_upload_with_mode(Path::new("/dev/null"), PaymentMode::Auto));
    }

    /// The `file_download` future must be `Send`. The `DataMap` is never
    /// built (`todo!()`); only its type matters here.
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _file_download_is_send(client: &Client) {
        let data_map: DataMap = todo!();
        _assert_send(&client.file_download(&data_map, Path::new("/dev/null")));
    }
}
3496}