ant_core/data/client/file.rs
1//! File operations using streaming self-encryption.
2//!
3//! Upload files directly from disk without loading them entirely into memory.
4//! Uses `stream_encrypt` to process files in 8KB chunks, encrypting and
5//! uploading each piece as it's produced.
6//!
7//! Encrypted chunks are spilled to a temporary directory during encryption
8//! so that peak memory usage is bounded to one wave (~256 MB for 64 × 4 MB
9//! chunks) regardless of file size.
10//!
11//! For in-memory data uploads, see the `data` module.
12
13use crate::data::client::adaptive::{observe_op, rebucketed_unordered};
14use crate::data::client::batch::{
15 finalize_batch_payment, PaymentIntent, PreparedChunk, WaveAggregateStats,
16};
17use crate::data::client::chunk::ChunkPeerGetResult;
18use crate::data::client::classify_error;
19use crate::data::client::merkle::{
20 chunk_contents_for_upload_addresses, finalize_merkle_batch, merkle_deferred_retry,
21 merkle_store_with_retry, should_use_merkle, MerkleBatchPaymentResult, PaymentMode,
22 PreparedMerkleBatch, DEFERRED_ROUND_DELAYS_SECS,
23};
24use crate::data::client::Client;
25use crate::data::error::{Error, PartialUploadSpend, Result};
26use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
27use ant_protocol::transport::{MultiAddr, PeerId};
28use ant_protocol::{compute_address, XorName as ChunkAddress, DATA_TYPE_CHUNK};
29use bytes::Bytes;
30use fs2::FileExt;
31use futures::stream::{self, StreamExt};
32use self_encryption::{
33 get_root_data_map_parallel, stream_decrypt_batch_size, stream_encrypt,
34 streaming_decrypt_with_batch_size, DataMap,
35};
36use std::collections::{HashMap, HashSet};
37use std::io::Write;
38use std::num::NonZeroUsize;
39use std::path::{Path, PathBuf};
40use std::sync::{Arc, Mutex};
41use tokio::runtime::Handle;
42use tokio::sync::mpsc;
43use tracing::{debug, info, warn};
44use xor_name::XorName;
45
46/// Progress events emitted during file upload for UI feedback.
47#[derive(Debug, Clone)]
48pub enum UploadEvent {
49 /// A chunk has been encrypted and spilled to disk.
50 Encrypting { chunks_done: usize },
51 /// File encryption complete.
52 Encrypted { total_chunks: usize },
53 /// Starting quote collection for a wave.
54 QuotingChunks {
55 wave: usize,
56 total_waves: usize,
57 chunks_in_wave: usize,
58 },
59 /// A chunk has been quoted (peer discovery + price received).
60 /// This is the slow phase — each quote involves network round-trips.
61 ChunkQuoted { quoted: usize, total: usize },
62 /// A chunk has been stored on the network.
63 ChunkStored { stored: usize, total: usize },
64}
65
66/// Progress events emitted during file download for UI feedback.
67#[derive(Debug, Clone)]
68pub enum DownloadEvent {
69 /// Resolving hierarchical DataMap to discover real chunk count.
70 ResolvingDataMap { total_map_chunks: usize },
71 /// A DataMap chunk has been fetched during resolution.
72 MapChunkFetched { fetched: usize },
73 /// DataMap resolved — total data chunk count now known.
74 DataMapResolved { total_chunks: usize },
75 /// Data chunks are being fetched from the network.
76 ChunksFetched { fetched: usize, total: usize },
77}
78
79/// File download result when peer-health diagnostics are enabled.
80#[derive(Debug, Clone)]
81pub struct FileDownloadWithPeerReport {
82 /// Number of plaintext bytes written to the destination.
83 pub bytes_written: u64,
84 /// Per-file-chunk closest-peer GET results collected during the actual download.
85 pub chunk_reports: Vec<FileChunkPeerReport>,
86}
87
88/// Closest-peer GET results for one file chunk.
89#[derive(Debug, Clone)]
90pub struct FileChunkPeerReport {
91 /// 1-based chunk index in the resolved file DataMap.
92 pub index: usize,
93 /// Chunk address.
94 pub address: ChunkAddress,
95 /// All diagnostic GET sweeps attempted for this chunk.
96 pub sweeps: Vec<FileChunkPeerSweepReport>,
97}
98
99/// One all-peer diagnostic GET sweep for a file chunk.
100#[derive(Debug, Clone)]
101pub struct FileChunkPeerSweepReport {
102 /// 1-based attempt number for this chunk.
103 pub attempt: usize,
104 /// Whether this sweep happened during a deferred retry round.
105 pub deferred_retry: bool,
106 /// DHT lookup / sweep-level error, if the closest-peer group could not be queried.
107 pub error: Option<String>,
108 /// Per-peer results, sorted closest first.
109 pub peers: Vec<FileChunkPeerReportPeer>,
110}
111
112/// One peer result in a [`FileChunkPeerReport`].
113#[derive(Debug, Clone)]
114pub struct FileChunkPeerReportPeer {
115 /// Peer queried for the chunk.
116 pub peer_id: PeerId,
117 /// Known network addresses used for the peer.
118 pub peer_addrs: Vec<MultiAddr>,
119 /// XOR distance from `peer_id` to the chunk address.
120 pub xor_distance: ChunkAddress,
121 /// Whether this peer returned the chunk or why it did not.
122 pub status: FileChunkPeerStatus,
123}
124
125/// Peer-level file chunk GET diagnostic status.
126#[derive(Debug, Clone)]
127pub enum FileChunkPeerStatus {
128 /// The peer returned the chunk.
129 Found { bytes: usize },
130 /// The peer responded authoritatively that it does not store the chunk.
131 NotFound,
132 /// The peer did not respond before the timeout.
133 Timeout { message: String },
134 /// The transport/network path to the peer failed.
135 NetworkError { message: String },
136 /// Any other per-peer error.
137 Error { message: String },
138}
139
140/// One entry in the per-chunk quote list returned by
141/// [`Client::get_store_quotes`]: the responding peer, its addresses, the
142/// signed quote it returned, and the payment amount it is demanding.
143type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);
144
145type DownloadBatchEntry = (usize, std::result::Result<Bytes, XorName>);
146
147#[derive(Debug, Clone)]
148struct RecordedFileChunkPeerSweep {
149 index: usize,
150 address: ChunkAddress,
151 sweep: FileChunkPeerSweepReport,
152}
153
154#[derive(Clone)]
155struct FileDownloadFetchContext {
156 total_chunks: usize,
157 peer_count: usize,
158 fetched_ref: Arc<std::sync::atomic::AtomicUsize>,
159 progress_ref: Option<mpsc::Sender<DownloadEvent>>,
160 peer_reports: Option<Arc<Mutex<Vec<RecordedFileChunkPeerSweep>>>>,
161}
162
163/// Number of chunks per upload wave (matches batch.rs PAYMENT_WAVE_SIZE).
164const UPLOAD_WAVE_SIZE: usize = 64;
165
166/// Hard ceiling on chunk bodies held in memory at once by the merkle whole-file
167/// store fan-out (`upload_merkle_from_spill`). Each in-flight store holds one
168/// spilled body (≤ `MAX_CHUNK_SIZE` = 4 MiB), so this bounds peak resident store
169/// memory at ~256 MiB — the same bound the old fixed 64-chunk waves gave. The
170/// adaptive store cap can legitimately exceed this (`AdaptiveConfig::sanitize`
171/// permits `adaptive.max.store` above 64), so the fan-out clamps its cap here to
172/// keep a high configured max from pinning gigabytes of chunk bodies (PR #137
173/// review). Throughput is unaffected at the default cap, which is already 64.
174const MERKLE_STORE_MAX_IN_FLIGHT: usize = 64;
175
176/// The merkle whole-file store fan-out concurrency: the adaptive store cap,
177/// clamped to [`MERKLE_STORE_MAX_IN_FLIGHT`] (memory bound) and floored at 1.
178fn merkle_store_cap(limiter_current: usize) -> usize {
179 limiter_current.clamp(1, MERKLE_STORE_MAX_IN_FLIGHT)
180}
181
182/// Stream decrypt batches should be larger than fetch fan-out so
183/// the rolling fetch scheduler can keep launching new chunk GETs as earlier
184/// ones complete, instead of stopping at each self-encryption batch boundary.
185const DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER: usize = 4;
186
187/// Use at most this fraction of currently usable RAM for one decrypt batch.
188const DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR: u64 = 4;
189
190/// A decrypt batch briefly holds encrypted chunk bytes, decrypted chunk bytes,
191/// and Vec/Bytes overhead. Use a conservative multiplier rather than assuming
192/// payload bytes alone.
193const DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER: u64 = 3;
194
195/// Maximum number of distinct chunk addresses to sample when probing for a
196/// representative quote in [`Client::estimate_upload_cost`].
197///
198/// Bounded small so we never spend more than a couple of round-trips on the
199/// `AlreadyStored` retry path, which only matters when many leading chunks
200/// of a file already live on the network.
201const ESTIMATE_SAMPLE_CAP: usize = 5;
202
203/// First diagnostic all-peer fetch attempt for a file chunk.
204const FIRST_DIAGNOSTIC_FETCH_ATTEMPT: usize = 1;
205
206/// Deferred retry attempt number for retry round 0.
207const DEFERRED_RETRY_ATTEMPT_OFFSET: usize = 2;
208
209/// Pick up to `cap` chunk indices spread evenly across `[0, total)`, always
210/// including the first and last chunk.
211///
212/// Sampling the *first* N chunks biases the probe: a file sharing a leading
213/// prefix with a prior upload (compressed archives, similar headers) reports
214/// those chunks as `AlreadyStored` even when the tail is new, so a positional
215/// sample looks in the worst possible place. Spreading the sample means a
216/// single new chunk anywhere in the file yields a real price.
217///
218/// Returns `[0]` for a single chunk and every index when `total <= cap`, so
219/// [`Client::estimate_upload_cost`] can still detect the "whole file sampled"
220/// case. Indices are strictly increasing.
221fn distributed_sample_indices(total: usize, cap: usize) -> Vec<usize> {
222 if total == 0 {
223 return Vec::new();
224 }
225 let sample_limit = total.min(cap);
226 if sample_limit <= 1 {
227 return vec![0];
228 }
229 let mut indices: Vec<usize> = (0..sample_limit)
230 .map(|i| i * (total - 1) / (sample_limit - 1))
231 .collect();
232 indices.dedup(); // defensive: already strictly increasing for cap >= 2
233 indices
234}
235
236fn file_chunk_sweep_report_from_peer_results(
237 attempt: usize,
238 deferred_retry: bool,
239 results: &[ChunkPeerGetResult],
240) -> (Option<Bytes>, FileChunkPeerSweepReport) {
241 let mut content = None;
242 let peers = results
243 .iter()
244 .map(|result| {
245 if content.is_none() {
246 if let Ok(Some(chunk)) = &result.chunk_result {
247 content = Some(chunk.content.clone());
248 }
249 }
250
251 FileChunkPeerReportPeer {
252 peer_id: result.peer_id,
253 peer_addrs: result.peer_addrs.clone(),
254 xor_distance: result.xor_distance,
255 status: file_chunk_peer_status(&result.chunk_result),
256 }
257 })
258 .collect();
259
260 (
261 content,
262 FileChunkPeerSweepReport {
263 attempt,
264 deferred_retry,
265 error: None,
266 peers,
267 },
268 )
269}
270
271fn file_chunk_sweep_report_from_error(
272 attempt: usize,
273 deferred_retry: bool,
274 error: &Error,
275) -> FileChunkPeerSweepReport {
276 FileChunkPeerSweepReport {
277 attempt,
278 deferred_retry,
279 error: Some(error.to_string()),
280 peers: Vec::new(),
281 }
282}
283
284fn file_chunk_reports_from_recorded_sweeps(
285 mut sweeps: Vec<RecordedFileChunkPeerSweep>,
286) -> Vec<FileChunkPeerReport> {
287 sweeps.sort_by_key(|record| (record.index, record.sweep.attempt));
288
289 let mut reports: Vec<FileChunkPeerReport> = Vec::new();
290 for record in sweeps {
291 if let Some(report) = reports
292 .last_mut()
293 .filter(|report| report.index == record.index)
294 {
295 report.sweeps.push(record.sweep);
296 continue;
297 }
298
299 reports.push(FileChunkPeerReport {
300 index: record.index,
301 address: record.address,
302 sweeps: vec![record.sweep],
303 });
304 }
305
306 reports
307}
308
309fn file_chunk_peer_status(
310 chunk_result: &std::result::Result<Option<ant_protocol::DataChunk>, Error>,
311) -> FileChunkPeerStatus {
312 match chunk_result {
313 Ok(Some(chunk)) => FileChunkPeerStatus::Found {
314 bytes: chunk.content.len(),
315 },
316 Ok(None) => FileChunkPeerStatus::NotFound,
317 Err(Error::Timeout(e)) => FileChunkPeerStatus::Timeout { message: e.clone() },
318 Err(Error::Network(e)) => FileChunkPeerStatus::NetworkError { message: e.clone() },
319 Err(e) => FileChunkPeerStatus::Error {
320 message: e.to_string(),
321 },
322 }
323}
324
325/// Gas used by one `pay_for_quotes` transaction that packs up to
326/// `UPLOAD_WAVE_SIZE` (quote_hash, rewards_address, amount) entries.
327///
328/// `batch_pay` in `batch.rs` flattens every chunk's close-group quotes into a
329/// single EVM call, so the dominant cost is the SSTOREs for each entry plus
330/// the base tx overhead. On Arbitrum that is roughly
331/// `21_000 + 64 × (20_000 + small)` ≈ 1.3M; we round up to 1.5M as a
332/// conservative per-wave upper bound.
333const GAS_PER_WAVE_TX: u128 = 1_500_000;
334
335/// Gas used by one merkle batch payment transaction.
336///
337/// One on-chain tx per merkle sub-batch, but each tx verifies a merkle tree
338/// and posts a pool commitment, so budget higher than a plain transfer.
339const GAS_PER_MERKLE_TX: u128 = 500_000;
340
341/// Advisory gas price (wei/gas) used to turn the gas estimate into an ETH
342/// figure when no live gas oracle is consulted.
343///
344/// Arbitrum One typically settles around 0.1 gwei on quiet blocks; we use
345/// that as the default so the CLI prints a sensible order-of-magnitude
346/// number. Users should treat the reported gas cost as an estimate, not a
347/// commitment — real gas is bid at submission time.
348const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;
349
350/// Extra headroom percentage for disk space check.
351///
352/// Encrypted chunks are slightly larger than the source data due to padding
353/// and self-encryption overhead. We require file_size + 10% free space in
354/// the temp directory to account for this.
355const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;
356
357/// Temporary on-disk buffer for encrypted chunks.
358///
359/// During file encryption, chunks are written to a temp directory so that
360/// only their 32-byte addresses stay in memory. At upload time chunks are
361/// read back one wave at a time, keeping peak RAM at ~`UPLOAD_WAVE_SIZE × 4 MB`.
362/// Grace period (in seconds) before a spill dir is eligible for stale cleanup.
363///
364/// This is a small TOCTOU guard covering the sub-millisecond window inside
365/// [`ChunkSpill::new`] between `create_dir` and `try_lock_exclusive`. Once a
366/// dir is older than this and its lockfile is releasable, the owning process
367/// is gone and the dir is safe to reap — regardless of how old it is.
368///
369/// The previous policy waited 24 h before reaping any orphan, which meant
370/// that any non-graceful exit (SIGKILL, kernel OOM, panic abort) leaked its
371/// spill dir until the next day's upload — and on a host being restart-looped
372/// by systemd, orphans could fill the disk well within that window.
373const SPILL_STALE_GRACE_SECS: u64 = 30;
374
375/// Prefix for spill directory names to distinguish from user files.
376const SPILL_DIR_PREFIX: &str = "spill_";
377
378/// Lockfile name inside each spill dir to signal active use.
379const SPILL_LOCK_NAME: &str = ".lock";
380
381struct ChunkSpill {
382 /// Directory holding spilled chunk files (named by hex address).
383 dir: PathBuf,
384 /// Lockfile held for the lifetime of this spill (prevents stale cleanup).
385 _lock: std::fs::File,
386 /// Deduplicated list of chunk addresses.
387 addresses: Vec<[u8; 32]>,
388 /// Tracks seen addresses for deduplication.
389 seen: HashSet<[u8; 32]>,
390 /// Byte size per spilled chunk address.
391 sizes: HashMap<[u8; 32], u64>,
392 /// Running total of unique chunk byte sizes (for average-size calculation).
393 total_bytes: u64,
394}
395
396impl ChunkSpill {
397 /// Return the parent directory for all spill dirs: `<data_dir>/spill/`.
398 fn spill_root() -> Result<PathBuf> {
399 use crate::config;
400 let root = config::data_dir()
401 .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
402 .join("spill");
403 Ok(root)
404 }
405
406 /// Create a new spill directory under `<data_dir>/spill/`.
407 ///
408 /// Directory name is `spill_<timestamp>_<random>` so orphans can be
409 /// identified by prefix and cleaned up by age. A lockfile inside the
410 /// dir prevents concurrent cleanup from deleting an active spill.
411 fn new() -> Result<Self> {
412 let root = Self::spill_root()?;
413 std::fs::create_dir_all(&root)?;
414
415 // Clean up stale spill dirs from previous crashed runs.
416 Self::cleanup_stale(&root);
417
418 let now = std::time::SystemTime::now()
419 .duration_since(std::time::UNIX_EPOCH)
420 .unwrap_or_default()
421 .as_secs();
422 let unique: u64 = rand::random();
423 let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
424 std::fs::create_dir(&dir)?;
425
426 // Create and hold a lockfile for the lifetime of this spill.
427 // cleanup_stale() will skip dirs with locked files.
428 let lock_path = dir.join(SPILL_LOCK_NAME);
429 let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
430 Error::Io(std::io::Error::new(
431 e.kind(),
432 format!("failed to create spill lockfile: {e}"),
433 ))
434 })?;
435 lock_file.try_lock_exclusive().map_err(|e| {
436 Error::Io(std::io::Error::new(
437 e.kind(),
438 format!("failed to lock spill lockfile: {e}"),
439 ))
440 })?;
441
442 Ok(Self {
443 dir,
444 _lock: lock_file,
445 addresses: Vec::new(),
446 seen: HashSet::new(),
447 sizes: HashMap::new(),
448 total_bytes: 0,
449 })
450 }
451
452 /// Clean up stale spill directories. Best-effort, errors are logged.
453 ///
454 /// A spill dir is reaped when:
455 /// 1. Its name starts with `SPILL_DIR_PREFIX` (ignores unrelated files)
456 /// 2. It is an actual directory, not a symlink (prevents symlink attacks)
457 /// 3. Its timestamp is older than `SPILL_STALE_GRACE_SECS` (TOCTOU guard)
458 /// 4. Its lockfile is releasable — i.e. no live process holds it
459 ///
460 /// The lockfile is the primary correctness gate: a releasable lock means
461 /// the owning `ChunkSpill` has been dropped or the process is gone, so
462 /// the dir is fair game. The grace period covers only the brief window
463 /// inside [`Self::new`] between `create_dir` and `try_lock_exclusive`.
464 ///
465 /// Safe to call concurrently from multiple processes.
466 fn cleanup_stale(root: &Path) {
467 let now = std::time::SystemTime::now()
468 .duration_since(std::time::UNIX_EPOCH)
469 .unwrap_or_default()
470 .as_secs();
471
472 if now == 0 {
473 // Clock is broken (before Unix epoch). Skip cleanup to avoid
474 // misidentifying dirs as stale.
475 warn!("System clock before Unix epoch, skipping spill cleanup");
476 return;
477 }
478
479 let entries = match std::fs::read_dir(root) {
480 Ok(entries) => entries,
481 Err(_) => return,
482 };
483
484 for entry in entries.flatten() {
485 let name = entry.file_name();
486 let name_str = name.to_string_lossy();
487
488 // Only process dirs with our prefix.
489 let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
490 Some(s) => s,
491 None => continue,
492 };
493
494 // Parse timestamp: "spill_<timestamp>_<random>"
495 let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
496 Some(ts) => ts,
497 None => continue,
498 };
499
500 if now.saturating_sub(timestamp) < SPILL_STALE_GRACE_SECS {
501 continue;
502 }
503
504 // Safety: only delete actual directories, not symlinks.
505 let file_type = match entry.file_type() {
506 Ok(ft) => ft,
507 Err(_) => continue,
508 };
509 if !file_type.is_dir() {
510 continue;
511 }
512
513 let path = entry.path();
514
515 // Check lockfile: if locked, the dir is in active use -- skip it.
516 let lock_path = path.join(SPILL_LOCK_NAME);
517 if let Ok(lock_file) = std::fs::File::open(&lock_path) {
518 use fs2::FileExt;
519 if lock_file.try_lock_exclusive().is_err() {
520 // Lock held by another process -- dir is active.
521 debug!("Skipping active spill dir: {}", path.display());
522 continue;
523 }
524 // We acquired the lock, so no one else holds it.
525 // Drop it before deleting.
526 drop(lock_file);
527 }
528
529 info!("Cleaning up stale spill dir: {}", path.display());
530 if let Err(e) = std::fs::remove_dir_all(&path) {
531 warn!("Failed to clean up stale spill dir {}: {e}", path.display());
532 }
533 }
534 }
535
536 /// Run stale spill cleanup. Call at client startup or periodically.
537 #[allow(dead_code)]
538 pub(crate) fn run_cleanup() {
539 if let Ok(root) = Self::spill_root() {
540 Self::cleanup_stale(&root);
541 }
542 }
543
544 /// Write one encrypted chunk to disk and record its address.
545 ///
546 /// Deduplicates by content address: if the same chunk was already
547 /// spilled, the write and accounting are skipped. This prevents
548 /// double-uploads and inflated quoting metrics.
549 fn push(&mut self, content: &[u8]) -> Result<()> {
550 let address = compute_address(content);
551 if !self.seen.insert(address) {
552 return Ok(());
553 }
554 let path = self.dir.join(hex::encode(address));
555 std::fs::write(&path, content)?;
556 let content_len = content.len() as u64;
557 self.sizes.insert(address, content_len);
558 self.total_bytes += content_len;
559 self.addresses.push(address);
560 Ok(())
561 }
562
563 /// Number of chunks stored.
564 fn len(&self) -> usize {
565 self.addresses.len()
566 }
567
568 /// Total bytes of all spilled chunks.
569 fn total_bytes(&self) -> u64 {
570 self.total_bytes
571 }
572
573 /// Address and byte-size pairs for all spilled chunks.
574 fn chunk_entries(&self) -> Result<Vec<([u8; 32], u64)>> {
575 self.addresses
576 .iter()
577 .map(|address| {
578 self.sizes
579 .get(address)
580 .copied()
581 .map(|size| (*address, size))
582 .ok_or_else(|| {
583 Error::Storage(format!(
584 "missing size for spilled chunk {}",
585 hex::encode(address)
586 ))
587 })
588 })
589 .collect()
590 }
591
592 /// Read a single chunk back from disk by address.
593 fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
594 let path = self.dir.join(hex::encode(address));
595 let data = std::fs::read(&path).map_err(|e| {
596 Error::Io(std::io::Error::new(
597 e.kind(),
598 format!("reading spilled chunk {}: {e}", hex::encode(address)),
599 ))
600 })?;
601 Ok(Bytes::from(data))
602 }
603
604 /// Clean up the spill directory.
605 fn cleanup(&self) {
606 if let Err(e) = std::fs::remove_dir_all(&self.dir) {
607 warn!(
608 "Failed to clean up chunk spill dir {}: {e}",
609 self.dir.display()
610 );
611 }
612 }
613}
614
615impl Drop for ChunkSpill {
616 fn drop(&mut self) {
617 self.cleanup();
618 }
619}
620
621fn cached_merkle_covers_addresses(
622 cached: &MerkleBatchPaymentResult,
623 addresses: &[[u8; 32]],
624) -> bool {
625 addresses
626 .iter()
627 .all(|addr| cached.proofs.contains_key(addr))
628}
629
630/// Split `addresses` into `(to_store, missing_proof)`: those that have a merkle
631/// proof in `proofs`, and those that don't.
632///
633/// A partial [`MerkleBatchPaymentResult`] (from a `pay_for_merkle_multi_batch`
634/// where a later sub-batch's payment failed) carries proofs only for the
635/// already-paid sub-batches, so unpaid chunks reach the upload path with no
636/// proof. `upload_merkle_from_spill` reports those as failed via
637/// [`Error::PartialUpload`] rather than aborting the whole file. Order within
638/// each group follows `addresses`.
639fn partition_addresses_by_proof(
640 addresses: &[[u8; 32]],
641 proofs: &HashMap<[u8; 32], Vec<u8>>,
642) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
643 addresses
644 .iter()
645 .copied()
646 .partition(|addr| proofs.contains_key(addr))
647}
648
649/// Build a `PartialUpload` after a fatal merkle store error, with accurate
650/// counts.
651///
652/// A fatal abort can leave chunks in three states: confirmed stored (in
653/// `stored_addresses`), known-failed (in `known_failed` — missing proofs, the
654/// quorum shortfalls and the fatal chunk seen so far), and "in flight when the
655/// abort hit" (neither). Rather than trust the helpers to enumerate the last
656/// group, this derives the failed set authoritatively as *every* `addresses`
657/// entry not in `stored_addresses`, preferring a known per-chunk message and
658/// falling back to the fatal `reason`. That guarantees
659/// `stored_count + failed_count` accounts for the whole file — fixing the
660/// under-reporting where a fatal wave could surface `failed_count = 0` and omit
661/// same-pass successes.
662fn partial_upload_after_fatal(
663 addresses: &[[u8; 32]],
664 stored_addresses: Vec<[u8; 32]>,
665 stored_count: usize,
666 total_chunks: usize,
667 known_failed: Vec<([u8; 32], String)>,
668 spend: PartialUploadSpend,
669 reason: String,
670) -> Error {
671 let stored_set: HashSet<[u8; 32]> = stored_addresses.iter().copied().collect();
672 let mut failed_map: HashMap<[u8; 32], String> = HashMap::new();
673 for (addr, msg) in known_failed {
674 if !stored_set.contains(&addr) {
675 failed_map.entry(addr).or_insert(msg);
676 }
677 }
678 for addr in addresses {
679 if !stored_set.contains(addr) {
680 failed_map.entry(*addr).or_insert_with(|| reason.clone());
681 }
682 }
683 let failed: Vec<([u8; 32], String)> = failed_map.into_iter().collect();
684 let failed_count = failed.len();
685 Error::PartialUpload {
686 stored: stored_addresses,
687 stored_count,
688 failed,
689 failed_count,
690 total_chunks,
691 spend: Box::new(spend),
692 reason,
693 }
694}
695
696/// One wave's contribution to a single-node upload, distilled from its
697/// `batch_upload_chunks_with_events` result.
698#[derive(Debug)]
699struct SingleWaveOutcome {
700 /// Addresses confirmed stored in this wave.
701 stored: Vec<[u8; 32]>,
702 /// Chunks that failed after retries in this wave.
703 failed: Vec<([u8; 32], String)>,
704 /// Storage cost paid on-chain for this wave, in atto-tokens.
705 storage_atto: Amount,
706 /// Gas paid on-chain for this wave, in wei.
707 gas_wei: u128,
708 /// Per-wave store/retry statistics. Empty for a quorum-short wave, whose
709 /// `PartialUpload` carries no stats.
710 stats: WaveAggregateStats,
711}
712
713/// Fold one wave's batch-upload result for the single-node path.
714///
715/// A `PartialUpload` (chunks short of quorum after retries) is **recoverable**:
716/// its stored/failed chunks and on-chain spend are returned so the caller
717/// records them and continues to the next wave, making the file make maximum
718/// progress exactly like `upload_merkle_from_spill`. Every other error is **fatal**
719/// (wallet/payment-infrastructure failures, missing proofs, spill reads) and is
720/// returned via `Err` to abort the file. Because `UPLOAD_WAVE_SIZE ==
721/// PAYMENT_WAVE_SIZE`, each batch call is exactly one payment wave, so folding a
722/// `PartialUpload` leaves nothing un-attempted within the wave.
723fn fold_single_wave(
724 result: Result<(Vec<[u8; 32]>, String, u128, WaveAggregateStats)>,
725) -> Result<SingleWaveOutcome> {
726 match result {
727 Ok((stored, storage, gas, stats)) => Ok(SingleWaveOutcome {
728 stored,
729 failed: Vec::new(),
730 storage_atto: storage.parse().unwrap_or(Amount::ZERO),
731 gas_wei: gas,
732 stats,
733 }),
734 Err(Error::PartialUpload {
735 stored,
736 failed,
737 spend,
738 ..
739 }) => Ok(SingleWaveOutcome {
740 stored,
741 failed,
742 storage_atto: spend.storage_cost_atto.parse().unwrap_or(Amount::ZERO),
743 gas_wei: spend.gas_cost_wei,
744 stats: WaveAggregateStats::default(),
745 }),
746 Err(e) => Err(e),
747 }
748}
749
750/// Check that the spill directory has enough free space for the spilled chunks.
751///
752/// `file_size` is the source file's byte count. We require
753/// `file_size + 10%` free space to account for self-encryption overhead.
754fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
755 let spill_root = ChunkSpill::spill_root()?;
756
757 // Ensure the root exists so fs2 can query it.
758 std::fs::create_dir_all(&spill_root)?;
759
760 let available = fs2::available_space(&spill_root).map_err(|e| {
761 Error::Io(std::io::Error::new(
762 e.kind(),
763 format!(
764 "failed to query disk space on {}: {e}",
765 spill_root.display()
766 ),
767 ))
768 })?;
769
770 // Use integer arithmetic to avoid f64 precision loss on large file sizes.
771 let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
772 let required = file_size.saturating_add(headroom);
773
774 if available < required {
775 let avail_mb = available / (1024 * 1024);
776 let req_mb = required / (1024 * 1024);
777 return Err(Error::InsufficientDiskSpace(format!(
778 "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
779 spill_root.display()
780 )));
781 }
782
783 debug!(
784 "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
785 spill_root.display()
786 );
787 Ok(())
788}
789
790fn usable_memory_bytes() -> Option<u64> {
791 let mut system = sysinfo::System::new();
792 system.refresh_memory();
793
794 let available_memory = system.available_memory();
795 let free_memory = system.free_memory();
796 let used_memory = system.used_memory();
797 let total_memory = system.total_memory();
798 let unused_memory = total_memory.saturating_sub(used_memory);
799
800 let mut usable = [available_memory, free_memory, unused_memory]
801 .into_iter()
802 .filter(|bytes| *bytes > 0)
803 .max();
804
805 let cgroup_free_memory = system
806 .cgroup_limits()
807 .filter(|limits| limits.total_memory > 0)
808 .map(|limits| limits.free_memory);
809 if let Some(cgroup_free_memory) = cgroup_free_memory {
810 usable = Some(usable.unwrap_or(u64::MAX).min(cgroup_free_memory));
811 }
812
813 debug!(
814 available_memory,
815 free_memory,
816 used_memory,
817 total_memory,
818 cgroup_free_memory,
819 usable_memory = ?usable,
820 "Detected usable memory for stream decrypt batch sizing"
821 );
822
823 usable
824}
825
826fn stream_decrypt_batch_memory_cap(usable_memory_bytes: u64) -> usize {
827 let budget = usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR;
828 let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
829 .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
830 .max(1);
831 let cap = (budget / estimated_bytes_per_chunk).max(1);
832
833 usize::try_from(cap).unwrap_or(usize::MAX)
834}
835
836fn adaptive_stream_decrypt_batch_size(
837 total_chunks: usize,
838 fetch_cap: usize,
839 configured_batch_floor: usize,
840 usable_memory_bytes: Option<u64>,
841) -> usize {
842 let fetch_target = fetch_cap
843 .max(1)
844 .saturating_mul(DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
845 let requested = match usable_memory_bytes {
846 Some(bytes) => {
847 let memory_cap = stream_decrypt_batch_memory_cap(bytes);
848 configured_batch_floor
849 .max(fetch_target)
850 .max(1)
851 .min(memory_cap)
852 }
853 None => configured_batch_floor.max(1),
854 };
855
856 requested.min(total_chunks.max(1)).max(1)
857}
858
859/// Whether the data map is published to the network for address-based retrieval.
860///
861/// A private upload stores only the data chunks and returns the `DataMap` to
862/// the caller — only someone holding that `DataMap` can reconstruct the file.
863/// A public upload additionally stores the serialized `DataMap` as a chunk on
864/// the network, yielding a single chunk address that anyone can use to
865/// retrieve the `DataMap` (via [`Client::data_map_fetch`]) and then the file.
866#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
867pub enum Visibility {
868 /// Keep the data map local; only the holder can retrieve the file.
869 #[default]
870 Private,
871 /// Publish the data map as a network chunk so anyone with the returned
872 /// address can retrieve and decrypt the file.
873 Public,
874}
875
876/// Confidence attached to an [`UploadCostEstimate`]'s `storage_cost_atto`.
877///
878/// `estimate_upload_cost` prices a file by sampling a few of its chunk
879/// addresses and extrapolating. When every sampled chunk is already stored
880/// there is no live price to extrapolate from, so a `"0"` cost can mean either
881/// "provably free" (the whole file was sampled) or only "probably free" (the
882/// tail was unsampled). This lets callers tell those apart instead of treating
883/// every `"0"` as unconditionally free.
884#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
885#[serde(rename_all = "snake_case")]
886pub enum CostEstimateConfidence {
887 /// At least one sampled chunk returned a live quote; `storage_cost_atto`
888 /// is extrapolated from a real per-chunk price. The normal case.
889 #[default]
890 PricedSample,
891 /// Every chunk in the file was sampled and every one was already stored.
892 /// `storage_cost_atto` is exactly `"0"` — the upload is genuinely free.
893 VerifiedAllAlreadyStored,
894 /// Every *sampled* chunk was already stored, but not all chunks were
895 /// sampled. `storage_cost_atto` is `"0"` as a best-effort guess; the real
896 /// upload reconciles the true cost at payment time. Render this as "likely
897 /// already stored", not a guaranteed-free price.
898 AllSamplesAlreadyStoredIncomplete,
899}
900
901/// Estimated cost of uploading a file, returned by
902/// [`Client::estimate_upload_cost`].
903///
904/// Marked `#[non_exhaustive]` so adding a field later is not a breaking change
905/// for downstream consumers that construct or pattern-match on this struct.
906#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
907#[non_exhaustive]
908pub struct UploadCostEstimate {
909 /// Original file size in bytes.
910 pub file_size: u64,
911 /// Number of chunks the file would be split into (data chunks only,
912 /// does not include the DataMap chunk added during public uploads).
913 pub chunk_count: usize,
914 /// Estimated total storage cost in atto (token smallest unit).
915 pub storage_cost_atto: String,
916 /// Estimated gas cost in wei as a string. This is a rough heuristic
917 /// based on chunk count and payment mode, NOT a live gas price query.
918 pub estimated_gas_cost_wei: String,
919 /// Payment mode that would be used.
920 pub payment_mode: PaymentMode,
921 /// How much to trust `storage_cost_atto`. See [`CostEstimateConfidence`].
922 #[serde(default)]
923 pub confidence: CostEstimateConfidence,
924}
925
926/// Result of a file upload: the `DataMap` needed to retrieve the file.
927///
928/// Marked `#[non_exhaustive]` so adding a new field in future is not a
929/// breaking change for downstream consumers that construct or pattern-match
930/// on this struct.
931#[derive(Debug, Clone)]
932#[non_exhaustive]
933pub struct FileUploadResult {
934 /// The data map containing chunk metadata for reconstruction.
935 pub data_map: DataMap,
936 /// Number of chunks stored on the network.
937 pub chunks_stored: usize,
938 /// Number of chunks that failed to store. Always 0 for a successful
939 /// upload — partial-failure information is conveyed via
940 /// [`crate::data::Error::PartialUpload`] instead.
941 pub chunks_failed: usize,
942 /// Total number of chunks in the upload, including chunks that were
943 /// already stored and skipped. On full success this equals `chunks_stored`.
944 pub total_chunks: usize,
945 /// Which payment mode was actually used (not just requested).
946 pub payment_mode_used: PaymentMode,
947 /// Total storage cost paid in token units (atto). "0" if all chunks already existed.
948 pub storage_cost_atto: String,
949 /// Total gas cost in wei. 0 if no on-chain transactions were made.
950 pub gas_cost_wei: u128,
951 /// Chunk address of the serialized `DataMap`, set only for
952 /// [`Visibility::Public`] uploads. **`Some` means this address is
953 /// retrievable from the network (via [`Client::data_map_fetch`])**, not
954 /// necessarily that *this* upload paid to store it — if the serialized
955 /// `DataMap` hashed to a chunk that was already on the network (same
956 /// file uploaded before; deterministic via self-encryption), the address
957 /// is still returned but no storage payment was made for it.
958 pub data_map_address: Option<[u8; 32]>,
959 /// Sum of chunk-store RPC attempts across the upload
960 /// (`>= chunks_stored` on full success; more if any chunk retried).
961 /// `0` for paths that don't run the wave store loop.
962 pub chunk_attempts_total: usize,
963 /// Per-chunk store wall-clock in ms (length == `chunks_stored` on full
964 /// success, empty for paths that don't run the wave store loop).
965 pub store_durations_ms: Vec<u64>,
966 /// Count of stored chunks that succeeded on each retry round
967 /// (index 0 = first attempt, 1 = first retry, etc.). All zeros for
968 /// paths that don't run the wave store loop.
969 pub retries_histogram: [usize; 4],
970}
971
972/// Payment information for external signing — either wave-batch or merkle.
973#[derive(Debug)]
974pub enum ExternalPaymentInfo {
975 /// Wave-batch: individual (quote_hash, rewards_address, amount) tuples.
976 WaveBatch {
977 /// Chunks ready for payment (needed for finalize).
978 prepared_chunks: Vec<PreparedChunk>,
979 /// Payment intent for external signing.
980 payment_intent: PaymentIntent,
981 },
982 /// Merkle: single on-chain call with depth, pool commitments, timestamp.
983 Merkle {
984 /// The prepared merkle batch (public fields sent to frontend, private fields stay in Rust).
985 prepared_batch: PreparedMerkleBatch,
986 /// Raw chunk contents that still need upload after the preflight check.
987 chunk_contents: Vec<Bytes>,
988 /// Chunk addresses that still need upload after the preflight check.
989 chunk_addresses: Vec<[u8; 32]>,
990 },
991}
992
993/// Prepared upload ready for external payment.
994///
995/// Contains everything needed to construct the on-chain payment transaction
996/// externally (e.g. via WalletConnect in a desktop app) and then finalize
997/// the upload without a Rust-side wallet.
998///
999/// Note: This struct stays in Rust memory — only the public fields of
1000/// `payment_info` are sent to the frontend. `PreparedChunk` contains
1001/// non-serializable network types, so the full struct cannot derive `Serialize`.
1002///
1003/// Marked `#[non_exhaustive]` so adding a new field in future is not a
1004/// breaking change for downstream consumers.
1005#[derive(Debug)]
1006#[non_exhaustive]
1007pub struct PreparedUpload {
1008 /// The data map for later retrieval.
1009 pub data_map: DataMap,
1010 /// Payment information for chunks that still need payment after the
1011 /// already-stored preflight. This may be wave-batch even when the original
1012 /// chunk count was merkle-eligible if the remaining count is below the
1013 /// merkle threshold.
1014 pub payment_info: ExternalPaymentInfo,
1015 /// Chunk address of the serialized `DataMap` when this upload was
1016 /// prepared with [`Visibility::Public`]. `Some` means the address is
1017 /// retrievable on the network after finalization — either because this
1018 /// upload paid to store the chunk in `payment_info`, or because the
1019 /// chunk was already on the network (deterministic self-encryption).
1020 /// Carried through to [`FileUploadResult::data_map_address`].
1021 pub data_map_address: Option<[u8; 32]>,
1022 /// Chunk addresses already present on the network when this upload was
1023 /// prepared. These do not require payment or PUT during finalization.
1024 pub already_stored_addresses: Vec<[u8; 32]>,
1025 /// Total chunk count for the upload, including already-stored chunks.
1026 pub total_chunks: usize,
1027}
1028
1029/// Return type for [`spawn_file_encryption`]: chunk receiver, `DataMap` oneshot, join handle.
1030type EncryptionChannels = (
1031 tokio::sync::mpsc::Receiver<Bytes>,
1032 tokio::sync::oneshot::Receiver<DataMap>,
1033 tokio::task::JoinHandle<Result<()>>,
1034);
1035
1036/// Spawn a blocking task that streams file encryption through a channel.
1037fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
1038 let metadata = std::fs::metadata(&path)?;
1039 let data_size = usize::try_from(metadata.len())
1040 .map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;
1041
1042 let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
1043 let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();
1044
1045 let handle = tokio::task::spawn_blocking(move || {
1046 let file = std::fs::File::open(&path)?;
1047 let mut reader = std::io::BufReader::new(file);
1048
1049 let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
1050 let read_error_clone = Arc::clone(&read_error);
1051
1052 let data_iter = std::iter::from_fn(move || {
1053 let mut buffer = vec![0u8; 8192];
1054 match std::io::Read::read(&mut reader, &mut buffer) {
1055 Ok(0) => None,
1056 Ok(n) => {
1057 buffer.truncate(n);
1058 Some(Bytes::from(buffer))
1059 }
1060 Err(e) => {
1061 let mut guard = read_error_clone
1062 .lock()
1063 .unwrap_or_else(|poisoned| poisoned.into_inner());
1064 *guard = Some(e);
1065 None
1066 }
1067 }
1068 });
1069
1070 let mut stream = stream_encrypt(data_size, data_iter)
1071 .map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;
1072
1073 for chunk_result in stream.chunks() {
1074 // Check for captured read errors immediately after each chunk.
1075 // stream_encrypt sees None (EOF) when a read fails, so it stops
1076 // producing chunks. We must detect this before sending the
1077 // partial results to avoid uploading a truncated DataMap.
1078 {
1079 let guard = read_error
1080 .lock()
1081 .unwrap_or_else(|poisoned| poisoned.into_inner());
1082 if let Some(ref e) = *guard {
1083 return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
1084 }
1085 }
1086
1087 let (_hash, content) = chunk_result
1088 .map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
1089 if chunk_tx.blocking_send(content).is_err() {
1090 return Err(Error::Encryption("upload receiver dropped".to_string()));
1091 }
1092 }
1093
1094 // Final check: read error after last chunk (stream saw EOF).
1095 {
1096 let guard = read_error
1097 .lock()
1098 .unwrap_or_else(|poisoned| poisoned.into_inner());
1099 if let Some(ref e) = *guard {
1100 return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
1101 }
1102 }
1103
1104 let datamap = stream
1105 .into_datamap()
1106 .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
1107 if datamap_tx.send(datamap).is_err() {
1108 warn!("DataMap receiver dropped — upload may have been cancelled");
1109 }
1110 Ok(())
1111 });
1112
1113 Ok((chunk_rx, datamap_rx, handle))
1114}
1115
1116/// RAII guard for the staging temp file used during a disk download.
1117///
1118/// Removes the file on drop — including a panic unwind out of the
1119/// `block_in_place` decrypt loop — unless [`commit`](Self::commit) has
1120/// promoted it to its final path. Centralizes the cleanup the explicit error
1121/// arms used to repeat.
1122struct TempDownload {
1123 /// `Some` while the staging file may need cleanup; `None` once committed.
1124 path: Option<PathBuf>,
1125}
1126
1127impl TempDownload {
1128 fn new(path: PathBuf) -> Self {
1129 Self { path: Some(path) }
1130 }
1131
1132 /// Path of the staging file (valid until `commit`).
1133 fn path(&self) -> &Path {
1134 self.path
1135 .as_deref()
1136 .expect("TempDownload::path called after commit")
1137 }
1138
1139 /// Rename the staged file to `dest`. On success the guard is defused so
1140 /// `Drop` is a no-op; on failure the guard stays armed and `Drop` removes
1141 /// the orphaned temp file.
1142 fn commit(mut self, dest: &Path) -> std::io::Result<()> {
1143 std::fs::rename(self.path(), dest)?; // err → guard armed → Drop cleans up
1144 self.path = None; // success → nothing left to clean
1145 Ok(())
1146 }
1147}
1148
1149impl Drop for TempDownload {
1150 fn drop(&mut self) {
1151 if let Some(path) = self.path.take() {
1152 if let Err(e) = std::fs::remove_file(&path) {
1153 // Absent file is fine (never created / already gone).
1154 if e.kind() != std::io::ErrorKind::NotFound {
1155 warn!(
1156 "Failed to remove temp download file {}: {e}",
1157 path.display()
1158 );
1159 }
1160 }
1161 }
1162 }
1163}
1164
1165impl Client {
1166 /// Upload a file to the network using streaming self-encryption.
1167 ///
1168 /// Automatically selects merkle batch payment for files that produce
1169 /// 64+ chunks (saves gas). Encrypted chunks are spilled to a temp
1170 /// directory so peak memory stays at ~256 MB regardless of file size.
1171 ///
1172 /// # Errors
1173 ///
1174 /// Returns an error if the file cannot be read, encryption fails,
1175 /// or any chunk cannot be stored.
1176 pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
1177 self.file_upload_with_mode(path, PaymentMode::Auto).await
1178 }
1179
1180 /// Estimate the cost of uploading a file without actually uploading.
1181 ///
1182 /// Encrypts the file to determine chunk count and sizes, then requests
1183 /// a single quote from the network for a representative chunk. The
1184 /// per-chunk price is extrapolated to the total chunk count.
1185 ///
1186 /// The estimate is fast (~2-5s) and does not require a wallet. Spilled
1187 /// chunks are cleaned up automatically when the function returns.
1188 ///
1189 /// Gas cost is an advisory heuristic, not a live gas-oracle query. It is
1190 /// derived from realistic per-transaction budgets (`GAS_PER_WAVE_TX`,
1191 /// `GAS_PER_MERKLE_TX`) priced at `ARBITRUM_GAS_PRICE_WEI`. Real gas
1192 /// varies with network conditions.
1193 ///
1194 /// Sampled chunk addresses are spread across the whole file (not the first
1195 /// N) so a shared leading prefix doesn't bias the sample. When a sample
1196 /// returns a live quote the per-chunk price is extrapolated and the result
1197 /// is tagged [`CostEstimateConfidence::PricedSample`].
1198 ///
1199 /// When every sampled chunk is already stored the result is still `Ok`
1200 /// with `storage_cost_atto: "0"`, tagged either
1201 /// [`CostEstimateConfidence::VerifiedAllAlreadyStored`] when the whole file
1202 /// was sampled (exactly free) or
1203 /// [`CostEstimateConfidence::AllSamplesAlreadyStoredIncomplete`] when the
1204 /// tail was unsampled (a best-effort guess that payment reconciles).
1205 ///
1206 /// # Errors
1207 ///
1208 /// Returns an error if the file cannot be read, encryption fails, or the
1209 /// network cannot provide a quote.
1210 pub async fn estimate_upload_cost(
1211 &self,
1212 path: &Path,
1213 mode: PaymentMode,
1214 progress: Option<mpsc::Sender<UploadEvent>>,
1215 ) -> Result<UploadCostEstimate> {
1216 let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
1217
1218 if file_size < 3 {
1219 return Err(Error::InvalidData(
1220 "File too small: self-encryption requires at least 3 bytes".into(),
1221 ));
1222 }
1223
1224 check_disk_space_for_spill(file_size)?;
1225
1226 info!(
1227 "Estimating upload cost for {} ({file_size} bytes)",
1228 path.display()
1229 );
1230
1231 let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1232 let chunk_count = spill.len();
1233
1234 if let Some(ref tx) = progress {
1235 let _ = tx
1236 .send(UploadEvent::Encrypted {
1237 total_chunks: chunk_count,
1238 })
1239 .await;
1240 }
1241
1242 info!("Encrypted into {chunk_count} chunks, requesting quote");
1243 let uses_merkle = should_use_merkle(chunk_count, mode);
1244
1245 // Sample chunk addresses spread evenly across the file (see
1246 // `distributed_sample_indices`) rather than the first N. A single
1247 // AlreadyStored result says nothing about the rest of the file, and a
1248 // positional sample lands on a shared leading prefix in the worst case,
1249 // so we spread the probe and only treat the whole file as "fully
1250 // stored" when every sample comes back stored.
1251 let sample_indices = distributed_sample_indices(spill.addresses.len(), ESTIMATE_SAMPLE_CAP);
1252 let mut sampled = 0usize;
1253 let mut all_already_stored = true;
1254 let mut quotes_opt: Option<Vec<QuoteEntry>> = None;
1255
1256 for &idx in &sample_indices {
1257 let addr = &spill.addresses[idx];
1258 sampled += 1;
1259 let chunk_bytes = spill.read_chunk(addr)?;
1260 let data_size = u64::try_from(chunk_bytes.len())
1261 .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
1262 let result = if uses_merkle {
1263 self.get_store_quotes_with_fault_tolerance(addr, data_size, DATA_TYPE_CHUNK)
1264 .await
1265 } else {
1266 self.get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
1267 .await
1268 };
1269 match result {
1270 Ok(q) => {
1271 quotes_opt = Some(q);
1272 all_already_stored = false;
1273 break;
1274 }
1275 Err(Error::AlreadyStored) => {
1276 debug!(
1277 "Sample chunk {} already stored; trying next address ({sampled}/{})",
1278 hex::encode(addr),
1279 sample_indices.len()
1280 );
1281 continue;
1282 }
1283 Err(e) => return Err(e),
1284 }
1285 }
1286
1287 let quotes = match quotes_opt {
1288 Some(q) => q,
1289 None if all_already_stored && sampled == chunk_count => {
1290 // Every address in the file was sampled and every one is
1291 // already on the network — a zero-cost estimate is exact here.
1292 info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
1293 return Ok(UploadCostEstimate {
1294 file_size,
1295 chunk_count,
1296 storage_cost_atto: "0".into(),
1297 estimated_gas_cost_wei: "0".into(),
1298 payment_mode: if uses_merkle {
1299 PaymentMode::Merkle
1300 } else {
1301 PaymentMode::Single
1302 },
1303 confidence: CostEstimateConfidence::VerifiedAllAlreadyStored,
1304 });
1305 }
1306 None => {
1307 // Every sampled chunk was already stored but the tail was not
1308 // sampled, so there is no live price to extrapolate. The
1309 // estimate is display-only and payment reconciles the true
1310 // cost, so return an optimistic zero flagged as incomplete
1311 // rather than erroring — callers still get a value to show.
1312 info!(
1313 "All {sampled}/{chunk_count} sampled chunks already stored; \
1314 returning incomplete zero-cost estimate"
1315 );
1316 return Ok(UploadCostEstimate {
1317 file_size,
1318 chunk_count,
1319 storage_cost_atto: "0".into(),
1320 estimated_gas_cost_wei: "0".into(),
1321 payment_mode: if uses_merkle {
1322 PaymentMode::Merkle
1323 } else {
1324 PaymentMode::Single
1325 },
1326 confidence: CostEstimateConfidence::AllSamplesAlreadyStoredIncomplete,
1327 });
1328 }
1329 };
1330
1331 // Use the median price × 3 (matches SingleNodePayment::from_quotes
1332 // which pays 3x the median to incentivize reliable storage).
1333 let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
1334 prices.sort();
1335 let median_price = prices
1336 .get(prices.len() / 2)
1337 .copied()
1338 .unwrap_or(Amount::ZERO);
1339 let per_chunk_cost = median_price * Amount::from(3u64);
1340
1341 let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
1342 let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);
1343
1344 // Estimate gas cost from realistic per-transaction budgets rather
1345 // than a flat per-chunk or per-wave number.
1346 //
1347 // - Single mode: `batch_pay` packs up to UPLOAD_WAVE_SIZE chunks'
1348 // close-group quotes into one `pay_for_quotes` call on Arbitrum.
1349 // The dominant cost is one SSTORE per entry plus base tx overhead,
1350 // so we use GAS_PER_WAVE_TX (≈1.5M) as a conservative upper bound
1351 // on a full wave and multiply by the number of waves. The previous
1352 // per-wave figure of 150k was closer to a single-entry transfer
1353 // and understated cost by 5–10x for full waves.
1354 // - Merkle mode: one tx per sub-batch that verifies a merkle tree
1355 // and posts a pool commitment (GAS_PER_MERKLE_TX ≈ 500k each).
1356 //
1357 // Gas is priced at ARBITRUM_GAS_PRICE_WEI (~0.1 gwei, a typical
1358 // Arbitrum baseline). Treat the result as advisory, not a commitment.
1359 let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
1360 let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
1361 let estimated_gas: u128 = if uses_merkle {
1362 merkle_batches
1363 .saturating_mul(GAS_PER_MERKLE_TX)
1364 .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
1365 } else {
1366 waves
1367 .saturating_mul(GAS_PER_WAVE_TX)
1368 .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
1369 };
1370
1371 info!(
1372 "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
1373 );
1374
1375 Ok(UploadCostEstimate {
1376 file_size,
1377 chunk_count,
1378 storage_cost_atto: total_storage.to_string(),
1379 estimated_gas_cost_wei: estimated_gas.to_string(),
1380 payment_mode: if uses_merkle {
1381 PaymentMode::Merkle
1382 } else {
1383 PaymentMode::Single
1384 },
1385 confidence: CostEstimateConfidence::PricedSample,
1386 })
1387 }
1388
1389 /// Phase 1 of external-signer upload: encrypt file and prepare chunks.
1390 ///
1391 /// Equivalent to [`Client::file_prepare_upload_with_visibility`] with
1392 /// [`Visibility::Private`] — see that method for details.
1393 pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
1394 self.file_prepare_upload_with_progress(path, Visibility::Private, None)
1395 .await
1396 }
1397
1398 /// Phase 1 of external-signer upload with explicit [`Visibility`] control.
1399 ///
1400 /// Equivalent to [`Client::file_prepare_upload_with_progress`] with
1401 /// `progress: None` — see that method for details.
1402 pub async fn file_prepare_upload_with_visibility(
1403 &self,
1404 path: &Path,
1405 visibility: Visibility,
1406 ) -> Result<PreparedUpload> {
1407 self.file_prepare_upload_with_progress(path, visibility, None)
1408 .await
1409 }
1410
1411 /// Phase 1 of external-signer upload with progress events.
1412 ///
1413 /// Requires an EVM network (for contract price queries) but NOT a wallet.
1414 /// Returns a [`PreparedUpload`] containing the data map, prepared chunks,
1415 /// and a [`PaymentIntent`] that the external signer uses to construct
1416 /// and submit the on-chain payment transaction.
1417 ///
1418 /// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
1419 /// is bundled into the payment batch as an additional chunk and its
1420 /// address is recorded on the returned [`PreparedUpload`]. After
1421 /// [`Client::finalize_upload`] (or `_merkle`) succeeds, that address is
1422 /// surfaced via [`FileUploadResult::data_map_address`] so the uploader
1423 /// can share a single address from which anyone can retrieve the file.
1424 ///
1425 /// When `progress` is `Some`, [`UploadEvent`]s are emitted on the channel
1426 /// during encryption ([`UploadEvent::Encrypting`] / [`UploadEvent::Encrypted`])
1427 /// and per-chunk quoting ([`UploadEvent::ChunkQuoted`]). Storage events are
1428 /// emitted later by [`Client::finalize_upload_with_progress`] /
1429 /// [`Client::finalize_upload_merkle_with_progress`].
1430 ///
1431 /// **Memory note:** Encryption uses disk spilling for bounded memory, but
1432 /// the returned [`PreparedUpload`] holds all chunk content in memory (each
1433 /// [`PreparedChunk`] contains a `Bytes` with the full chunk data). This is
1434 /// inherent to the two-phase external-signer protocol — the chunks must
1435 /// stay in memory until [`Client::finalize_upload`] stores them. For very
1436 /// large files, prefer [`Client::file_upload`] which streams directly.
1437 ///
1438 /// # Errors
1439 ///
1440 /// Returns an error if there is insufficient disk space, the file cannot
1441 /// be read, encryption fails, or quote collection fails.
1442 pub async fn file_prepare_upload_with_progress(
1443 &self,
1444 path: &Path,
1445 visibility: Visibility,
1446 progress: Option<mpsc::Sender<UploadEvent>>,
1447 ) -> Result<PreparedUpload> {
1448 debug!(
1449 "Preparing file upload for external signing (visibility={visibility:?}): {}",
1450 path.display()
1451 );
1452
1453 let file_size = std::fs::metadata(path)?.len();
1454 check_disk_space_for_spill(file_size)?;
1455
1456 let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1457
1458 info!(
1459 "Encrypted {} into {} chunks for external signing (spilled to disk)",
1460 path.display(),
1461 spill.len()
1462 );
1463
1464 // Read each chunk from disk and collect quotes concurrently.
1465 // Note: all PreparedChunks accumulate in memory because the external-signer
1466 // protocol requires them for finalize_upload. NOT memory-bounded for large files.
1467 let mut chunk_data: Vec<Bytes> = spill
1468 .addresses
1469 .iter()
1470 .map(|addr| spill.read_chunk(addr))
1471 .collect::<std::result::Result<Vec<_>, _>>()?;
1472
1473 // For public uploads, bundle the serialized DataMap as an extra chunk
1474 // in the same payment batch. This lets the external signer pay for
1475 // the data chunks and the DataMap chunk in one flow, and lets the
1476 // finalize step return the DataMap's chunk address as the shareable
1477 // retrieval address.
1478 let data_map_address = match visibility {
1479 Visibility::Private => None,
1480 Visibility::Public => {
1481 let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
1482 Error::Serialization(format!("Failed to serialize DataMap: {e}"))
1483 })?;
1484 let bytes = Bytes::from(serialized);
1485 let address = compute_address(&bytes);
1486 info!(
1487 "Public upload: bundling DataMap chunk ({} bytes) at address {}",
1488 bytes.len(),
1489 hex::encode(address)
1490 );
1491 chunk_data.push(bytes);
1492 Some(address)
1493 }
1494 };
1495
1496 let chunk_count = chunk_data.len();
1497
1498 if let Some(ref tx) = progress {
1499 let _ = tx
1500 .send(UploadEvent::Encrypted {
1501 total_chunks: chunk_count,
1502 })
1503 .await;
1504 }
1505
1506 let (payment_info, already_stored_addresses) = if should_use_merkle(
1507 chunk_count,
1508 PaymentMode::Auto,
1509 ) {
1510 // Merkle path: build tree, collect candidate pools, return for external payment.
1511 info!("Using merkle batch preparation for {chunk_count} file chunks");
1512
1513 let chunk_entries: Vec<([u8; 32], u64)> = chunk_data
1514 .iter()
1515 .map(|chunk| {
1516 let size = u64::try_from(chunk.len())
1517 .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
1518 Ok((compute_address(chunk), size))
1519 })
1520 .collect::<Result<Vec<_>>>()?;
1521
1522 let merkle_plan = self
1523 .plan_merkle_upload(chunk_entries, DATA_TYPE_CHUNK, progress.as_ref())
1524 .await?;
1525
1526 if merkle_plan.to_upload.is_empty() {
1527 info!("All {chunk_count} file chunks already stored; no external payment needed");
1528 (
1529 ExternalPaymentInfo::WaveBatch {
1530 prepared_chunks: Vec::new(),
1531 payment_intent: PaymentIntent::from_prepared_chunks(&[]),
1532 },
1533 merkle_plan.already_stored,
1534 )
1535 } else {
1536 let chunk_data =
1537 chunk_contents_for_upload_addresses(chunk_data, &merkle_plan.to_upload)?;
1538
1539 if !should_use_merkle(merkle_plan.to_upload.len(), PaymentMode::Auto) {
1540 info!(
1541 "{} file chunks need upload after merkle preflight; preparing wave-batch payment",
1542 merkle_plan.to_upload.len()
1543 );
1544 let (payment_info, mut wave_already_stored) = self
1545 .prepare_wave_batch_external_chunks(
1546 chunk_data,
1547 progress.as_ref(),
1548 chunk_count,
1549 )
1550 .await?;
1551 let mut already_stored = merkle_plan.already_stored;
1552 already_stored.append(&mut wave_already_stored);
1553 (payment_info, already_stored)
1554 } else {
1555 match self
1556 .prepare_merkle_batch_external(
1557 &merkle_plan.to_upload,
1558 DATA_TYPE_CHUNK,
1559 merkle_plan.to_upload_avg_size(),
1560 )
1561 .await
1562 {
1563 Ok(prepared_batch) => {
1564 info!(
1565 "File prepared for external merkle signing: {} chunks, depth={} ({})",
1566 merkle_plan.to_upload.len(),
1567 prepared_batch.depth,
1568 path.display()
1569 );
1570
1571 (
1572 ExternalPaymentInfo::Merkle {
1573 prepared_batch,
1574 chunk_contents: chunk_data,
1575 chunk_addresses: merkle_plan.to_upload,
1576 },
1577 merkle_plan.already_stored,
1578 )
1579 }
1580 Err(Error::InsufficientPeers(ref msg)) => {
1581 info!(
1582 "External merkle preparation needs more peers ({msg}); preparing wave-batch payment"
1583 );
1584 let (payment_info, mut wave_already_stored) = self
1585 .prepare_wave_batch_external_chunks(
1586 chunk_data,
1587 progress.as_ref(),
1588 chunk_count,
1589 )
1590 .await?;
1591 let mut already_stored = merkle_plan.already_stored;
1592 already_stored.append(&mut wave_already_stored);
1593 (payment_info, already_stored)
1594 }
1595 Err(e) => return Err(e),
1596 }
1597 }
1598 }
1599 } else {
1600 self.prepare_wave_batch_external_chunks(chunk_data, progress.as_ref(), chunk_count)
1601 .await?
1602 };
1603
1604 // Surface the "DataMap chunk was already on the network" case
1605 // so debugging "why is data_map_address set but no storage cost
1606 // appears for it?" doesn't require reading the source. See the
1607 // `data_map_address` doc comment for why this is still a valid
1608 // `Some(addr)` outcome.
1609 if let Some(addr) = data_map_address {
1610 let data_map_needs_payment = match &payment_info {
1611 ExternalPaymentInfo::WaveBatch {
1612 prepared_chunks, ..
1613 } => prepared_chunks.iter().any(|c| c.address == addr),
1614 ExternalPaymentInfo::Merkle {
1615 chunk_addresses, ..
1616 } => chunk_addresses.contains(&addr),
1617 };
1618 if !data_map_needs_payment {
1619 info!(
1620 "Public upload: DataMap chunk {} was already stored \
1621 on the network — address is retrievable without a \
1622 new payment",
1623 hex::encode(addr)
1624 );
1625 }
1626 }
1627
1628 Ok(PreparedUpload {
1629 data_map,
1630 payment_info,
1631 data_map_address,
1632 already_stored_addresses,
1633 total_chunks: chunk_count,
1634 })
1635 }
1636
1637 async fn prepare_wave_batch_external_chunks(
1638 &self,
1639 chunk_data: Vec<Bytes>,
1640 progress: Option<&mpsc::Sender<UploadEvent>>,
1641 progress_total: usize,
1642 ) -> Result<(ExternalPaymentInfo, Vec<[u8; 32]>)> {
1643 let chunk_count = chunk_data.len();
1644 let chunks_with_addr: Vec<(Bytes, [u8; 32])> = chunk_data
1645 .into_iter()
1646 .map(|content| {
1647 let address = compute_address(&content);
1648 (content, address)
1649 })
1650 .collect();
1651
1652 // Wave-batch path: collect quotes per chunk concurrently, emitting
1653 // a `ChunkQuoted` event after each completion so callers can drive
1654 // a progress bar through the slow quote phase.
1655 let quote_limiter = self.controller().quote.clone();
1656 let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
1657 let mut quote_stream = stream::iter(chunks_with_addr)
1658 .map(|(content, address)| {
1659 let limiter = quote_limiter.clone();
1660 async move {
1661 let result = observe_op(
1662 &limiter,
1663 || async move { self.prepare_chunk_payment(content).await },
1664 classify_error,
1665 )
1666 .await;
1667 (address, result)
1668 }
1669 })
1670 .buffer_unordered(quote_concurrency);
1671
1672 let mut prepared_chunks = Vec::with_capacity(chunk_count);
1673 let mut already_stored = Vec::new();
1674 let mut quoted = 0usize;
1675 while let Some((address, result)) = quote_stream.next().await {
1676 match result? {
1677 Some(prepared) => prepared_chunks.push(prepared),
1678 None => already_stored.push(address),
1679 }
1680 quoted += 1;
1681 if let Some(tx) = progress {
1682 let _ = tx.try_send(UploadEvent::ChunkQuoted {
1683 quoted,
1684 total: progress_total,
1685 });
1686 }
1687 }
1688
1689 let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
1690 info!(
1691 "Prepared external wave-batch payment: {} chunks, {} already stored, total {} atto",
1692 prepared_chunks.len(),
1693 already_stored.len(),
1694 payment_intent.total_amount,
1695 );
1696
1697 Ok((
1698 ExternalPaymentInfo::WaveBatch {
1699 prepared_chunks,
1700 payment_intent,
1701 },
1702 already_stored,
1703 ))
1704 }
1705
1706 /// Phase 2 of external-signer upload (wave-batch): finalize with externally-signed tx hashes.
1707 ///
1708 /// Takes a [`PreparedUpload`] that used wave-batch payment and a map
1709 /// of `quote_hash -> tx_hash` provided by the external signer after on-chain
1710 /// payment. Builds payment proofs and stores chunks on the network.
1711 ///
1712 /// # Errors
1713 ///
1714 /// Returns an error if the prepared upload used merkle payment (use
1715 /// [`Client::finalize_upload_merkle`] instead), proof construction fails,
1716 /// or any chunk cannot be stored.
1717 pub async fn finalize_upload(
1718 &self,
1719 prepared: PreparedUpload,
1720 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1721 ) -> Result<FileUploadResult> {
1722 self.finalize_upload_with_progress(prepared, tx_hash_map, None)
1723 .await
1724 }
1725
1726 /// Phase 2 of external-signer upload (wave-batch) with progress events.
1727 ///
1728 /// Same as [`Client::finalize_upload`] but emits [`UploadEvent::ChunkStored`]
1729 /// on the provided channel as each chunk is successfully stored.
1730 ///
1731 /// # Errors
1732 ///
1733 /// Same as [`Client::finalize_upload`].
1734 pub async fn finalize_upload_with_progress(
1735 &self,
1736 prepared: PreparedUpload,
1737 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1738 progress: Option<mpsc::Sender<UploadEvent>>,
1739 ) -> Result<FileUploadResult> {
1740 let data_map_address = prepared.data_map_address;
1741 let already_stored_addresses = prepared.already_stored_addresses;
1742 let already_stored_count = already_stored_addresses.len();
1743 let total_chunks = prepared.total_chunks;
1744 match prepared.payment_info {
1745 ExternalPaymentInfo::WaveBatch {
1746 prepared_chunks,
1747 payment_intent,
1748 } => {
1749 let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
1750 let wave_result = self
1751 .store_paid_chunks_with_events(
1752 paid_chunks,
1753 progress.as_ref(),
1754 already_stored_count,
1755 total_chunks,
1756 )
1757 .await;
1758 if !wave_result.failed.is_empty() {
1759 let failed_count = wave_result.failed.len();
1760 let stored_count = already_stored_count + wave_result.stored.len();
1761 let mut stored = already_stored_addresses;
1762 stored.extend(wave_result.stored);
1763 return Err(Error::PartialUpload {
1764 stored,
1765 stored_count,
1766 failed: wave_result.failed,
1767 failed_count,
1768 total_chunks,
1769 // Report the storage spend known from the payment intent
1770 // the external signer was handed. Gas is paid by the
1771 // signer out-of-band, so it stays unknown (0).
1772 spend: Box::new(PartialUploadSpend {
1773 storage_cost_atto: payment_intent.total_amount.to_string(),
1774 gas_cost_wei: 0,
1775 }),
1776 reason: "finalize_upload: chunk storage failed after retries".into(),
1777 });
1778 }
1779 let chunks_stored = already_stored_count + wave_result.stored.len();
1780
1781 info!("External-signer upload finalized: {chunks_stored} chunks stored");
1782
1783 let mut stats = WaveAggregateStats::default();
1784 stats.absorb(&wave_result);
1785
1786 Ok(FileUploadResult {
1787 data_map: prepared.data_map,
1788 chunks_stored,
1789 chunks_failed: 0,
1790 total_chunks,
1791 payment_mode_used: PaymentMode::Single,
1792 // Storage spend is known from the payment intent; gas is
1793 // paid by the external signer out-of-band (unknown here).
1794 storage_cost_atto: payment_intent.total_amount.to_string(),
1795 gas_cost_wei: 0,
1796 data_map_address,
1797 chunk_attempts_total: stats.chunk_attempts_total,
1798 store_durations_ms: stats.store_durations_ms,
1799 retries_histogram: stats.retries_histogram,
1800 })
1801 }
1802 ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
1803 "Cannot finalize merkle upload with wave-batch tx hashes. \
1804 Use finalize_upload_merkle() instead."
1805 .to_string(),
1806 )),
1807 }
1808 }
1809
1810 /// Phase 2 of external-signer upload (merkle): finalize with winner pool hash.
1811 ///
1812 /// Takes a [`PreparedUpload`] that used merkle payment and the `winner_pool_hash`
1813 /// returned by the on-chain merkle payment transaction. Generates proofs and
1814 /// stores chunks on the network.
1815 ///
1816 /// # Errors
1817 ///
1818 /// Returns an error if the prepared upload used wave-batch payment (use
1819 /// [`Client::finalize_upload`] instead), proof generation fails,
1820 /// or any chunk cannot be stored.
1821 pub async fn finalize_upload_merkle(
1822 &self,
1823 prepared: PreparedUpload,
1824 winner_pool_hash: [u8; 32],
1825 ) -> Result<FileUploadResult> {
1826 self.finalize_upload_merkle_with_progress(prepared, winner_pool_hash, None)
1827 .await
1828 }
1829
1830 /// Phase 2 of external-signer upload (merkle) with progress events.
1831 ///
1832 /// Same as [`Client::finalize_upload_merkle`] but emits [`UploadEvent::ChunkStored`]
1833 /// on the provided channel as each chunk is successfully stored.
1834 ///
1835 /// # Errors
1836 ///
1837 /// Same as [`Client::finalize_upload_merkle`].
1838 pub async fn finalize_upload_merkle_with_progress(
1839 &self,
1840 prepared: PreparedUpload,
1841 winner_pool_hash: [u8; 32],
1842 progress: Option<mpsc::Sender<UploadEvent>>,
1843 ) -> Result<FileUploadResult> {
1844 let data_map_address = prepared.data_map_address;
1845 let already_stored_count = prepared.already_stored_addresses.len();
1846 let total_chunks = prepared.total_chunks;
1847 match prepared.payment_info {
1848 ExternalPaymentInfo::Merkle {
1849 prepared_batch,
1850 chunk_contents,
1851 chunk_addresses,
1852 } => {
1853 let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
1854 let outcome = self
1855 .merkle_upload_chunks(
1856 chunk_contents,
1857 chunk_addresses,
1858 &batch_result,
1859 progress.as_ref(),
1860 already_stored_count,
1861 total_chunks,
1862 )
1863 .await?;
1864
1865 info!(
1866 "External-signer merkle upload finalized: {} chunks stored, {} failed",
1867 outcome.stored, outcome.failed
1868 );
1869
1870 Ok(FileUploadResult {
1871 data_map: prepared.data_map,
1872 chunks_stored: outcome.stored,
1873 chunks_failed: outcome.failed,
1874 total_chunks,
1875 payment_mode_used: PaymentMode::Merkle,
1876 storage_cost_atto: "0".into(),
1877 gas_cost_wei: 0,
1878 data_map_address,
1879 chunk_attempts_total: outcome.stats.chunk_attempts_total,
1880 store_durations_ms: outcome.stats.store_durations_ms,
1881 retries_histogram: outcome.stats.retries_histogram,
1882 })
1883 }
1884 ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
1885 "Cannot finalize wave-batch upload with merkle winner hash. \
1886 Use finalize_upload() instead."
1887 .to_string(),
1888 )),
1889 }
1890 }
1891
1892 /// Upload a file with a specific payment mode.
1893 ///
1894 /// Before encryption, checks that the temp directory has enough free
1895 /// disk space for the spilled chunks (~1.1× source file size).
1896 ///
1897 /// Encrypted chunks are spilled to a temp directory during encryption
1898 /// so that only their 32-byte addresses stay in memory. At upload time,
1899 /// chunks are read back one wave at a time (~64 × 4 MB ≈ 256 MB peak).
1900 ///
1901 /// # Errors
1902 ///
1903 /// Returns an error if there is insufficient disk space, the file cannot
1904 /// be read, encryption fails, or any chunk cannot be stored.
1905 #[allow(clippy::too_many_lines)]
1906 pub async fn file_upload_with_mode(
1907 &self,
1908 path: &Path,
1909 mode: PaymentMode,
1910 ) -> Result<FileUploadResult> {
1911 self.file_upload_with_progress(path, mode, None).await
1912 }
1913
1914 /// Upload a file publicly, storing the serialized [`DataMap`] as part of
1915 /// the same upload payment batch.
1916 ///
1917 /// The returned [`FileUploadResult::data_map_address`] can be shared for
1918 /// public downloads via [`Client::data_map_fetch`].
1919 #[allow(clippy::too_many_lines)]
1920 pub async fn file_upload_public_with_mode(
1921 &self,
1922 path: &Path,
1923 mode: PaymentMode,
1924 ) -> Result<FileUploadResult> {
1925 self.file_upload_with_visibility_and_progress(path, mode, Visibility::Public, None)
1926 .await
1927 }
1928
1929 /// Upload a file with progress events sent to the given channel.
1930 ///
1931 /// Same as [`Client::file_upload_with_mode`] but sends [`UploadEvent`]s to the
1932 /// provided channel for UI progress feedback.
1933 #[allow(clippy::too_many_lines)]
1934 pub async fn file_upload_with_progress(
1935 &self,
1936 path: &Path,
1937 mode: PaymentMode,
1938 progress: Option<mpsc::Sender<UploadEvent>>,
1939 ) -> Result<FileUploadResult> {
1940 self.file_upload_with_visibility_and_progress(path, mode, Visibility::Private, progress)
1941 .await
1942 }
1943
1944 /// Public file upload with progress events.
1945 ///
1946 /// Same as [`Client::file_upload_public_with_mode`] but sends
1947 /// [`UploadEvent`]s to the provided channel for UI progress feedback.
1948 #[allow(clippy::too_many_lines)]
1949 pub async fn file_upload_public_with_progress(
1950 &self,
1951 path: &Path,
1952 mode: PaymentMode,
1953 progress: Option<mpsc::Sender<UploadEvent>>,
1954 ) -> Result<FileUploadResult> {
1955 self.file_upload_with_visibility_and_progress(path, mode, Visibility::Public, progress)
1956 .await
1957 }
1958
1959 #[allow(clippy::too_many_lines)]
1960 async fn file_upload_with_visibility_and_progress(
1961 &self,
1962 path: &Path,
1963 mode: PaymentMode,
1964 visibility: Visibility,
1965 progress: Option<mpsc::Sender<UploadEvent>>,
1966 ) -> Result<FileUploadResult> {
1967 debug!(
1968 "Streaming file upload with mode {mode:?}, visibility {visibility:?}: {}",
1969 path.display()
1970 );
1971
1972 // Pre-flight: verify enough temp disk space for the chunk spill.
1973 let file_size = std::fs::metadata(path)?.len();
1974 check_disk_space_for_spill(file_size)?;
1975
1976 // Phase 1: Encrypt file and spill chunks to temp directory.
1977 // Only 32-byte addresses stay in memory — chunk data lives on disk.
1978 let (mut spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1979
1980 let data_map_address = match visibility {
1981 Visibility::Private => None,
1982 Visibility::Public => {
1983 let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
1984 Error::Serialization(format!("Failed to serialize DataMap: {e}"))
1985 })?;
1986 let address = compute_address(&serialized);
1987 info!(
1988 "Public upload: adding DataMap chunk ({} bytes) at address {} to payment batch",
1989 serialized.len(),
1990 hex::encode(address)
1991 );
1992 spill.push(&serialized)?;
1993 Some(address)
1994 }
1995 };
1996
1997 let chunk_count = spill.len();
1998 info!(
1999 "Encrypted {} into {chunk_count} chunks (spilled to disk)",
2000 path.display()
2001 );
2002 if let Some(ref tx) = progress {
2003 let _ = tx
2004 .send(UploadEvent::Encrypted {
2005 total_chunks: chunk_count,
2006 })
2007 .await;
2008 }
2009
2010 // Phase 2: Decide payment mode and upload in waves from disk.
2011 //
2012 // For the merkle path, attempt to resume from a cached
2013 // receipt before paying again. The cache is keyed by the
2014 // CANONICAL source path so `./foo`, `/abs/foo`, and any
2015 // symlink alias all resolve to the same cache entry — a
2016 // crash-and-retry from a different cwd or via a different
2017 // alias still hits the receipt. Canonicalize may fail (the
2018 // file could have been moved between phase 1 and here); we
2019 // fall back to the display string in that case, which
2020 // preserves pre-fix behaviour rather than dropping cache
2021 // resume entirely.
2022 let file_path_key = std::fs::canonicalize(path)
2023 .map(|p| p.display().to_string())
2024 .unwrap_or_else(|_| path.display().to_string());
2025 let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self
2026 .should_use_merkle(chunk_count, mode)
2027 {
2028 info!("Using merkle batch payment for {chunk_count} file chunks");
2029
2030 let cached_merkle =
2031 crate::data::client::cached_merkle::try_load_for_file(&file_path_key)
2032 .map(|(_cache_path, cached)| cached);
2033
2034 let merkle_plan = match self
2035 .plan_merkle_upload(spill.chunk_entries()?, DATA_TYPE_CHUNK, progress.as_ref())
2036 .await
2037 {
2038 Ok(plan) => plan,
2039 Err(e) => {
2040 if let Some(cached) = cached_merkle
2041 .as_ref()
2042 .filter(|cached| cached_merkle_covers_addresses(cached, &spill.addresses))
2043 {
2044 info!(
2045 "Merkle preflight failed ({e}); \
2046 resuming with cached merkle proofs"
2047 );
2048 let (stored, sc, gc, stats) = self
2049 .upload_merkle_from_spill(
2050 &spill,
2051 &spill.addresses,
2052 cached,
2053 &[],
2054 progress.as_ref(),
2055 )
2056 .await?;
2057 crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
2058 return Ok(FileUploadResult {
2059 data_map,
2060 chunks_stored: stored,
2061 chunks_failed: 0,
2062 total_chunks: chunk_count,
2063 payment_mode_used: PaymentMode::Merkle,
2064 storage_cost_atto: sc,
2065 gas_cost_wei: gc,
2066 data_map_address,
2067 chunk_attempts_total: stats.chunk_attempts_total,
2068 store_durations_ms: stats.store_durations_ms,
2069 retries_histogram: stats.retries_histogram,
2070 });
2071 }
2072 match &e {
2073 Error::InsufficientPeers(msg) if mode == PaymentMode::Auto => {
2074 info!(
2075 "Merkle preflight needs more peers ({msg}), \
2076 falling back to wave-batch"
2077 );
2078 let (stored, sc, gc, fb_stats) = self
2079 .upload_waves_single(
2080 &spill,
2081 progress.as_ref(),
2082 Some(&file_path_key),
2083 )
2084 .await?;
2085 crate::data::client::cached_single::try_delete_for_file(&file_path_key);
2086 return Ok(FileUploadResult {
2087 data_map,
2088 chunks_stored: stored,
2089 chunks_failed: 0,
2090 total_chunks: chunk_count,
2091 payment_mode_used: PaymentMode::Single,
2092 storage_cost_atto: sc,
2093 gas_cost_wei: gc,
2094 data_map_address,
2095 chunk_attempts_total: fb_stats.chunk_attempts_total,
2096 store_durations_ms: fb_stats.store_durations_ms,
2097 retries_histogram: fb_stats.retries_histogram,
2098 });
2099 }
2100 _ => return Err(e),
2101 }
2102 }
2103 };
2104
2105 if merkle_plan.to_upload.is_empty() {
2106 info!("All {chunk_count} merkle chunks already stored; skipping payment");
2107 crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
2108 crate::data::client::cached_single::try_delete_for_file(&file_path_key);
2109 (
2110 chunk_count,
2111 PaymentMode::Merkle,
2112 "0".to_string(),
2113 0,
2114 WaveAggregateStats::default(),
2115 )
2116 } else if !self.should_use_merkle(merkle_plan.to_upload.len(), mode) {
2117 let remaining_chunks = merkle_plan.to_upload.len();
2118 if let Some(cached) = cached_merkle
2119 .as_ref()
2120 .filter(|cached| cached_merkle_covers_addresses(cached, &merkle_plan.to_upload))
2121 {
2122 info!(
2123 "{remaining_chunks} chunks remain below merkle threshold; \
2124 reusing cached merkle proofs"
2125 );
2126 let (stored, sc, gc, stats) = self
2127 .upload_merkle_from_spill(
2128 &spill,
2129 &merkle_plan.to_upload,
2130 cached,
2131 &merkle_plan.already_stored,
2132 progress.as_ref(),
2133 )
2134 .await?;
2135 crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
2136 (stored, PaymentMode::Merkle, sc, gc, stats)
2137 } else {
2138 if cached_merkle.is_some() {
2139 info!(
2140 "{remaining_chunks} chunks remain below merkle threshold, \
2141 and the cached merkle receipt does not cover them. \
2142 Discarding cache and using single-node payment."
2143 );
2144 crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
2145 } else {
2146 info!(
2147 "{remaining_chunks} chunks need upload after merkle preflight; \
2148 using single-node payment"
2149 );
2150 }
2151 let (stored, sc, gc, stats) = self
2152 .upload_spill_addresses_single(
2153 &spill,
2154 &merkle_plan.to_upload,
2155 progress.as_ref(),
2156 &merkle_plan.already_stored,
2157 chunk_count,
2158 Some(&file_path_key),
2159 )
2160 .await?;
2161 crate::data::client::cached_single::try_delete_for_file(&file_path_key);
2162 (stored, PaymentMode::Single, sc, gc, stats)
2163 }
2164 } else {
2165 let batch_result = if let Some(cached) = cached_merkle.as_ref() {
2166 // Validate the cache against the chunks that still need
2167 // storage. Extra proofs are harmless: a previous attempt
2168 // may have paid for chunks that are now already stored.
2169 if cached_merkle_covers_addresses(cached, &merkle_plan.to_upload) {
2170 info!(
2171 "Skipping merkle payment phase; resuming with \
2172 cached proofs for {} remaining chunks",
2173 merkle_plan.to_upload.len()
2174 );
2175 Ok(cached.clone())
2176 } else {
2177 info!(
2178 "Cached merkle receipt does not cover the current \
2179 remaining chunks (cached={}, remaining={}). \
2180 Discarding cache and paying fresh.",
2181 cached.proofs.len(),
2182 merkle_plan.to_upload.len()
2183 );
2184 crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
2185 self.pay_for_merkle_batch(
2186 &merkle_plan.to_upload,
2187 DATA_TYPE_CHUNK,
2188 merkle_plan.to_upload_avg_size(),
2189 )
2190 .await
2191 .inspect(|result| {
2192 crate::data::client::cached_merkle::try_save(&file_path_key, result);
2193 })
2194 }
2195 } else {
2196 self.pay_for_merkle_batch(
2197 &merkle_plan.to_upload,
2198 DATA_TYPE_CHUNK,
2199 merkle_plan.to_upload_avg_size(),
2200 )
2201 .await
2202 .inspect(|result| {
2203 // Save BEFORE the store phase so a crash
2204 // mid-upload leaves a resumable receipt.
2205 crate::data::client::cached_merkle::try_save(&file_path_key, result);
2206 })
2207 };
2208
2209 let batch_result = match batch_result {
2210 Ok(result) => result,
2211 Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
2212 info!("Merkle needs more peers ({msg}), falling back to wave-batch");
2213 let (stored, sc, gc, fb_stats) = self
2214 .upload_spill_addresses_single(
2215 &spill,
2216 &merkle_plan.to_upload,
2217 progress.as_ref(),
2218 &merkle_plan.already_stored,
2219 chunk_count,
2220 Some(&file_path_key),
2221 )
2222 .await?;
2223 crate::data::client::cached_single::try_delete_for_file(&file_path_key);
2224 return Ok(FileUploadResult {
2225 data_map,
2226 chunks_stored: stored,
2227 chunks_failed: 0,
2228 total_chunks: chunk_count,
2229 payment_mode_used: PaymentMode::Single,
2230 storage_cost_atto: sc,
2231 gas_cost_wei: gc,
2232 data_map_address,
2233 chunk_attempts_total: fb_stats.chunk_attempts_total,
2234 store_durations_ms: fb_stats.store_durations_ms,
2235 retries_histogram: fb_stats.retries_histogram,
2236 });
2237 }
2238 Err(e) => return Err(e),
2239 };
2240
2241 let (stored, sc, gc, stats) = self
2242 .upload_merkle_from_spill(
2243 &spill,
2244 &merkle_plan.to_upload,
2245 &batch_result,
2246 &merkle_plan.already_stored,
2247 progress.as_ref(),
2248 )
2249 .await?;
2250 // Upload succeeded end-to-end; the cached receipt is
2251 // no longer needed.
2252 crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
2253 (stored, PaymentMode::Merkle, sc, gc, stats)
2254 }
2255 } else {
2256 let (stored, sc, gc, stats) = self
2257 .upload_waves_single(&spill, progress.as_ref(), Some(&file_path_key))
2258 .await?;
2259 // Full file success: drop any cached single-node receipt.
2260 crate::data::client::cached_single::try_delete_for_file(&file_path_key);
2261 (stored, PaymentMode::Single, sc, gc, stats)
2262 };
2263
2264 info!(
2265 "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
2266 path.display()
2267 );
2268
2269 Ok(FileUploadResult {
2270 data_map,
2271 chunks_stored,
2272 chunks_failed: 0,
2273 total_chunks: chunk_count,
2274 payment_mode_used: actual_mode,
2275 storage_cost_atto,
2276 gas_cost_wei,
2277 data_map_address,
2278 chunk_attempts_total: stats.chunk_attempts_total,
2279 store_durations_ms: stats.store_durations_ms,
2280 retries_histogram: stats.retries_histogram,
2281 })
2282 }
2283
2284 /// Encrypt a file and spill chunks to a temp directory.
2285 ///
2286 /// Logs progress every 100 chunks so users get feedback during
2287 /// multi-GB encryptions.
2288 ///
2289 /// Returns the spill buffer (addresses on disk) and the `DataMap`.
2290 async fn encrypt_file_to_spill(
2291 &self,
2292 path: &Path,
2293 progress: Option<&mpsc::Sender<UploadEvent>>,
2294 ) -> Result<(ChunkSpill, DataMap)> {
2295 let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
2296
2297 let mut spill = ChunkSpill::new()?;
2298 while let Some(content) = chunk_rx.recv().await {
2299 spill.push(&content)?;
2300 let chunks_done = spill.len();
2301 if let Some(tx) = progress {
2302 if chunks_done.is_multiple_of(10) {
2303 let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
2304 }
2305 }
2306 if chunks_done % 100 == 0 {
2307 let mb = spill.total_bytes() / (1024 * 1024);
2308 info!(
2309 "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
2310 path.display()
2311 );
2312 }
2313 }
2314
2315 // Await encryption completion to catch errors before paying.
2316 handle
2317 .await
2318 .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
2319 .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
2320
2321 let data_map = datamap_rx
2322 .await
2323 .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
2324
2325 Ok((spill, data_map))
2326 }
2327
2328 /// Upload chunks from a spill using wave-based per-chunk (single) payments.
2329 ///
2330 /// Reads one wave at a time from disk, prepares quotes, pays, and stores.
2331 /// Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE` (~256 MB).
2332 ///
2333 /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
2334 async fn upload_waves_single(
2335 &self,
2336 spill: &ChunkSpill,
2337 progress: Option<&mpsc::Sender<UploadEvent>>,
2338 resume_key: Option<&str>,
2339 ) -> Result<(usize, String, u128, WaveAggregateStats)> {
2340 self.upload_spill_addresses_single(
2341 spill,
2342 &spill.addresses,
2343 progress,
2344 &[],
2345 spill.len(),
2346 resume_key,
2347 )
2348 .await
2349 }
2350
2351 async fn upload_spill_addresses_single(
2352 &self,
2353 spill: &ChunkSpill,
2354 addresses: &[[u8; 32]],
2355 progress: Option<&mpsc::Sender<UploadEvent>>,
2356 already_stored_addresses: &[[u8; 32]],
2357 total_chunks: usize,
2358 resume_key: Option<&str>,
2359 ) -> Result<(usize, String, u128, WaveAggregateStats)> {
2360 let mut total_stored = already_stored_addresses.len();
2361 let mut total_storage = Amount::ZERO;
2362 let mut total_gas: u128 = 0;
2363 let mut agg_stats = WaveAggregateStats::default();
2364 // A wave whose chunks fall short of quorum after retries must not abort
2365 // the file: its failures are accumulated here and surfaced as a single
2366 // `PartialUpload` only after every wave has been attempted, mirroring
2367 // `upload_merkle_from_spill`. Aborting on the first failed wave (the old `?`)
2368 // discarded all later waves' progress — already self-encrypted, spilled,
2369 // and in some cases already paid for — converting high per-chunk success
2370 // into 0% per-file success.
2371 // Seed with the addresses a preflight already confirmed stored (e.g.
2372 // the merkle-fallback path passes `merkle_plan.already_stored`), so a
2373 // returned `PartialUpload.stored` lists every stored chunk and
2374 // `stored_count == stored.len()` holds for programmatic callers.
2375 let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
2376 let mut failed: Vec<([u8; 32], String)> = Vec::new();
2377 let waves: Vec<&[[u8; 32]]> = addresses.chunks(UPLOAD_WAVE_SIZE).collect();
2378 let wave_count = waves.len();
2379
2380 // Unconditional breadcrumb: lets a clean run confirm the continue-on-
2381 // partial single-node path is in effect (the old path aborted the file
2382 // on the first failed wave instead of continuing across all waves).
2383 info!(
2384 "single-node upload: {} chunk(s) in {wave_count} wave(s) (continue-on-partial)",
2385 addresses.len()
2386 );
2387
2388 for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
2389 let wave_num = wave_idx + 1;
2390 let wave_data: Vec<Bytes> = wave_addrs
2391 .iter()
2392 .map(|addr| spill.read_chunk(addr))
2393 .collect::<Result<Vec<_>>>()?;
2394
2395 info!(
2396 "Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
2397 wave_data.len()
2398 );
2399 if let Some(tx) = progress {
2400 let _ = tx
2401 .send(UploadEvent::QuotingChunks {
2402 wave: wave_num,
2403 total_waves: wave_count,
2404 chunks_in_wave: wave_data.len(),
2405 })
2406 .await;
2407 }
2408 // Fold this wave's result. A quorum shortfall (`PartialUpload`) is
2409 // recoverable and its parts are returned to be recorded here;
2410 // genuinely fatal errors propagate via `?` and abort the file, as in
2411 // `upload_merkle_from_spill`.
2412 let outcome = fold_single_wave(
2413 self.batch_upload_chunks_with_events(
2414 wave_data,
2415 progress,
2416 total_stored,
2417 total_chunks,
2418 resume_key,
2419 )
2420 .await,
2421 )?;
2422
2423 if !outcome.failed.is_empty() {
2424 warn!(
2425 "Wave {wave_num}/{wave_count}: {} chunk(s) failed to store after retries; \
2426 continuing with remaining waves",
2427 outcome.failed.len()
2428 );
2429 }
2430
2431 total_stored += outcome.stored.len();
2432 stored_addresses.extend(outcome.stored);
2433 failed.extend(outcome.failed);
2434 total_storage += outcome.storage_atto;
2435 total_gas = total_gas.saturating_add(outcome.gas_wei);
2436 // Merge per-wave stats (a quorum-short wave contributes none, since
2437 // `PartialUpload` carries no stats).
2438 agg_stats.chunk_attempts_total = agg_stats
2439 .chunk_attempts_total
2440 .saturating_add(outcome.stats.chunk_attempts_total);
2441 agg_stats
2442 .store_durations_ms
2443 .extend(outcome.stats.store_durations_ms);
2444 for (slot, count) in agg_stats
2445 .retries_histogram
2446 .iter_mut()
2447 .zip(outcome.stats.retries_histogram.iter())
2448 {
2449 *slot = slot.saturating_add(*count);
2450 }
2451 }
2452
2453 // Any chunk still failed after every wave was attempted means the file
2454 // is not fully stored — surface it as `PartialUpload` (never silently
2455 // succeed with missing chunks), carrying the real on-chain spend.
2456 if !failed.is_empty() {
2457 let failed_count = failed.len();
2458 warn!(
2459 "single-node upload incomplete: {failed_count}/{total_chunks} chunks failed after retries"
2460 );
2461 return Err(Error::PartialUpload {
2462 stored: stored_addresses,
2463 stored_count: total_stored,
2464 failed,
2465 failed_count,
2466 total_chunks,
2467 spend: Box::new(PartialUploadSpend {
2468 storage_cost_atto: total_storage.to_string(),
2469 gas_cost_wei: total_gas,
2470 }),
2471 reason: format!("{failed_count} chunk(s) failed to store after retries"),
2472 });
2473 }
2474
2475 Ok((
2476 total_stored,
2477 total_storage.to_string(),
2478 total_gas,
2479 agg_stats,
2480 ))
2481 }
2482
2483 /// Upload chunks from a spill using pre-computed merkle proofs.
2484 ///
2485 /// Stores the whole file as a **single cap-bounded fan-out** — not in fixed
2486 /// waves. The store concurrency limiter is the only throttle: `store_one`
2487 /// reads each chunk's body from the on-disk spill on demand, so at most
2488 /// `store_cap` (≤ 64) bodies are ever resident, giving the same
2489 /// `~store_cap × MAX_CHUNK_SIZE` peak-memory bound the old 64-chunk waves
2490 /// gave — but with **no wave barrier**, so a slow straggler (e.g. a chunk
2491 /// whose close-group peers are stale relayed addresses that take minutes to
2492 /// revalidate) no longer stalls the rest of the file behind it.
2493 ///
2494 /// A chunk that is transiently short of quorum (`InsufficientPeers` /
2495 /// `CloseGroupShortfall` / `RemotePut`) does **not** abort the file, nor
2496 /// block the pass: the store pass is a **single attempt** (no in-pass
2497 /// backoff), and quorum-short chunks are collected into a deferred set. After
2498 /// the pass, [`merkle_deferred_retry`] retries that set in concurrent rounds
2499 /// ([`DEFERRED_ROUND_DELAYS_SECS`] delays), re-reading each body from the
2500 /// spill and reusing its proof. Non-quorum errors (e.g. a missing proof)
2501 /// stay fatal and abort immediately.
2502 ///
2503 /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)` on success.
2504 /// Costs come from the `batch_result` which was populated during payment.
2505 ///
2506 /// # Errors
2507 ///
2508 /// Returns [`Error::PartialUpload`] if any chunk is still short of quorum
2509 /// after the store pass and every deferred round (other chunks remain
2510 /// stored), or the underlying error for a non-quorum failure.
2511 async fn upload_merkle_from_spill(
2512 &self,
2513 spill: &ChunkSpill,
2514 addresses: &[[u8; 32]],
2515 batch_result: &MerkleBatchPaymentResult,
2516 already_stored_addresses: &[[u8; 32]],
2517 progress: Option<&mpsc::Sender<UploadEvent>>,
2518 ) -> Result<(usize, String, u128, WaveAggregateStats)> {
2519 let mut total_stored = already_stored_addresses.len();
2520 let total_chunks = total_stored + addresses.len();
2521 let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
2522 let mut failed: Vec<([u8; 32], String)> = Vec::new();
2523 let mut agg_stats = WaveAggregateStats::default();
2524
2525 // Chunks without a merkle proof were never paid for: a partial
2526 // `pay_for_merkle_multi_batch` result carries proofs only for the
2527 // sub-batches whose on-chain payment succeeded. Such a chunk cannot be
2528 // stored, so record it as failed (surfaced via `PartialUpload` once the
2529 // storable chunks have been attempted) rather than letting its
2530 // "missing proof" error abort the whole file and discard every other
2531 // chunk's progress.
2532 let (to_store, missing_proof) =
2533 partition_addresses_by_proof(addresses, &batch_result.proofs);
2534 if !missing_proof.is_empty() {
2535 warn!(
2536 "{} chunk(s) lack a merkle proof (partial payment); reporting them as failed",
2537 missing_proof.len()
2538 );
2539 for addr in &missing_proof {
2540 failed.push((
2541 *addr,
2542 format!("Missing merkle proof for chunk {}", hex::encode(addr)),
2543 ));
2544 }
2545 }
2546
2547 let store_limiter = self.controller().store.clone();
2548
2549 // Store one chunk to its (freshly re-collected) close group, reusing the
2550 // chunk's merkle proof. Reads the body from the on-disk spill on demand,
2551 // so the whole-file store runs as ONE cap-bounded fan-out with no per-wave
2552 // barrier: a slow straggler (e.g. a chunk whose close-group peers are
2553 // stale relayed addresses that take minutes to revalidate) no longer
2554 // holds back the rest of the file. Only the ≤cap in-flight stores hold a
2555 // body, so peak resident memory is `cap × MAX_CHUNK_SIZE`; the cap is
2556 // clamped to `MERKLE_STORE_MAX_IN_FLIGHT` (below) so it stays within the
2557 // ~256 MiB bound the fixed 64-chunk waves gave even if `adaptive.max.store`
2558 // is configured above 64.
2559 // Shared across every deferred round so a converged routing table yields
2560 // a fresh group. Only a quorum shortfall is recoverable; a missing proof
2561 // or a failed spill read stays fatal. Mirrors `merkle_upload_chunks`.
2562 let store_one = |addr: [u8; 32]| {
2563 let limiter = store_limiter.clone();
2564 let proof_bytes = batch_result.proofs.get(&addr).cloned();
2565 async move {
2566 let started = std::time::Instant::now();
2567 let proof = proof_bytes.ok_or_else(|| {
2568 Error::Payment(format!(
2569 "Missing merkle proof for chunk {}",
2570 hex::encode(addr)
2571 ))
2572 })?;
2573 let content = spill.read_chunk(&addr)?;
2574 let peers = self.put_target_peers(&addr).await?;
2575 observe_op(
2576 &limiter,
2577 || async move { self.chunk_put_to_close_group(content, proof, &peers).await },
2578 classify_error,
2579 )
2580 .await
2581 .map(|_| started)
2582 }
2583 };
2584
2585 info!(
2586 "Storing {} chunks (merkle) as a single cap-bounded pass — {total_stored}/{total_chunks} stored so far",
2587 to_store.len()
2588 );
2589
2590 // Store the WHOLE file in one cap-bounded fan-out (`max_attempts = 1`, no
2591 // backoff): no wave barrier, so a slow straggler (dead-relay peers) can't
2592 // hold back the rest of the file. The store cap re-reads the limiter per
2593 // slot, so it maxes at 64 → ≤64 bodies resident (bodies read from spill on
2594 // demand by `store_one`), the same peak-memory bound the fixed 64-chunk
2595 // waves gave. Quorum-short chunks are collected and deferred to the
2596 // post-pass concurrent retry rather than parking slots behind a backoff.
2597 // `merkle_store_cap` clamps to `MERKLE_STORE_MAX_IN_FLIGHT` so a high
2598 // configured `adaptive.max.store` can't hold more than the wave-era
2599 // ~256 MB of spilled bodies resident (PR #137 review).
2600 let cap = || merkle_store_cap(store_limiter.current());
2601 let outcome = merkle_store_with_retry(
2602 to_store.clone(),
2603 cap,
2604 1,
2605 std::time::Duration::ZERO,
2606 progress,
2607 total_stored,
2608 total_chunks,
2609 &store_one,
2610 )
2611 .await?;
2612
2613 // Record confirmed stores from the explicit set the store helper reports.
2614 // Using that set (rather than inferring "chunks minus failed") keeps
2615 // `stored_addresses` correct even when a fatal abort leaves some chunks
2616 // neither stored nor reported short of quorum.
2617 stored_addresses.extend(&outcome.stored_addresses);
2618 total_stored = outcome.stored;
2619
2620 // Merge store stats (durations, attempts, per-round histogram).
2621 agg_stats.chunk_attempts_total = agg_stats
2622 .chunk_attempts_total
2623 .saturating_add(outcome.stats.chunk_attempts_total);
2624 agg_stats
2625 .store_durations_ms
2626 .extend(outcome.stats.store_durations_ms);
2627 for (slot, count) in agg_stats
2628 .retries_histogram
2629 .iter_mut()
2630 .zip(outcome.stats.retries_histogram.iter())
2631 {
2632 *slot = slot.saturating_add(*count);
2633 }
2634
2635 if let Some(e) = outcome.fatal {
2636 // A non-quorum store error is fatal (missing proofs were filtered out
2637 // above, so this is a genuine network/store failure). Preserve every
2638 // chunk stored so far and report every not-stored chunk as failed, so
2639 // the `PartialUpload` counts are accurate.
2640 warn!("merkle store aborted: {e}");
2641 let mut known_failed = failed;
2642 known_failed.extend(outcome.failed_addresses);
2643 return Err(partial_upload_after_fatal(
2644 addresses,
2645 stored_addresses,
2646 total_stored,
2647 total_chunks,
2648 known_failed,
2649 PartialUploadSpend {
2650 storage_cost_atto: batch_result.storage_cost_atto.clone(),
2651 gas_cost_wei: batch_result.gas_cost_wei,
2652 },
2653 format!("merkle chunk store aborted: {e}"),
2654 ));
2655 }
2656
2657 // Non-fatal: quorum-short chunks are deferred (not failed yet) for the
2658 // post-pass concurrent retry. A deferred chunk joins `stored_addresses`
2659 // only if/when a later round stores it.
2660 let deferred: Vec<([u8; 32], String)> = outcome.failed_addresses;
2661
2662 // The store pass never blocked on backoff; now retry the deferred set in
2663 // concurrent rounds. Bodies are re-read from the spill by `store_one`
2664 // (peak RAM unchanged) and proofs re-attached. Chunks still short after
2665 // the final round become `failed`; a non-quorum error aborts as
2666 // `PartialUpload`.
2667 if !deferred.is_empty() {
2668 info!(
2669 "Deferring {} merkle chunk(s) short of quorum for concurrent retry after the store pass",
2670 deferred.len()
2671 );
2672 let dr = merkle_deferred_retry(
2673 deferred,
2674 &DEFERRED_ROUND_DELAYS_SECS,
2675 |n: usize| merkle_store_cap(store_limiter.current()).min(n.max(1)),
2676 progress,
2677 total_stored,
2678 total_chunks,
2679 &store_one,
2680 )
2681 .await?;
2682
2683 stored_addresses.extend(dr.stored_addresses);
2684 total_stored = dr.stored;
2685
2686 // Merge the deferred pass's stats — its histogram is already mapped
2687 // to the right per-round slots — into the file aggregate.
2688 agg_stats.chunk_attempts_total = agg_stats
2689 .chunk_attempts_total
2690 .saturating_add(dr.stats.chunk_attempts_total);
2691 agg_stats
2692 .store_durations_ms
2693 .extend(dr.stats.store_durations_ms);
2694 for (slot, count) in agg_stats
2695 .retries_histogram
2696 .iter_mut()
2697 .zip(dr.stats.retries_histogram.iter())
2698 {
2699 *slot = slot.saturating_add(*count);
2700 }
2701
2702 if let Some(reason) = dr.fatal {
2703 // A non-quorum store error during a deferred round is fatal, the
2704 // same as in the wave path: preserve everything stored so far and
2705 // report every not-stored chunk as failed.
2706 warn!("merkle deferred retry aborted: {reason}");
2707 let mut known_failed = failed;
2708 known_failed.extend(dr.failed_addresses);
2709 return Err(partial_upload_after_fatal(
2710 addresses,
2711 stored_addresses,
2712 total_stored,
2713 total_chunks,
2714 known_failed,
2715 PartialUploadSpend {
2716 storage_cost_atto: batch_result.storage_cost_atto.clone(),
2717 gas_cost_wei: batch_result.gas_cost_wei,
2718 },
2719 format!("merkle chunk store aborted: {reason}"),
2720 ));
2721 }
2722 failed.extend(dr.failed_addresses);
2723 }
2724
2725 // A file with any permanently-failed chunk is not fully stored — surface
2726 // it as `PartialUpload`, but only after the store pass and every deferred
2727 // retry round are exhausted (never silently succeed with missing chunks).
2728 if !failed.is_empty() {
2729 let failed_count = failed.len();
2730 let total_attempts = 1 + DEFERRED_ROUND_DELAYS_SECS.len();
2731 warn!(
2732 "merkle upload incomplete: {failed_count}/{total_chunks} chunks short of quorum after retries"
2733 );
2734 return Err(Error::PartialUpload {
2735 stored: stored_addresses,
2736 stored_count: total_stored,
2737 failed,
2738 failed_count,
2739 total_chunks,
2740 spend: Box::new(PartialUploadSpend {
2741 storage_cost_atto: batch_result.storage_cost_atto.clone(),
2742 gas_cost_wei: batch_result.gas_cost_wei,
2743 }),
2744 reason: format!(
2745 "{failed_count} chunk(s) short of quorum after {total_attempts} attempts"
2746 ),
2747 });
2748 }
2749
2750 Ok((
2751 total_stored,
2752 batch_result.storage_cost_atto.clone(),
2753 batch_result.gas_cost_wei,
2754 agg_stats,
2755 ))
2756 }
2757
2758 /// Download and decrypt a file from the network, writing it to disk.
2759 ///
2760 /// Uses `streaming_decrypt` so that only one batch of chunks lives in
2761 /// memory at a time, avoiding OOM on large files. Chunks are fetched
2762 /// concurrently within each batch, then decrypted data is written to
2763 /// disk incrementally.
2764 ///
2765 /// Returns the number of bytes written.
2766 ///
2767 /// # Panics
2768 ///
2769 /// Requires a multi-threaded Tokio runtime (`flavor = "multi_thread"`).
2770 /// Will panic if called from a `current_thread` runtime because
2771 /// `streaming_decrypt` takes a synchronous callback that must bridge
2772 /// back to async via `block_in_place`.
2773 ///
2774 /// # Errors
2775 ///
2776 /// Returns an error if any chunk cannot be retrieved, decryption fails,
2777 /// or the file cannot be written.
2778 pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
2779 self.file_download_with_progress(data_map, output, None)
2780 .await
2781 }
2782
2783 /// Download and decrypt a file, trying the requested number of
2784 /// closest peers for every chunk fetch.
2785 ///
2786 /// Returns the number of bytes written.
2787 ///
2788 /// # Errors
2789 ///
2790 /// Returns an error if any chunk cannot be retrieved, decryption fails,
2791 /// or the file cannot be written.
2792 pub async fn file_download_from_closest_peers(
2793 &self,
2794 data_map: &DataMap,
2795 output: &Path,
2796 peer_count: NonZeroUsize,
2797 ) -> Result<u64> {
2798 self.file_download_with_progress_using_peer_count(data_map, output, None, peer_count.get())
2799 .await
2800 }
2801
2802 /// Download and decrypt a file with progress events, trying the
2803 /// requested number of closest peers for every chunk fetch.
2804 ///
2805 /// Same as [`Client::file_download_from_closest_peers`] but sends
2806 /// [`DownloadEvent`]s for UI feedback.
2807 ///
2808 /// # Errors
2809 ///
2810 /// Returns an error if any chunk cannot be retrieved, decryption fails,
2811 /// or the file cannot be written.
2812 pub async fn file_download_with_progress_from_closest_peers(
2813 &self,
2814 data_map: &DataMap,
2815 output: &Path,
2816 progress: Option<mpsc::Sender<DownloadEvent>>,
2817 peer_count: NonZeroUsize,
2818 ) -> Result<u64> {
2819 self.file_download_with_progress_using_peer_count(
2820 data_map,
2821 output,
2822 progress,
2823 peer_count.get(),
2824 )
2825 .await
2826 }
2827
2828 /// Download and decrypt a file with peer-health diagnostics.
2829 ///
2830 /// Each file chunk is fetched by querying every selected closest peer,
2831 /// not by returning after the first successful peer. The returned report
2832 /// records which peers had each chunk and which did not. DataMap
2833 /// resolution still uses the normal early-return fetch path; diagnostics
2834 /// are for file chunks only.
2835 ///
2836 /// # Errors
2837 ///
2838 /// Returns an error if any chunk cannot be retrieved, decryption fails,
2839 /// or the file cannot be written.
2840 pub async fn file_download_with_peer_report_from_closest_peers(
2841 &self,
2842 data_map: &DataMap,
2843 output: &Path,
2844 progress: Option<mpsc::Sender<DownloadEvent>>,
2845 peer_count: NonZeroUsize,
2846 ) -> Result<FileDownloadWithPeerReport> {
2847 let chunk_reports = Arc::new(Mutex::new(Vec::new()));
2848 let bytes_written = self
2849 .file_download_with_progress_using_peer_count_and_reports(
2850 data_map,
2851 output,
2852 progress,
2853 peer_count.get(),
2854 Some(chunk_reports.clone()),
2855 )
2856 .await?;
2857
2858 let chunk_reports = chunk_reports
2859 .lock()
2860 .map_err(|_| Error::Storage("file chunk peer report lock poisoned".to_string()))?
2861 .clone();
2862 let chunk_reports = file_chunk_reports_from_recorded_sweeps(chunk_reports);
2863
2864 Ok(FileDownloadWithPeerReport {
2865 bytes_written,
2866 chunk_reports,
2867 })
2868 }
2869
2870 async fn download_fetch_file_chunk(
2871 &self,
2872 idx: usize,
2873 hash: XorName,
2874 context: FileDownloadFetchContext,
2875 is_deferred_retry: bool,
2876 attempt: usize,
2877 ) -> std::result::Result<DownloadBatchEntry, self_encryption::Error> {
2878 let addr = hash.0;
2879 let addr_hex = hex::encode(addr);
2880
2881 let chunk_content = if let Some(peer_reports) = context.peer_reports {
2882 match self
2883 .chunk_get_from_closest_peer_group(&addr, context.peer_count)
2884 .await
2885 {
2886 Ok(results) => {
2887 let (content, sweep) = file_chunk_sweep_report_from_peer_results(
2888 attempt,
2889 is_deferred_retry,
2890 &results,
2891 );
2892 peer_reports
2893 .lock()
2894 .map_err(|_| {
2895 self_encryption::Error::Generic(
2896 "file chunk peer report lock poisoned".to_string(),
2897 )
2898 })?
2899 .push(RecordedFileChunkPeerSweep {
2900 index: idx + 1,
2901 address: addr,
2902 sweep,
2903 });
2904 content
2905 }
2906 Err(e) => {
2907 if is_deferred_retry {
2908 info!(
2909 "Deferred all-peer retry for {addr_hex} hit transient error: {e}; re-deferring"
2910 );
2911 } else {
2912 info!("First-pass all-peer fetch error for {addr_hex}: {e}; deferring");
2913 }
2914 peer_reports
2915 .lock()
2916 .map_err(|_| {
2917 self_encryption::Error::Generic(
2918 "file chunk peer report lock poisoned".to_string(),
2919 )
2920 })?
2921 .push(RecordedFileChunkPeerSweep {
2922 index: idx + 1,
2923 address: addr,
2924 sweep: file_chunk_sweep_report_from_error(
2925 attempt,
2926 is_deferred_retry,
2927 &e,
2928 ),
2929 });
2930 None
2931 }
2932 }
2933 } else {
2934 match self
2935 .chunk_get_observed_from_closest_peers(&addr, context.peer_count)
2936 .await
2937 {
2938 Ok(Some(chunk)) => Some(chunk.content),
2939 Ok(None) => None,
2940 Err(e) => {
2941 if is_deferred_retry {
2942 info!(
2943 "Deferred retry for {addr_hex} hit transient error: {e}; re-deferring"
2944 );
2945 } else {
2946 info!("First-pass fetch error for {addr_hex}: {e}; deferring");
2947 }
2948 None
2949 }
2950 }
2951 };
2952
2953 let Some(content) = chunk_content else {
2954 return Ok((idx, Err(hash)));
2955 };
2956
2957 let fetched = context
2958 .fetched_ref
2959 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
2960 + 1;
2961 if is_deferred_retry {
2962 info!(
2963 "Downloaded {fetched}/{} (deferred retry)",
2964 context.total_chunks
2965 );
2966 } else {
2967 let total_chunks = context.total_chunks;
2968 info!("Downloaded {fetched}/{total_chunks}");
2969 }
2970 if let Some(ref tx) = context.progress_ref {
2971 let _ = tx.try_send(DownloadEvent::ChunksFetched {
2972 fetched,
2973 total: context.total_chunks,
2974 });
2975 }
2976
2977 Ok((idx, Ok(content)))
2978 }
2979
2980 /// Shared download core: resolve the DataMap, then fetch + streaming-decrypt
2981 /// the file one batch at a time, handing each decrypted plaintext segment
2982 /// (in order) to `on_chunk`. Constant memory — only one decrypt batch is
2983 /// resident at a time. Returns the total plaintext bytes produced.
2984 ///
2985 /// `on_chunk` is async so a sink can apply backpressure (e.g. a bounded
2986 /// channel). Driving the decrypt iterator runs the batched chunk fetch via
2987 /// `block_in_place`, so this requires a multi-threaded Tokio runtime.
2988 ///
2989 /// Every chunk fetch tries `peer_count` closest peers.
2990 ///
2991 /// Progress reporting (via `progress`):
2992 /// 1. Resolves hierarchical DataMaps to the root level first (reports as
2993 /// `ChunksFetched` with `total: 0` during resolution)
2994 /// 2. Once the root DataMap is known, sends `total_chunks` with accurate count
2995 /// 3. Fetches data chunks with accurate `fetched/total` progress
2996 async fn download_decrypted_chunks<F, Fut>(
2997 &self,
2998 data_map: &DataMap,
2999 progress: Option<mpsc::Sender<DownloadEvent>>,
3000 peer_count: usize,
3001 peer_reports: Option<Arc<Mutex<Vec<RecordedFileChunkPeerSweep>>>>,
3002 mut on_chunk: F,
3003 ) -> Result<u64>
3004 where
3005 F: FnMut(Bytes) -> Fut,
3006 Fut: std::future::Future<Output = Result<()>>,
3007 {
3008 let handle = Handle::current();
3009
3010 // Phase 1: Resolve hierarchical DataMap to root level.
3011 // This fetches child DataMap chunks (typically 3) to discover the real chunk count.
3012 let root_map = if data_map.is_child() {
3013 let dm_chunks = data_map.len();
3014 if let Some(ref tx) = progress {
3015 let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
3016 total_map_chunks: dm_chunks,
3017 });
3018 }
3019
3020 let resolve_progress = progress.clone();
3021 let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
3022
3023 let resolved = tokio::task::block_in_place(|| {
3024 let counter_ref = resolve_counter.clone();
3025 let progress_ref = resolve_progress.clone();
3026 let fetch_limiter = self.controller().fetch.clone();
3027 let fetch = |batch: &[(usize, XorName)]| {
3028 let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
3029 let counter = counter_ref.clone();
3030 let prog = progress_ref.clone();
3031 let limiter = fetch_limiter.clone();
3032 handle.block_on(async {
3033 // Use rebucketed_unordered so the in-flight cap
3034 // is re-read from the limiter as each slot frees.
3035 // `buffer_unordered` snapshots the cap once at
3036 // pipeline build, which means observe_op
3037 // signals from inside chunk_get cannot reduce
3038 // concurrency on the current batch — exactly
3039 // the case where load-shedding is needed.
3040 let mut results = rebucketed_unordered(
3041 &limiter,
3042 batch_owned,
3043 |(idx, hash): (usize, XorName)| {
3044 let counter = counter.clone();
3045 let prog = prog.clone();
3046 async move {
3047 let addr = hash.0;
3048 // chunk_get_observed feeds the
3049 // adaptive fetch limiter once per
3050 // call via chunk_get_outcome
3051 // (Ok(None) -> Timeout is the
3052 // load-shedding signal for
3053 // sustained close-group exhaustion).
3054 let chunk = self
3055 .chunk_get_observed_from_closest_peers(&addr, peer_count)
3056 .await
3057 .map_err(|e| {
3058 self_encryption::Error::Generic(format!(
3059 "DataMap resolution failed: {e}"
3060 ))
3061 })?
3062 .ok_or_else(|| {
3063 self_encryption::Error::Generic(format!(
3064 "DataMap chunk not found: {}",
3065 hex::encode(addr)
3066 ))
3067 })?;
3068 let fetched = counter
3069 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
3070 + 1;
3071 if let Some(ref tx) = prog {
3072 let _ =
3073 tx.try_send(DownloadEvent::MapChunkFetched { fetched });
3074 }
3075 Ok::<_, self_encryption::Error>((idx, chunk.content))
3076 }
3077 },
3078 )
3079 .await?;
3080 // CRITICAL: self_encryption::get_root_data_map_parallel
3081 // pairs the returned Vec POSITIONALLY with the input
3082 // hashes via .zip() and discards our idx field.
3083 // rebucketed_unordered preserves first-completion
3084 // order, so sort by idx to restore input order
3085 // before returning.
3086 results.sort_by_key(|(idx, _)| *idx);
3087 Ok(results)
3088 })
3089 };
3090 get_root_data_map_parallel(data_map.clone(), &fetch)
3091 })
3092 .map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;
3093
3094 info!(
3095 "Resolved hierarchical DataMap: {} data chunks",
3096 resolved.len()
3097 );
3098 resolved
3099 } else {
3100 data_map.clone()
3101 };
3102
3103 // Phase 2: Now we know the real chunk count.
3104 let total_chunks = root_map.len();
3105 if let Some(ref tx) = progress {
3106 let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
3107 }
3108
3109 // Phase 3: Fetch and decrypt data chunks with accurate progress.
3110 let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
3111 let fetched_for_closure = fetched_counter.clone();
3112 let progress_for_closure = progress.clone();
3113 let peer_reports_for_closure = peer_reports.clone();
3114
3115 let fetch_limiter_outer = self.controller().fetch.clone();
3116 let usable_memory = usable_memory_bytes();
3117 let configured_batch_floor = stream_decrypt_batch_size();
3118 let fetch_cap = fetch_limiter_outer.current();
3119 let decrypt_batch_size = adaptive_stream_decrypt_batch_size(
3120 total_chunks,
3121 fetch_cap,
3122 configured_batch_floor,
3123 usable_memory,
3124 );
3125 info!(
3126 total_chunks,
3127 fetch_cap,
3128 configured_batch_floor,
3129 ?usable_memory,
3130 decrypt_batch_size,
3131 "Selected adaptive stream decrypt batch size"
3132 );
3133
3134 let stream = streaming_decrypt_with_batch_size(
3135 &root_map,
3136 |batch: &[(usize, XorName)]| {
3137 let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
3138 let fetch_context = FileDownloadFetchContext {
3139 total_chunks,
3140 peer_count,
3141 fetched_ref: fetched_for_closure.clone(),
3142 progress_ref: progress_for_closure.clone(),
3143 peer_reports: peer_reports_for_closure.clone(),
3144 };
3145 let fetch_limiter = fetch_limiter_outer.clone();
3146
3147 tokio::task::block_in_place(|| {
3148 handle.block_on(async {
3149 // First pass: try every chunk in the batch. Normal mode
3150 // uses chunk_get_observed (early-return after a found
3151 // peer); diagnostic mode asks every selected closest
3152 // peer and records that sweep before returning bytes.
3153 // Any missing chunk or transient fetch error is encoded
3154 // as Err(hash), so one noisy chunk does not abort the
3155 // whole batch before the deferred retry rounds run.
3156 let first_fetch_context = fetch_context.clone();
3157 let raw: Vec<DownloadBatchEntry> = rebucketed_unordered(
3158 &fetch_limiter,
3159 batch_owned,
3160 |(idx, hash): (usize, XorName)| {
3161 let fetch_context = first_fetch_context.clone();
3162 async move {
3163 self.download_fetch_file_chunk(
3164 idx,
3165 hash,
3166 fetch_context,
3167 false,
3168 FIRST_DIAGNOSTIC_FETCH_ATTEMPT,
3169 )
3170 .await
3171 }
3172 },
3173 )
3174 .await?;
3175
3176 // Partition: things we already have vs the
3177 // deferred set we need to retry.
3178 let mut results: Vec<(usize, bytes::Bytes)> = Vec::new();
3179 let mut deferred: Vec<(usize, XorName)> = Vec::new();
3180 for (idx, inner) in raw {
3181 match inner {
3182 Ok(bytes) => results.push((idx, bytes)),
3183 Err(hash) => deferred.push((idx, hash)),
3184 }
3185 }
3186
3187 // Deferred retry pass: retry the deferred chunks
3188 // in CONCURRENT rounds (reusing the fetch
3189 // limiter's cap), not serially. The first round
3190 // fires immediately — most deferrals on a
3191 // healthy-but-lossy link are peer-side noise
3192 // that clears in well under a second, and
3193 // serializing them behind mandatory multi-second
3194 // sleeps was the single biggest throughput sink
3195 // on such links (a batch deferring ~20 chunks
3196 // burned minutes of near-zero throughput even
3197 // though every chunk succeeded on its first
3198 // retry). Only chunks that survive a round get a
3199 // longer back-off before the next, so genuine
3200 // saturation still gets time to settle.
3201 if !deferred.is_empty() {
3202 // Round delays in seconds. Round 0 is
3203 // immediate; later rounds back off to ride
3204 // out sustained saturation.
3205 const DEFERRED_ROUND_DELAYS_SECS: [u64; 3] = [0, 15, 45];
3206 info!(
3207 "Deferring {} chunk(s) for concurrent retry after batch settles",
3208 deferred.len()
3209 );
3210 let mut remaining = deferred;
3211 for (round, &delay_secs) in
3212 DEFERRED_ROUND_DELAYS_SECS.iter().enumerate()
3213 {
3214 if remaining.is_empty() {
3215 break;
3216 }
3217 if delay_secs > 0 {
3218 tokio::time::sleep(std::time::Duration::from_secs(delay_secs))
3219 .await;
3220 }
3221 info!(
3222 "Deferred retry round {}/{}: {} chunk(s)",
3223 round + 1,
3224 DEFERRED_ROUND_DELAYS_SECS.len(),
3225 remaining.len(),
3226 );
3227 let round_input = std::mem::take(&mut remaining);
3228 let retry_fetch_context = fetch_context.clone();
3229 let round_results: Vec<DownloadBatchEntry> = rebucketed_unordered(
3230 &fetch_limiter,
3231 round_input,
3232 |(idx, hash): (usize, XorName)| {
3233 let fetch_context = retry_fetch_context.clone();
3234 async move {
3235 self.download_fetch_file_chunk(
3236 idx,
3237 hash,
3238 fetch_context,
3239 true,
3240 round + DEFERRED_RETRY_ATTEMPT_OFFSET,
3241 )
3242 .await
3243 }
3244 },
3245 )
3246 .await?;
3247 for (idx, inner) in round_results {
3248 match inner {
3249 Ok(bytes) => results.push((idx, bytes)),
3250 Err(hash) => remaining.push((idx, hash)),
3251 }
3252 }
3253 }
3254 if let Some((_, hash)) = remaining.first() {
3255 return Err(self_encryption::Error::Generic(format!(
3256 "Chunk not found after {} deferred retry rounds: {}",
3257 DEFERRED_ROUND_DELAYS_SECS.len(),
3258 hex::encode(hash.0),
3259 )));
3260 }
3261 }
3262
3263 // streaming_decrypt itself sort_by_keys before
3264 // zipping, but the same closure is also passed
3265 // through get_root_data_map_parallel internally
3266 // (see self_encryption::stream_decrypt.rs::new), and
3267 // THAT path zips positionally without sorting. Sort
3268 // here so both consumers see input order.
3269 results.sort_by_key(|(idx, _)| *idx);
3270 Ok(results)
3271 })
3272 })
3273 },
3274 decrypt_batch_size,
3275 )
3276 .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;
3277
3278 // Drive the iterator (each `next()` runs the batched fetch via
3279 // block_in_place) and hand each decrypted segment to the sink in
3280 // order. Awaiting the sink between items yields back to the runtime so
3281 // a bounded sink can apply backpressure.
3282 let mut bytes_total = 0u64;
3283 for chunk_result in stream {
3284 let chunk: Bytes =
3285 chunk_result.map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
3286 bytes_total += chunk.len() as u64;
3287 on_chunk(chunk).await?;
3288 }
3289 Ok(bytes_total)
3290 }
3291
3292 /// Download and decrypt a file to disk, with optional progress events.
3293 ///
3294 /// Same as [`Client::file_download`] but sends [`DownloadEvent`]s for UI
3295 /// feedback. Streams to a temp file (one decrypt batch resident at a time)
3296 /// and renames atomically on success. A `TempDownload` guard removes the
3297 /// staging file on any error path, including a panic.
3298 pub async fn file_download_with_progress(
3299 &self,
3300 data_map: &DataMap,
3301 output: &Path,
3302 progress: Option<mpsc::Sender<DownloadEvent>>,
3303 ) -> Result<u64> {
3304 self.file_download_with_progress_using_peer_count(
3305 data_map,
3306 output,
3307 progress,
3308 self.config().close_group_size,
3309 )
3310 .await
3311 }
3312
3313 /// Download and decrypt a file to disk with progress events, trying
3314 /// `peer_count` closest peers for every chunk fetch.
3315 ///
3316 /// Streams to a temp file (one decrypt batch resident at a time) and
3317 /// renames atomically on success.
3318 async fn file_download_with_progress_using_peer_count(
3319 &self,
3320 data_map: &DataMap,
3321 output: &Path,
3322 progress: Option<mpsc::Sender<DownloadEvent>>,
3323 peer_count: usize,
3324 ) -> Result<u64> {
3325 self.file_download_with_progress_using_peer_count_and_reports(
3326 data_map, output, progress, peer_count, None,
3327 )
3328 .await
3329 }
3330
3331 async fn file_download_with_progress_using_peer_count_and_reports(
3332 &self,
3333 data_map: &DataMap,
3334 output: &Path,
3335 progress: Option<mpsc::Sender<DownloadEvent>>,
3336 peer_count: usize,
3337 peer_reports: Option<Arc<Mutex<Vec<RecordedFileChunkPeerSweep>>>>,
3338 ) -> Result<u64> {
3339 debug!("Downloading file to {}", output.display());
3340
3341 let parent = output.parent().unwrap_or_else(|| Path::new("."));
3342 let unique: u64 = rand::random();
3343 let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));
3344
3345 // Guard removes the staging file on any early return OR a panic unwind
3346 // out of the `block_in_place` decrypt loop; defused only by a
3347 // successful commit(). Centralizes what used to be three duplicated
3348 // cleanup arms.
3349 let tmp = TempDownload::new(tmp_path);
3350 let mut file = std::fs::File::create(tmp.path())?;
3351
3352 let bytes_written = self
3353 .download_decrypted_chunks(data_map, progress, peer_count, peer_reports, |bytes| {
3354 let r = file.write_all(&bytes).map_err(Error::from);
3355 std::future::ready(r)
3356 })
3357 .await?;
3358 file.flush()?;
3359 drop(file); // close the handle before rename (Windows won't rename an open file)
3360
3361 tmp.commit(output)?;
3362 info!(
3363 "File downloaded: {bytes_written} bytes written to {}",
3364 output.display()
3365 );
3366 Ok(bytes_written)
3367 }
3368
3369 /// Download and decrypt a file, streaming the plaintext to `sink` instead
3370 /// of writing to disk.
3371 ///
3372 /// Constant memory (one decrypt batch resident at a time); the caller
3373 /// receives bytes progressively as each batch decrypts, suitable for
3374 /// forwarding to an HTTP chunked body or a gRPC response stream. The
3375 /// bounded `sink` applies backpressure. If the receiver is dropped (e.g.
3376 /// the client disconnected) the download stops early and returns
3377 /// [`Error::Cancelled`].
3378 ///
3379 /// The channel item type is `Result<Bytes, Error>`, so the caller sets up:
3380 ///
3381 /// ```ignore
3382 /// let (tx, rx) = tokio::sync::mpsc::channel::<Result<Bytes, Error>>(8);
3383 /// ```
3384 ///
3385 /// Typically the caller `tokio::spawn`s this and converts the matching
3386 /// `Receiver` into its response stream. Requires a multi-threaded Tokio
3387 /// runtime (the decrypt iterator uses `block_in_place`).
3388 pub async fn file_download_to_sender(
3389 &self,
3390 data_map: &DataMap,
3391 sink: mpsc::Sender<std::result::Result<Bytes, Error>>,
3392 progress: Option<mpsc::Sender<DownloadEvent>>,
3393 ) -> Result<u64> {
3394 let peer_count = self.config().close_group_size;
3395 self.download_decrypted_chunks(data_map, progress, peer_count, None, |bytes| {
3396 let sink = sink.clone();
3397 async move {
3398 sink.send(Ok(bytes))
3399 .await
3400 .map_err(|_| Error::Cancelled("download stream receiver dropped".into()))
3401 }
3402 })
3403 .await
3404 }
3405}
3406
3407#[cfg(test)]
3408#[allow(clippy::unwrap_used)]
3409mod tests {
3410 use super::*;
3411
3412 #[test]
3413 fn merkle_store_cap_clamps_to_memory_bound() {
3414 // Below the ceiling: pass the adaptive cap through unchanged.
3415 assert_eq!(merkle_store_cap(8), 8);
3416 assert_eq!(merkle_store_cap(64), 64);
3417 // A configured `adaptive.max.store` above the ceiling must be clamped so
3418 // the whole-file fan-out can't pin more than ~256 MB of bodies (PR #137).
3419 assert_eq!(merkle_store_cap(512), MERKLE_STORE_MAX_IN_FLIGHT);
3420 assert_eq!(merkle_store_cap(usize::MAX), MERKLE_STORE_MAX_IN_FLIGHT);
3421 // Never zero — always make progress.
3422 assert_eq!(merkle_store_cap(0), 1);
3423 }
3424
3425 #[test]
3426 fn distributed_sample_indices_spreads_across_large_file() {
3427 // cap 5 over 100 chunks: first and last included, evenly spread.
3428 assert_eq!(distributed_sample_indices(100, 5), vec![0, 24, 49, 74, 99]);
3429 }
3430
3431 #[test]
3432 fn distributed_sample_indices_covers_whole_small_file() {
3433 // total <= cap returns every index, preserving the exact
3434 // "whole file sampled" detection in estimate_upload_cost.
3435 assert_eq!(distributed_sample_indices(3, 5), vec![0, 1, 2]);
3436 assert_eq!(distributed_sample_indices(5, 5), vec![0, 1, 2, 3, 4]);
3437 }
3438
3439 #[test]
3440 fn distributed_sample_indices_is_in_range_and_increasing() {
3441 assert!(distributed_sample_indices(0, 5).is_empty());
3442 assert_eq!(distributed_sample_indices(1, 5), vec![0]);
3443 for total in 1..200usize {
3444 let idx = distributed_sample_indices(total, 5);
3445 assert_eq!(*idx.first().unwrap(), 0);
3446 assert_eq!(*idx.last().unwrap(), total - 1);
3447 assert!(idx.iter().all(|&i| i < total));
3448 assert!(idx.windows(2).all(|w| w[0] < w[1]));
3449 }
3450 }
3451
3452 #[test]
3453 fn disk_space_check_passes_for_small_file() {
3454 // A 1 KB file should always pass the disk space check
3455 check_disk_space_for_spill(1024).unwrap();
3456 }
3457
3458 #[test]
3459 fn disk_space_check_fails_for_absurd_size() {
3460 // Requesting space for a 1 exabyte file should fail on any real system
3461 let result = check_disk_space_for_spill(u64::MAX / 2);
3462 assert!(result.is_err());
3463 let err = result.unwrap_err();
3464 assert!(
3465 matches!(err, Error::InsufficientDiskSpace(_)),
3466 "expected InsufficientDiskSpace, got: {err}"
3467 );
3468 }
3469
3470 #[test]
3471 fn adaptive_stream_decrypt_batch_size_tracks_fetch_headroom() {
3472 let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(u64::MAX));
3473
3474 assert_eq!(batch_size, 64 * DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
3475 }
3476
3477 #[test]
3478 fn adaptive_stream_decrypt_batch_size_caps_to_total_chunks() {
3479 let batch_size = adaptive_stream_decrypt_batch_size(12, 64, 10, Some(u64::MAX));
3480
3481 assert_eq!(batch_size, 12);
3482 }
3483
3484 #[test]
3485 fn adaptive_stream_decrypt_batch_size_honours_configured_floor() {
3486 let batch_size = adaptive_stream_decrypt_batch_size(1_000, 1, 32, None);
3487
3488 assert_eq!(batch_size, 32);
3489 }
3490
3491 #[test]
3492 fn adaptive_stream_decrypt_batch_size_does_not_expand_without_memory_reading() {
3493 let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, None);
3494
3495 assert_eq!(batch_size, 10);
3496 }
3497
3498 #[test]
3499 fn adaptive_stream_decrypt_batch_size_caps_to_memory_budget() {
3500 let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
3501 .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
3502 .max(1);
3503 let usable_memory = estimated_bytes_per_chunk
3504 .saturating_mul(16)
3505 .saturating_mul(DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR);
3506 let batch_size = adaptive_stream_decrypt_batch_size(1_000, 256, 10, Some(usable_memory));
3507
3508 assert_eq!(batch_size, 16);
3509 }
3510
3511 #[test]
3512 fn adaptive_stream_decrypt_batch_size_keeps_one_chunk_when_memory_is_tight() {
3513 let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(1));
3514
3515 assert_eq!(batch_size, 1);
3516 }
3517
3518 #[test]
3519 fn cached_merkle_covers_only_when_all_addresses_have_proofs() {
3520 let covered = compute_address(&Bytes::from_static(b"covered"));
3521 let extra = compute_address(&Bytes::from_static(b"extra"));
3522 let missing = compute_address(&Bytes::from_static(b"missing"));
3523 let cached = MerkleBatchPaymentResult {
3524 proofs: HashMap::from([(covered, vec![1]), (extra, vec![2])]),
3525 chunk_count: 2,
3526 storage_cost_atto: "0".to_string(),
3527 gas_cost_wei: 0,
3528 merkle_payment_timestamp: 0,
3529 };
3530
3531 assert!(cached_merkle_covers_addresses(&cached, &[covered]));
3532 assert!(cached_merkle_covers_addresses(&cached, &[covered, extra]));
3533 assert!(!cached_merkle_covers_addresses(
3534 &cached,
3535 &[covered, missing]
3536 ));
3537 }
3538
3539 /// A partial merkle payment leaves some addresses without a proof. Those
3540 /// must be split out so `upload_merkle_from_spill` reports them as failed
3541 /// (`PartialUpload`) instead of aborting the whole file — preserving the
3542 /// addresses' original order in each group.
3543 #[test]
3544 fn partition_addresses_by_proof_splits_paid_and_unpaid() {
3545 let paid_a = [1u8; 32];
3546 let unpaid_b = [2u8; 32];
3547 let paid_c = [3u8; 32];
3548 let unpaid_d = [4u8; 32];
3549 let proofs: HashMap<[u8; 32], Vec<u8>> =
3550 HashMap::from([(paid_a, vec![0xaa]), (paid_c, vec![0xcc])]);
3551
3552 let (to_store, missing) =
3553 partition_addresses_by_proof(&[paid_a, unpaid_b, paid_c, unpaid_d], &proofs);
3554
3555 assert_eq!(to_store, vec![paid_a, paid_c]);
3556 assert_eq!(missing, vec![unpaid_b, unpaid_d]);
3557 }
3558
3559 /// A wave that returns `Ok` contributes its stored chunks, parsed cost, and
3560 /// stats; nothing is recorded as failed.
3561 #[test]
3562 fn fold_single_wave_keeps_ok_wave() {
3563 let stored = vec![[1u8; 32], [2u8; 32]];
3564 let stats = WaveAggregateStats {
3565 chunk_attempts_total: 7,
3566 ..Default::default()
3567 };
3568
3569 let outcome = fold_single_wave(Ok((stored.clone(), "100".to_string(), 9, stats))).unwrap();
3570
3571 assert_eq!(outcome.stored, stored);
3572 assert!(outcome.failed.is_empty());
3573 assert_eq!(outcome.storage_atto.to_string(), "100");
3574 assert_eq!(outcome.gas_wei, 9);
3575 assert_eq!(outcome.stats.chunk_attempts_total, 7);
3576 }
3577
3578 /// The core V2-461 semantic: a wave short of quorum (`PartialUpload`) is
3579 /// recoverable — its stored chunks, failed chunks, and on-chain spend are
3580 /// folded so the caller can continue to the next wave rather than aborting
3581 /// the whole file.
3582 #[test]
3583 fn fold_single_wave_folds_partial_upload() {
3584 let stored = vec![[3u8; 32]];
3585 let failed = vec![([4u8; 32], "short of quorum".to_string())];
3586 let err = Error::PartialUpload {
3587 stored: stored.clone(),
3588 stored_count: 1,
3589 failed: failed.clone(),
3590 failed_count: 1,
3591 total_chunks: 2,
3592 spend: Box::new(PartialUploadSpend {
3593 storage_cost_atto: "250".to_string(),
3594 gas_cost_wei: 11,
3595 }),
3596 reason: "wave store failed after retries".to_string(),
3597 };
3598
3599 let outcome = fold_single_wave(Err(err)).unwrap();
3600
3601 assert_eq!(outcome.stored, stored);
3602 assert_eq!(outcome.failed, failed);
3603 assert_eq!(outcome.storage_atto.to_string(), "250");
3604 assert_eq!(outcome.gas_wei, 11);
3605 // `PartialUpload` carries no stats, so the failed wave contributes none.
3606 assert_eq!(outcome.stats.chunk_attempts_total, 0);
3607 }
3608
3609 /// A non-`PartialUpload` error (wallet/payment-infrastructure failure) is
3610 /// fatal and must abort the file, not be folded into the failed set.
3611 #[test]
3612 fn fold_single_wave_propagates_fatal_error() {
3613 let result = fold_single_wave(Err(Error::Payment("wallet unavailable".to_string())));
3614
3615 assert!(
3616 matches!(result, Err(Error::Payment(_))),
3617 "fatal payment error must propagate, got: {result:?}"
3618 );
3619 }
3620
3621 #[test]
3622 fn partition_addresses_by_proof_handles_all_or_nothing() {
3623 let a = [5u8; 32];
3624 let b = [6u8; 32];
3625
3626 // No proofs at all → every address is missing.
3627 let empty: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
3628 let (to_store, missing) = partition_addresses_by_proof(&[a, b], &empty);
3629 assert!(to_store.is_empty());
3630 assert_eq!(missing, vec![a, b]);
3631
3632 // All proofs present → nothing missing.
3633 let full: HashMap<[u8; 32], Vec<u8>> = HashMap::from([(a, vec![1]), (b, vec![2])]);
3634 let (to_store, missing) = partition_addresses_by_proof(&[a, b], &full);
3635 assert_eq!(to_store, vec![a, b]);
3636 assert!(missing.is_empty());
3637 }
3638
3639 #[test]
3640 fn chunk_spill_round_trip() {
3641 let mut spill = ChunkSpill::new().unwrap();
3642 let data1 = vec![0xAA; 1024];
3643 let data2 = vec![0xBB; 2048];
3644
3645 spill.push(&data1).unwrap();
3646 spill.push(&data2).unwrap();
3647
3648 assert_eq!(spill.len(), 2);
3649 assert_eq!(spill.total_bytes(), 1024 + 2048);
3650 let chunk_entries = spill.chunk_entries().unwrap();
3651 let entry_total: u64 = chunk_entries.iter().map(|(_, size)| *size).sum();
3652 assert_eq!(entry_total, 1024 + 2048);
3653
3654 // Read back and verify
3655 let chunk1 = spill.read_chunk(spill.addresses.first().unwrap()).unwrap();
3656 assert_eq!(&chunk1[..], &data1[..]);
3657
3658 let chunk2 = spill.read_chunk(spill.addresses.get(1).unwrap()).unwrap();
3659 assert_eq!(&chunk2[..], &data2[..]);
3660
3661 // Verify waves with 1-chunk wave size
3662 let waves: Vec<_> = spill.addresses.chunks(1).collect();
3663 assert_eq!(waves.len(), 2);
3664 }
3665
3666 #[test]
3667 fn chunk_spill_cleanup_on_drop() {
3668 let dir;
3669 {
3670 let spill = ChunkSpill::new().unwrap();
3671 dir = spill.dir.clone();
3672 assert!(dir.exists());
3673 }
3674 // After drop, the directory should be cleaned up
3675 assert!(!dir.exists(), "spill dir should be removed on drop");
3676 }
3677
3678 #[test]
3679 fn chunk_spill_deduplicates_identical_content() {
3680 let mut spill = ChunkSpill::new().unwrap();
3681 let data = vec![0xCC; 512];
3682
3683 spill.push(&data).unwrap();
3684 spill.push(&data).unwrap(); // same content, should be skipped
3685 spill.push(&data).unwrap(); // again
3686
3687 assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
3688 assert_eq!(
3689 spill.total_bytes(),
3690 512,
3691 "total_bytes should count unique only"
3692 );
3693
3694 // Different content should still be added
3695 let data2 = vec![0xDD; 256];
3696 spill.push(&data2).unwrap();
3697 assert_eq!(spill.len(), 2);
3698 assert_eq!(spill.total_bytes(), 512 + 256);
3699 }
3700}
3701
3702/// Compile-time assertions that Client file method futures are Send.
3703#[cfg(test)]
3704mod send_assertions {
3705 use super::*;
3706
3707 fn _assert_send<T: Send>(_: &T) {}
3708
3709 #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
3710 async fn _file_upload_is_send(client: &Client) {
3711 let fut = client.file_upload(Path::new("/dev/null"));
3712 _assert_send(&fut);
3713 }
3714
3715 #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
3716 async fn _file_upload_with_mode_is_send(client: &Client) {
3717 let fut = client.file_upload_with_mode(Path::new("/dev/null"), PaymentMode::Auto);
3718 _assert_send(&fut);
3719 }
3720
3721 #[allow(
3722 dead_code,
3723 unreachable_code,
3724 unused_variables,
3725 clippy::diverging_sub_expression
3726 )]
3727 async fn _file_download_is_send(client: &Client) {
3728 let dm: DataMap = todo!();
3729 let fut = client.file_download(&dm, Path::new("/dev/null"));
3730 _assert_send(&fut);
3731 }
3732}