ant_core/data/client/file.rs
1//! File operations using streaming self-encryption.
2//!
3//! Upload files directly from disk without loading them entirely into memory.
4//! Uses `stream_encrypt` to process files in 8KB chunks, encrypting and
5//! uploading each piece as it's produced.
6//!
7//! Encrypted chunks are spilled to a temporary directory during encryption
8//! so that peak memory usage is bounded to one wave (~256 MB for 64 × 4 MB
9//! chunks) regardless of file size.
10//!
11//! For in-memory data uploads, see the `data` module.
12
13use crate::data::client::adaptive::{observe_op, rebucketed_unordered};
14use crate::data::client::batch::{
15 finalize_batch_payment, PaymentIntent, PreparedChunk, WaveAggregateStats,
16};
17use crate::data::client::chunk::ChunkPeerGetResult;
18use crate::data::client::classify_error;
19use crate::data::client::merkle::{
20 chunk_contents_for_upload_addresses, finalize_merkle_batch, merkle_deferred_retry,
21 merkle_store_with_retry, should_use_merkle, MerkleBatchPaymentResult, PaymentMode,
22 PreparedMerkleBatch, DEFERRED_ROUND_DELAYS_SECS,
23};
24use crate::data::client::Client;
25use crate::data::error::{Error, PartialUploadSpend, Result};
26use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
27use ant_protocol::transport::{MultiAddr, PeerId};
28use ant_protocol::{compute_address, XorName as ChunkAddress, DATA_TYPE_CHUNK};
29use bytes::Bytes;
30use fs2::FileExt;
31use futures::stream::{self, StreamExt};
32use self_encryption::{
33 get_root_data_map_parallel, stream_decrypt_batch_size, stream_encrypt,
34 streaming_decrypt_with_batch_size, DataMap,
35};
36use std::collections::{HashMap, HashSet};
37use std::io::Write;
38use std::num::NonZeroUsize;
39use std::path::{Path, PathBuf};
40use std::sync::{Arc, Mutex};
41use tokio::runtime::Handle;
42use tokio::sync::mpsc;
43use tracing::{debug, info, warn};
44use xor_name::XorName;
45
/// Progress events emitted during file upload for UI feedback.
///
/// Each variant carries plain counters only, so events are cheap to clone and
/// send to a UI.
#[derive(Debug, Clone)]
pub enum UploadEvent {
    /// A chunk has been encrypted and spilled to disk.
    Encrypting {
        /// Presumably the number of chunks encrypted so far — confirm at the emit site.
        chunks_done: usize,
    },
    /// File encryption complete.
    Encrypted { total_chunks: usize },
    /// Starting quote collection for a wave.
    QuotingChunks {
        wave: usize,
        total_waves: usize,
        chunks_in_wave: usize,
    },
    /// A chunk has been quoted (peer discovery + price received).
    /// This is the slow phase — each quote involves network round-trips.
    ChunkQuoted { quoted: usize, total: usize },
    /// A chunk has been stored on the network.
    ChunkStored { stored: usize, total: usize },
    /// A wave has completed.
    WaveComplete {
        wave: usize,
        total_waves: usize,
        stored_so_far: usize,
        total: usize,
    },
}
72
/// Progress events emitted during file download for UI feedback.
///
/// Each variant carries plain counters only, so events are cheap to clone and
/// send to a UI.
#[derive(Debug, Clone)]
pub enum DownloadEvent {
    /// Resolving hierarchical DataMap to discover real chunk count.
    ResolvingDataMap { total_map_chunks: usize },
    /// A DataMap chunk has been fetched during resolution.
    MapChunkFetched { fetched: usize },
    /// DataMap resolved — total data chunk count now known.
    DataMapResolved { total_chunks: usize },
    /// Data chunks are being fetched from the network.
    ChunksFetched { fetched: usize, total: usize },
}
85
/// File download result when peer-health diagnostics are enabled.
///
/// Pairs the usual byte count with the per-chunk diagnostic reports gathered
/// while the file was being fetched.
#[derive(Debug, Clone)]
pub struct FileDownloadWithPeerReport {
    /// Number of plaintext bytes written to the destination.
    pub bytes_written: u64,
    /// Per-file-chunk closest-peer GET results collected during the actual download.
    pub chunk_reports: Vec<FileChunkPeerReport>,
}
94
/// Closest-peer GET results for one file chunk.
///
/// Holds every diagnostic sweep recorded for the chunk.
#[derive(Debug, Clone)]
pub struct FileChunkPeerReport {
    /// 1-based chunk index in the resolved file DataMap.
    pub index: usize,
    /// Chunk address.
    pub address: ChunkAddress,
    /// All diagnostic GET sweeps attempted for this chunk.
    pub sweeps: Vec<FileChunkPeerSweepReport>,
}
105
/// One all-peer diagnostic GET sweep for a file chunk.
///
/// A sweep built from an error has `error` set and an empty `peers` list; a
/// sweep built from per-peer results has `error` unset.
#[derive(Debug, Clone)]
pub struct FileChunkPeerSweepReport {
    /// 1-based attempt number for this chunk.
    pub attempt: usize,
    /// Whether this sweep happened during a deferred retry round.
    pub deferred_retry: bool,
    /// DHT lookup / sweep-level error, if the closest-peer group could not be queried.
    pub error: Option<String>,
    /// Per-peer results, sorted closest first.
    pub peers: Vec<FileChunkPeerReportPeer>,
}
118
/// One peer result in a [`FileChunkPeerReport`].
///
/// Entries live in [`FileChunkPeerSweepReport::peers`], one per peer queried
/// during a sweep.
#[derive(Debug, Clone)]
pub struct FileChunkPeerReportPeer {
    /// Peer queried for the chunk.
    pub peer_id: PeerId,
    /// Known network addresses used for the peer.
    pub peer_addrs: Vec<MultiAddr>,
    /// XOR distance from `peer_id` to the chunk address.
    pub xor_distance: ChunkAddress,
    /// Whether this peer returned the chunk or why it did not.
    pub status: FileChunkPeerStatus,
}
131
/// Peer-level file chunk GET diagnostic status.
///
/// Produced by `file_chunk_peer_status` from a single peer's GET result.
#[derive(Debug, Clone)]
pub enum FileChunkPeerStatus {
    /// The peer returned the chunk.
    Found {
        /// Length in bytes of the returned chunk content.
        bytes: usize,
    },
    /// The peer responded authoritatively that it does not store the chunk.
    NotFound,
    /// The peer did not respond before the timeout.
    Timeout {
        /// Message carried by the underlying timeout error.
        message: String,
    },
    /// The transport/network path to the peer failed.
    NetworkError {
        /// Message carried by the underlying network error.
        message: String,
    },
    /// Any other per-peer error.
    Error {
        /// Display rendering of the underlying error.
        message: String,
    },
}
146
/// One entry in the per-chunk quote list returned by
/// [`Client::get_store_quotes`]: the responding peer, its addresses, the
/// signed quote it returned, and the payment amount it is demanding.
type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);

/// One element of a download batch: a `usize` position paired with either the
/// fetched chunk bytes or an [`XorName`].
///
/// NOTE(review): presumably the `usize` is the chunk's index within the batch
/// and the `Err` side is the address of a chunk that could not be fetched —
/// confirm against the download code that produces these entries.
type DownloadBatchEntry = (usize, std::result::Result<Bytes, XorName>);
153
/// A single diagnostic sweep tagged with the chunk it belongs to.
///
/// Records are grouped by `index` into [`FileChunkPeerReport`]s by
/// `file_chunk_reports_from_recorded_sweeps`.
#[derive(Debug, Clone)]
struct RecordedFileChunkPeerSweep {
    /// Chunk index; becomes [`FileChunkPeerReport::index`].
    index: usize,
    /// Chunk address; becomes [`FileChunkPeerReport::address`].
    address: ChunkAddress,
    /// The sweep recorded for this chunk.
    sweep: FileChunkPeerSweepReport,
}
160
/// State shared by the chunk fetches of one file download.
///
/// NOTE(review): the field descriptions below are inferred from names and
/// types only; the code that reads them is outside this part of the file.
#[derive(Clone)]
struct FileDownloadFetchContext {
    /// Presumably the total number of data chunks in the file — confirm.
    total_chunks: usize,
    /// Presumably how many peers are queried per chunk — confirm.
    peer_count: usize,
    /// Shared atomic counter; presumably the number of chunks fetched so far.
    fetched_ref: Arc<std::sync::atomic::AtomicUsize>,
    /// Optional channel for [`DownloadEvent`] progress updates.
    progress_ref: Option<mpsc::Sender<DownloadEvent>>,
    /// Optional shared sink for recorded diagnostic sweeps; `None` presumably
    /// means peer diagnostics are disabled — confirm.
    peer_reports: Option<Arc<Mutex<Vec<RecordedFileChunkPeerSweep>>>>,
}
169
/// Number of chunks per upload wave (matches batch.rs PAYMENT_WAVE_SIZE).
const UPLOAD_WAVE_SIZE: usize = 64;

/// Stream decrypt batches should be larger than fetch fan-out so
/// the rolling fetch scheduler can keep launching new chunk GETs as earlier
/// ones complete, instead of stopping at each self-encryption batch boundary.
///
/// The fetch cap is multiplied by this factor to get the target batch size.
const DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER: usize = 4;

/// Use at most this fraction of currently usable RAM for one decrypt batch.
///
/// Applied as a divisor: the budget is `usable_memory / 4`.
const DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR: u64 = 4;

/// A decrypt batch briefly holds encrypted chunk bytes, decrypted chunk bytes,
/// and Vec/Bytes overhead. Use a conservative multiplier rather than assuming
/// payload bytes alone.
const DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER: u64 = 3;

/// Maximum number of distinct chunk addresses to sample when probing for a
/// representative quote in [`Client::estimate_upload_cost`].
///
/// Bounded small so we never spend more than a couple of round-trips on the
/// `AlreadyStored` retry path, which only matters when many leading chunks
/// of a file already live on the network.
const ESTIMATE_SAMPLE_CAP: usize = 5;

/// First diagnostic all-peer fetch attempt for a file chunk.
///
/// Attempt numbers are 1-based (see [`FileChunkPeerSweepReport::attempt`]).
const FIRST_DIAGNOSTIC_FETCH_ATTEMPT: usize = 1;

/// Deferred retry attempt number for retry round 0.
///
/// NOTE(review): presumably retry round `r` is reported as attempt
/// `r + DEFERRED_RETRY_ATTEMPT_OFFSET` — confirm at the use site.
const DEFERRED_RETRY_ATTEMPT_OFFSET: usize = 2;
199
/// Pick up to `cap` chunk indices spread evenly across `[0, total)`, always
/// including the first and last chunk when at least two indices are picked.
///
/// Sampling the *first* N chunks biases the probe: a file sharing a leading
/// prefix with a prior upload (compressed archives, similar headers) reports
/// those chunks as `AlreadyStored` even when the tail is new, so a positional
/// sample looks in the worst possible place. Spreading the sample means a
/// single new chunk anywhere in the file yields a real price.
///
/// Returns an empty vector when `total == 0` or `cap == 0`, `[0]` when only a
/// single index may be picked, and every index when `total <= cap`, so
/// [`Client::estimate_upload_cost`] can still detect the "whole file sampled"
/// case. Indices are strictly increasing.
fn distributed_sample_indices(total: usize, cap: usize) -> Vec<usize> {
    // Nothing to sample from, or no samples allowed: "up to `cap`" means zero.
    if total == 0 || cap == 0 {
        return Vec::new();
    }
    let sample_limit = total.min(cap);
    if sample_limit == 1 {
        return vec![0];
    }

    // Widen to u128 so `i * (total - 1)` cannot overflow for very large
    // totals. Both factors fit in 64 bits, so the product fits in 128.
    let span = (total - 1) as u128;
    let steps = (sample_limit - 1) as u128;
    let mut indices: Vec<usize> = (0..sample_limit)
        // The quotient never exceeds `total - 1`, so narrowing back is lossless.
        .map(|i| (i as u128 * span / steps) as usize)
        .collect();
    indices.dedup(); // defensive: already strictly increasing for cap >= 2
    indices
}
226
227fn file_chunk_sweep_report_from_peer_results(
228 attempt: usize,
229 deferred_retry: bool,
230 results: &[ChunkPeerGetResult],
231) -> (Option<Bytes>, FileChunkPeerSweepReport) {
232 let mut content = None;
233 let peers = results
234 .iter()
235 .map(|result| {
236 if content.is_none() {
237 if let Ok(Some(chunk)) = &result.chunk_result {
238 content = Some(chunk.content.clone());
239 }
240 }
241
242 FileChunkPeerReportPeer {
243 peer_id: result.peer_id,
244 peer_addrs: result.peer_addrs.clone(),
245 xor_distance: result.xor_distance,
246 status: file_chunk_peer_status(&result.chunk_result),
247 }
248 })
249 .collect();
250
251 (
252 content,
253 FileChunkPeerSweepReport {
254 attempt,
255 deferred_retry,
256 error: None,
257 peers,
258 },
259 )
260}
261
262fn file_chunk_sweep_report_from_error(
263 attempt: usize,
264 deferred_retry: bool,
265 error: &Error,
266) -> FileChunkPeerSweepReport {
267 FileChunkPeerSweepReport {
268 attempt,
269 deferred_retry,
270 error: Some(error.to_string()),
271 peers: Vec::new(),
272 }
273}
274
275fn file_chunk_reports_from_recorded_sweeps(
276 mut sweeps: Vec<RecordedFileChunkPeerSweep>,
277) -> Vec<FileChunkPeerReport> {
278 sweeps.sort_by_key(|record| (record.index, record.sweep.attempt));
279
280 let mut reports: Vec<FileChunkPeerReport> = Vec::new();
281 for record in sweeps {
282 if let Some(report) = reports
283 .last_mut()
284 .filter(|report| report.index == record.index)
285 {
286 report.sweeps.push(record.sweep);
287 continue;
288 }
289
290 reports.push(FileChunkPeerReport {
291 index: record.index,
292 address: record.address,
293 sweeps: vec![record.sweep],
294 });
295 }
296
297 reports
298}
299
300fn file_chunk_peer_status(
301 chunk_result: &std::result::Result<Option<ant_protocol::DataChunk>, Error>,
302) -> FileChunkPeerStatus {
303 match chunk_result {
304 Ok(Some(chunk)) => FileChunkPeerStatus::Found {
305 bytes: chunk.content.len(),
306 },
307 Ok(None) => FileChunkPeerStatus::NotFound,
308 Err(Error::Timeout(e)) => FileChunkPeerStatus::Timeout { message: e.clone() },
309 Err(Error::Network(e)) => FileChunkPeerStatus::NetworkError { message: e.clone() },
310 Err(e) => FileChunkPeerStatus::Error {
311 message: e.to_string(),
312 },
313 }
314}
315
/// Gas used by one `pay_for_quotes` transaction that packs up to
/// `UPLOAD_WAVE_SIZE` (quote_hash, rewards_address, amount) entries.
///
/// `batch_pay` in `batch.rs` flattens every chunk's close-group quotes into a
/// single EVM call, so the dominant cost is the SSTOREs for each entry plus
/// the base tx overhead. On Arbitrum that is roughly
/// `21_000 + 64 × (20_000 + small)` ≈ 1.3M; we round up to 1.5M as a
/// conservative per-wave upper bound.
const GAS_PER_WAVE_TX: u128 = 1_500_000;

/// Gas used by one merkle batch payment transaction.
///
/// One on-chain tx per merkle sub-batch, but each tx verifies a merkle tree
/// and posts a pool commitment, so budget higher than a plain transfer.
const GAS_PER_MERKLE_TX: u128 = 500_000;

/// Advisory gas price (wei/gas) used to turn the gas estimate into an ETH
/// figure when no live gas oracle is consulted.
///
/// Arbitrum One typically settles around 0.1 gwei on quiet blocks; we use
/// that as the default so the CLI prints a sensible order-of-magnitude
/// number. Users should treat the reported gas cost as an estimate, not a
/// commitment — real gas is bid at submission time.
///
/// The value is 0.1 gwei expressed in wei (`10^8`).
const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;

/// Extra headroom percentage for disk space check.
///
/// Encrypted chunks are slightly larger than the source data due to padding
/// and self-encryption overhead. We require file_size + 10% free space in
/// the temp directory to account for this.
const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;
347
/// Grace period (in seconds) before a spill dir is eligible for stale cleanup.
///
/// This is a small TOCTOU guard covering the sub-millisecond window inside
/// [`ChunkSpill::new`] between `create_dir` and `try_lock_exclusive`. Once a
/// dir is older than this and its lockfile is releasable, the owning process
/// is gone and the dir is safe to reap — regardless of how old it is.
///
/// The previous policy waited 24 h before reaping any orphan, which meant
/// that any non-graceful exit (SIGKILL, kernel OOM, panic abort) leaked its
/// spill dir until the next day's upload — and on a host being restart-looped
/// by systemd, orphans could fill the disk well within that window.
const SPILL_STALE_GRACE_SECS: u64 = 30;

/// Prefix for spill directory names to distinguish from user files.
const SPILL_DIR_PREFIX: &str = "spill_";

/// Lockfile name inside each spill dir to signal active use.
const SPILL_LOCK_NAME: &str = ".lock";

/// Temporary on-disk buffer for encrypted chunks.
///
/// During file encryption, chunks are written to a temp directory so that
/// only their 32-byte addresses stay in memory. At upload time chunks are
/// read back one wave at a time, keeping peak RAM at ~`UPLOAD_WAVE_SIZE × 4 MB`.
struct ChunkSpill {
    /// Directory holding spilled chunk files (named by hex address).
    dir: PathBuf,
    /// Lockfile held for the lifetime of this spill (prevents stale cleanup).
    _lock: std::fs::File,
    /// Deduplicated list of chunk addresses.
    addresses: Vec<[u8; 32]>,
    /// Tracks seen addresses for deduplication.
    seen: HashSet<[u8; 32]>,
    /// Byte size per spilled chunk address.
    sizes: HashMap<[u8; 32], u64>,
    /// Running total of unique chunk byte sizes (for average-size calculation).
    total_bytes: u64,
}
386
387impl ChunkSpill {
388 /// Return the parent directory for all spill dirs: `<data_dir>/spill/`.
389 fn spill_root() -> Result<PathBuf> {
390 use crate::config;
391 let root = config::data_dir()
392 .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
393 .join("spill");
394 Ok(root)
395 }
396
397 /// Create a new spill directory under `<data_dir>/spill/`.
398 ///
399 /// Directory name is `spill_<timestamp>_<random>` so orphans can be
400 /// identified by prefix and cleaned up by age. A lockfile inside the
401 /// dir prevents concurrent cleanup from deleting an active spill.
402 fn new() -> Result<Self> {
403 let root = Self::spill_root()?;
404 std::fs::create_dir_all(&root)?;
405
406 // Clean up stale spill dirs from previous crashed runs.
407 Self::cleanup_stale(&root);
408
409 let now = std::time::SystemTime::now()
410 .duration_since(std::time::UNIX_EPOCH)
411 .unwrap_or_default()
412 .as_secs();
413 let unique: u64 = rand::random();
414 let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
415 std::fs::create_dir(&dir)?;
416
417 // Create and hold a lockfile for the lifetime of this spill.
418 // cleanup_stale() will skip dirs with locked files.
419 let lock_path = dir.join(SPILL_LOCK_NAME);
420 let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
421 Error::Io(std::io::Error::new(
422 e.kind(),
423 format!("failed to create spill lockfile: {e}"),
424 ))
425 })?;
426 lock_file.try_lock_exclusive().map_err(|e| {
427 Error::Io(std::io::Error::new(
428 e.kind(),
429 format!("failed to lock spill lockfile: {e}"),
430 ))
431 })?;
432
433 Ok(Self {
434 dir,
435 _lock: lock_file,
436 addresses: Vec::new(),
437 seen: HashSet::new(),
438 sizes: HashMap::new(),
439 total_bytes: 0,
440 })
441 }
442
443 /// Clean up stale spill directories. Best-effort, errors are logged.
444 ///
445 /// A spill dir is reaped when:
446 /// 1. Its name starts with `SPILL_DIR_PREFIX` (ignores unrelated files)
447 /// 2. It is an actual directory, not a symlink (prevents symlink attacks)
448 /// 3. Its timestamp is older than `SPILL_STALE_GRACE_SECS` (TOCTOU guard)
449 /// 4. Its lockfile is releasable — i.e. no live process holds it
450 ///
451 /// The lockfile is the primary correctness gate: a releasable lock means
452 /// the owning `ChunkSpill` has been dropped or the process is gone, so
453 /// the dir is fair game. The grace period covers only the brief window
454 /// inside [`Self::new`] between `create_dir` and `try_lock_exclusive`.
455 ///
456 /// Safe to call concurrently from multiple processes.
457 fn cleanup_stale(root: &Path) {
458 let now = std::time::SystemTime::now()
459 .duration_since(std::time::UNIX_EPOCH)
460 .unwrap_or_default()
461 .as_secs();
462
463 if now == 0 {
464 // Clock is broken (before Unix epoch). Skip cleanup to avoid
465 // misidentifying dirs as stale.
466 warn!("System clock before Unix epoch, skipping spill cleanup");
467 return;
468 }
469
470 let entries = match std::fs::read_dir(root) {
471 Ok(entries) => entries,
472 Err(_) => return,
473 };
474
475 for entry in entries.flatten() {
476 let name = entry.file_name();
477 let name_str = name.to_string_lossy();
478
479 // Only process dirs with our prefix.
480 let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
481 Some(s) => s,
482 None => continue,
483 };
484
485 // Parse timestamp: "spill_<timestamp>_<random>"
486 let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
487 Some(ts) => ts,
488 None => continue,
489 };
490
491 if now.saturating_sub(timestamp) < SPILL_STALE_GRACE_SECS {
492 continue;
493 }
494
495 // Safety: only delete actual directories, not symlinks.
496 let file_type = match entry.file_type() {
497 Ok(ft) => ft,
498 Err(_) => continue,
499 };
500 if !file_type.is_dir() {
501 continue;
502 }
503
504 let path = entry.path();
505
506 // Check lockfile: if locked, the dir is in active use -- skip it.
507 let lock_path = path.join(SPILL_LOCK_NAME);
508 if let Ok(lock_file) = std::fs::File::open(&lock_path) {
509 use fs2::FileExt;
510 if lock_file.try_lock_exclusive().is_err() {
511 // Lock held by another process -- dir is active.
512 debug!("Skipping active spill dir: {}", path.display());
513 continue;
514 }
515 // We acquired the lock, so no one else holds it.
516 // Drop it before deleting.
517 drop(lock_file);
518 }
519
520 info!("Cleaning up stale spill dir: {}", path.display());
521 if let Err(e) = std::fs::remove_dir_all(&path) {
522 warn!("Failed to clean up stale spill dir {}: {e}", path.display());
523 }
524 }
525 }
526
527 /// Run stale spill cleanup. Call at client startup or periodically.
528 #[allow(dead_code)]
529 pub(crate) fn run_cleanup() {
530 if let Ok(root) = Self::spill_root() {
531 Self::cleanup_stale(&root);
532 }
533 }
534
535 /// Write one encrypted chunk to disk and record its address.
536 ///
537 /// Deduplicates by content address: if the same chunk was already
538 /// spilled, the write and accounting are skipped. This prevents
539 /// double-uploads and inflated quoting metrics.
540 fn push(&mut self, content: &[u8]) -> Result<()> {
541 let address = compute_address(content);
542 if !self.seen.insert(address) {
543 return Ok(());
544 }
545 let path = self.dir.join(hex::encode(address));
546 std::fs::write(&path, content)?;
547 let content_len = content.len() as u64;
548 self.sizes.insert(address, content_len);
549 self.total_bytes += content_len;
550 self.addresses.push(address);
551 Ok(())
552 }
553
554 /// Number of chunks stored.
555 fn len(&self) -> usize {
556 self.addresses.len()
557 }
558
559 /// Total bytes of all spilled chunks.
560 fn total_bytes(&self) -> u64 {
561 self.total_bytes
562 }
563
564 /// Address and byte-size pairs for all spilled chunks.
565 fn chunk_entries(&self) -> Result<Vec<([u8; 32], u64)>> {
566 self.addresses
567 .iter()
568 .map(|address| {
569 self.sizes
570 .get(address)
571 .copied()
572 .map(|size| (*address, size))
573 .ok_or_else(|| {
574 Error::Storage(format!(
575 "missing size for spilled chunk {}",
576 hex::encode(address)
577 ))
578 })
579 })
580 .collect()
581 }
582
583 /// Read a single chunk back from disk by address.
584 fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
585 let path = self.dir.join(hex::encode(address));
586 let data = std::fs::read(&path).map_err(|e| {
587 Error::Io(std::io::Error::new(
588 e.kind(),
589 format!("reading spilled chunk {}: {e}", hex::encode(address)),
590 ))
591 })?;
592 Ok(Bytes::from(data))
593 }
594
595 /// Read a wave of chunks from disk.
596 fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
597 let mut out = Vec::with_capacity(wave_addrs.len());
598 for addr in wave_addrs {
599 let content = self.read_chunk(addr)?;
600 out.push((content, *addr));
601 }
602 Ok(out)
603 }
604
605 /// Clean up the spill directory.
606 fn cleanup(&self) {
607 if let Err(e) = std::fs::remove_dir_all(&self.dir) {
608 warn!(
609 "Failed to clean up chunk spill dir {}: {e}",
610 self.dir.display()
611 );
612 }
613 }
614}
615
616impl Drop for ChunkSpill {
617 fn drop(&mut self) {
618 self.cleanup();
619 }
620}
621
622fn cached_merkle_covers_addresses(
623 cached: &MerkleBatchPaymentResult,
624 addresses: &[[u8; 32]],
625) -> bool {
626 addresses
627 .iter()
628 .all(|addr| cached.proofs.contains_key(addr))
629}
630
/// Split `addresses` into `(to_store, missing_proof)`: those that have a merkle
/// proof in `proofs`, and those that don't.
///
/// A partial [`MerkleBatchPaymentResult`] (from a `pay_for_merkle_multi_batch`
/// where a later sub-batch's payment failed) carries proofs only for the
/// already-paid sub-batches, so unpaid chunks reach the upload path with no
/// proof. `upload_waves_merkle` reports those as failed via
/// [`Error::PartialUpload`] rather than aborting the whole file. Order within
/// each group follows `addresses`, and duplicates are kept.
fn partition_addresses_by_proof(
    addresses: &[[u8; 32]],
    proofs: &HashMap<[u8; 32], Vec<u8>>,
) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
    let mut to_store = Vec::new();
    let mut missing_proof = Vec::new();
    for addr in addresses {
        let bucket = if proofs.contains_key(addr) {
            &mut to_store
        } else {
            &mut missing_proof
        };
        bucket.push(*addr);
    }
    (to_store, missing_proof)
}
649
650/// Build a `PartialUpload` after a fatal merkle store error, with accurate
651/// counts.
652///
653/// A fatal abort can leave chunks in three states: confirmed stored (in
654/// `stored_addresses`), known-failed (in `known_failed` — missing proofs, the
655/// quorum shortfalls and the fatal chunk seen so far), and "in flight when the
656/// abort hit" (neither). Rather than trust the helpers to enumerate the last
657/// group, this derives the failed set authoritatively as *every* `addresses`
658/// entry not in `stored_addresses`, preferring a known per-chunk message and
659/// falling back to the fatal `reason`. That guarantees
660/// `stored_count + failed_count` accounts for the whole file — fixing the
661/// under-reporting where a fatal wave could surface `failed_count = 0` and omit
662/// same-pass successes.
663fn partial_upload_after_fatal(
664 addresses: &[[u8; 32]],
665 stored_addresses: Vec<[u8; 32]>,
666 stored_count: usize,
667 total_chunks: usize,
668 known_failed: Vec<([u8; 32], String)>,
669 spend: PartialUploadSpend,
670 reason: String,
671) -> Error {
672 let stored_set: HashSet<[u8; 32]> = stored_addresses.iter().copied().collect();
673 let mut failed_map: HashMap<[u8; 32], String> = HashMap::new();
674 for (addr, msg) in known_failed {
675 if !stored_set.contains(&addr) {
676 failed_map.entry(addr).or_insert(msg);
677 }
678 }
679 for addr in addresses {
680 if !stored_set.contains(addr) {
681 failed_map.entry(*addr).or_insert_with(|| reason.clone());
682 }
683 }
684 let failed: Vec<([u8; 32], String)> = failed_map.into_iter().collect();
685 let failed_count = failed.len();
686 Error::PartialUpload {
687 stored: stored_addresses,
688 stored_count,
689 failed,
690 failed_count,
691 total_chunks,
692 spend: Box::new(spend),
693 reason,
694 }
695}
696
/// One wave's contribution to a single-node upload, distilled from its
/// `batch_upload_chunks_with_events` result.
///
/// Built by `fold_single_wave` from either a fully successful wave or a
/// recoverable `PartialUpload`.
#[derive(Debug)]
struct SingleWaveOutcome {
    /// Addresses confirmed stored in this wave.
    stored: Vec<[u8; 32]>,
    /// Chunks that failed after retries in this wave.
    /// Empty when the wave succeeded outright.
    failed: Vec<([u8; 32], String)>,
    /// Storage cost paid on-chain for this wave, in atto-tokens.
    storage_atto: Amount,
    /// Gas paid on-chain for this wave, in wei.
    gas_wei: u128,
    /// Per-wave store/retry statistics. Empty for a quorum-short wave, whose
    /// `PartialUpload` carries no stats.
    stats: WaveAggregateStats,
}
713
714/// Fold one wave's batch-upload result for the single-node path.
715///
716/// A `PartialUpload` (chunks short of quorum after retries) is **recoverable**:
717/// its stored/failed chunks and on-chain spend are returned so the caller
718/// records them and continues to the next wave, making the file make maximum
719/// progress exactly like `upload_waves_merkle`. Every other error is **fatal**
720/// (wallet/payment-infrastructure failures, missing proofs, spill reads) and is
721/// returned via `Err` to abort the file. Because `UPLOAD_WAVE_SIZE ==
722/// PAYMENT_WAVE_SIZE`, each batch call is exactly one payment wave, so folding a
723/// `PartialUpload` leaves nothing un-attempted within the wave.
724fn fold_single_wave(
725 result: Result<(Vec<[u8; 32]>, String, u128, WaveAggregateStats)>,
726) -> Result<SingleWaveOutcome> {
727 match result {
728 Ok((stored, storage, gas, stats)) => Ok(SingleWaveOutcome {
729 stored,
730 failed: Vec::new(),
731 storage_atto: storage.parse().unwrap_or(Amount::ZERO),
732 gas_wei: gas,
733 stats,
734 }),
735 Err(Error::PartialUpload {
736 stored,
737 failed,
738 spend,
739 ..
740 }) => Ok(SingleWaveOutcome {
741 stored,
742 failed,
743 storage_atto: spend.storage_cost_atto.parse().unwrap_or(Amount::ZERO),
744 gas_wei: spend.gas_cost_wei,
745 stats: WaveAggregateStats::default(),
746 }),
747 Err(e) => Err(e),
748 }
749}
750
751/// Check that the spill directory has enough free space for the spilled chunks.
752///
753/// `file_size` is the source file's byte count. We require
754/// `file_size + 10%` free space to account for self-encryption overhead.
755fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
756 let spill_root = ChunkSpill::spill_root()?;
757
758 // Ensure the root exists so fs2 can query it.
759 std::fs::create_dir_all(&spill_root)?;
760
761 let available = fs2::available_space(&spill_root).map_err(|e| {
762 Error::Io(std::io::Error::new(
763 e.kind(),
764 format!(
765 "failed to query disk space on {}: {e}",
766 spill_root.display()
767 ),
768 ))
769 })?;
770
771 // Use integer arithmetic to avoid f64 precision loss on large file sizes.
772 let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
773 let required = file_size.saturating_add(headroom);
774
775 if available < required {
776 let avail_mb = available / (1024 * 1024);
777 let req_mb = required / (1024 * 1024);
778 return Err(Error::InsufficientDiskSpace(format!(
779 "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
780 spill_root.display()
781 )));
782 }
783
784 debug!(
785 "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
786 spill_root.display()
787 );
788 Ok(())
789}
790
791fn usable_memory_bytes() -> Option<u64> {
792 let mut system = sysinfo::System::new();
793 system.refresh_memory();
794
795 let available_memory = system.available_memory();
796 let free_memory = system.free_memory();
797 let used_memory = system.used_memory();
798 let total_memory = system.total_memory();
799 let unused_memory = total_memory.saturating_sub(used_memory);
800
801 let mut usable = [available_memory, free_memory, unused_memory]
802 .into_iter()
803 .filter(|bytes| *bytes > 0)
804 .max();
805
806 let cgroup_free_memory = system
807 .cgroup_limits()
808 .filter(|limits| limits.total_memory > 0)
809 .map(|limits| limits.free_memory);
810 if let Some(cgroup_free_memory) = cgroup_free_memory {
811 usable = Some(usable.unwrap_or(u64::MAX).min(cgroup_free_memory));
812 }
813
814 debug!(
815 available_memory,
816 free_memory,
817 used_memory,
818 total_memory,
819 cgroup_free_memory,
820 usable_memory = ?usable,
821 "Detected usable memory for stream decrypt batch sizing"
822 );
823
824 usable
825}
826
827fn stream_decrypt_batch_memory_cap(usable_memory_bytes: u64) -> usize {
828 let budget = usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR;
829 let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
830 .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
831 .max(1);
832 let cap = (budget / estimated_bytes_per_chunk).max(1);
833
834 usize::try_from(cap).unwrap_or(usize::MAX)
835}
836
837fn adaptive_stream_decrypt_batch_size(
838 total_chunks: usize,
839 fetch_cap: usize,
840 configured_batch_floor: usize,
841 usable_memory_bytes: Option<u64>,
842) -> usize {
843 let fetch_target = fetch_cap
844 .max(1)
845 .saturating_mul(DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
846 let requested = match usable_memory_bytes {
847 Some(bytes) => {
848 let memory_cap = stream_decrypt_batch_memory_cap(bytes);
849 configured_batch_floor
850 .max(fetch_target)
851 .max(1)
852 .min(memory_cap)
853 }
854 None => configured_batch_floor.max(1),
855 };
856
857 requested.min(total_chunks.max(1)).max(1)
858}
859
/// Whether the data map is published to the network for address-based retrieval.
///
/// A private upload stores only the data chunks and returns the `DataMap` to
/// the caller — only someone holding that `DataMap` can reconstruct the file.
/// A public upload additionally stores the serialized `DataMap` as a chunk on
/// the network, yielding a single chunk address that anyone can use to
/// retrieve the `DataMap` (via [`Client::data_map_fetch`]) and then the file.
///
/// The default is [`Visibility::Private`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Visibility {
    /// Keep the data map local; only the holder can retrieve the file.
    #[default]
    Private,
    /// Publish the data map as a network chunk so anyone with the returned
    /// address can retrieve and decrypt the file.
    Public,
}
876
/// Confidence attached to an [`UploadCostEstimate`]'s `storage_cost_atto`.
///
/// `estimate_upload_cost` prices a file by sampling a few of its chunk
/// addresses and extrapolating. When every sampled chunk is already stored
/// there is no live price to extrapolate from, so a `"0"` cost can mean either
/// "provably free" (the whole file was sampled) or only "probably free" (the
/// tail was unsampled). This lets callers tell those apart instead of treating
/// every `"0"` as unconditionally free.
///
/// Serialized in `snake_case` (e.g. `priced_sample`); the default is
/// [`CostEstimateConfidence::PricedSample`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CostEstimateConfidence {
    /// At least one sampled chunk returned a live quote; `storage_cost_atto`
    /// is extrapolated from a real per-chunk price. The normal case.
    #[default]
    PricedSample,
    /// Every chunk in the file was sampled and every one was already stored.
    /// `storage_cost_atto` is exactly `"0"` — the upload is genuinely free.
    VerifiedAllAlreadyStored,
    /// Every *sampled* chunk was already stored, but not all chunks were
    /// sampled. `storage_cost_atto` is `"0"` as a best-effort guess; the real
    /// upload reconciles the true cost at payment time. Render this as "likely
    /// already stored", not a guaranteed-free price.
    AllSamplesAlreadyStoredIncomplete,
}
901
902/// Estimated cost of uploading a file, returned by
903/// [`Client::estimate_upload_cost`].
904///
905/// Marked `#[non_exhaustive]` so adding a field later is not a breaking change
906/// for downstream consumers that construct or pattern-match on this struct.
907#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
908#[non_exhaustive]
909pub struct UploadCostEstimate {
910 /// Original file size in bytes.
911 pub file_size: u64,
912 /// Number of chunks the file would be split into (data chunks only,
913 /// does not include the DataMap chunk added during public uploads).
914 pub chunk_count: usize,
915 /// Estimated total storage cost in atto (token smallest unit).
916 pub storage_cost_atto: String,
917 /// Estimated gas cost in wei as a string. This is a rough heuristic
918 /// based on chunk count and payment mode, NOT a live gas price query.
919 pub estimated_gas_cost_wei: String,
920 /// Payment mode that would be used.
921 pub payment_mode: PaymentMode,
922 /// How much to trust `storage_cost_atto`. See [`CostEstimateConfidence`].
923 #[serde(default)]
924 pub confidence: CostEstimateConfidence,
925}
926
927/// Result of a file upload: the `DataMap` needed to retrieve the file.
928///
929/// Marked `#[non_exhaustive]` so adding a new field in future is not a
930/// breaking change for downstream consumers that construct or pattern-match
931/// on this struct.
932#[derive(Debug, Clone)]
933#[non_exhaustive]
934pub struct FileUploadResult {
935 /// The data map containing chunk metadata for reconstruction.
936 pub data_map: DataMap,
937 /// Number of chunks stored on the network.
938 pub chunks_stored: usize,
939 /// Number of chunks that failed to store. Always 0 for a successful
940 /// upload — partial-failure information is conveyed via
941 /// [`crate::data::Error::PartialUpload`] instead.
942 pub chunks_failed: usize,
943 /// Total number of chunks in the upload, including chunks that were
944 /// already stored and skipped. On full success this equals `chunks_stored`.
945 pub total_chunks: usize,
946 /// Which payment mode was actually used (not just requested).
947 pub payment_mode_used: PaymentMode,
948 /// Total storage cost paid in token units (atto). "0" if all chunks already existed.
949 pub storage_cost_atto: String,
950 /// Total gas cost in wei. 0 if no on-chain transactions were made.
951 pub gas_cost_wei: u128,
952 /// Chunk address of the serialized `DataMap`, set only for
953 /// [`Visibility::Public`] uploads. **`Some` means this address is
954 /// retrievable from the network (via [`Client::data_map_fetch`])**, not
955 /// necessarily that *this* upload paid to store it — if the serialized
956 /// `DataMap` hashed to a chunk that was already on the network (same
957 /// file uploaded before; deterministic via self-encryption), the address
958 /// is still returned but no storage payment was made for it.
959 pub data_map_address: Option<[u8; 32]>,
960 /// Sum of chunk-store RPC attempts across the upload
961 /// (`>= chunks_stored` on full success; more if any chunk retried).
962 /// `0` for paths that don't run the wave store loop.
963 pub chunk_attempts_total: usize,
964 /// Per-chunk store wall-clock in ms (length == `chunks_stored` on full
965 /// success, empty for paths that don't run the wave store loop).
966 pub store_durations_ms: Vec<u64>,
967 /// Count of stored chunks that succeeded on each retry round
968 /// (index 0 = first attempt, 1 = first retry, etc.). All zeros for
969 /// paths that don't run the wave store loop.
970 pub retries_histogram: [usize; 4],
971}
972
973/// Payment information for external signing — either wave-batch or merkle.
974#[derive(Debug)]
975pub enum ExternalPaymentInfo {
976 /// Wave-batch: individual (quote_hash, rewards_address, amount) tuples.
977 WaveBatch {
978 /// Chunks ready for payment (needed for finalize).
979 prepared_chunks: Vec<PreparedChunk>,
980 /// Payment intent for external signing.
981 payment_intent: PaymentIntent,
982 },
983 /// Merkle: single on-chain call with depth, pool commitments, timestamp.
984 Merkle {
985 /// The prepared merkle batch (public fields sent to frontend, private fields stay in Rust).
986 prepared_batch: PreparedMerkleBatch,
987 /// Raw chunk contents that still need upload after the preflight check.
988 chunk_contents: Vec<Bytes>,
989 /// Chunk addresses that still need upload after the preflight check.
990 chunk_addresses: Vec<[u8; 32]>,
991 },
992}
993
994/// Prepared upload ready for external payment.
995///
996/// Contains everything needed to construct the on-chain payment transaction
997/// externally (e.g. via WalletConnect in a desktop app) and then finalize
998/// the upload without a Rust-side wallet.
999///
1000/// Note: This struct stays in Rust memory — only the public fields of
1001/// `payment_info` are sent to the frontend. `PreparedChunk` contains
1002/// non-serializable network types, so the full struct cannot derive `Serialize`.
1003///
1004/// Marked `#[non_exhaustive]` so adding a new field in future is not a
1005/// breaking change for downstream consumers.
1006#[derive(Debug)]
1007#[non_exhaustive]
1008pub struct PreparedUpload {
1009 /// The data map for later retrieval.
1010 pub data_map: DataMap,
1011 /// Payment information for chunks that still need payment after the
1012 /// already-stored preflight. This may be wave-batch even when the original
1013 /// chunk count was merkle-eligible if the remaining count is below the
1014 /// merkle threshold.
1015 pub payment_info: ExternalPaymentInfo,
1016 /// Chunk address of the serialized `DataMap` when this upload was
1017 /// prepared with [`Visibility::Public`]. `Some` means the address is
1018 /// retrievable on the network after finalization — either because this
1019 /// upload paid to store the chunk in `payment_info`, or because the
1020 /// chunk was already on the network (deterministic self-encryption).
1021 /// Carried through to [`FileUploadResult::data_map_address`].
1022 pub data_map_address: Option<[u8; 32]>,
1023 /// Chunk addresses already present on the network when this upload was
1024 /// prepared. These do not require payment or PUT during finalization.
1025 pub already_stored_addresses: Vec<[u8; 32]>,
1026 /// Total chunk count for the upload, including already-stored chunks.
1027 pub total_chunks: usize,
1028}
1029
1030/// Return type for [`spawn_file_encryption`]: chunk receiver, `DataMap` oneshot, join handle.
1031type EncryptionChannels = (
1032 tokio::sync::mpsc::Receiver<Bytes>,
1033 tokio::sync::oneshot::Receiver<DataMap>,
1034 tokio::task::JoinHandle<Result<()>>,
1035);
1036
1037/// Spawn a blocking task that streams file encryption through a channel.
1038fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
1039 let metadata = std::fs::metadata(&path)?;
1040 let data_size = usize::try_from(metadata.len())
1041 .map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;
1042
1043 let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
1044 let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();
1045
1046 let handle = tokio::task::spawn_blocking(move || {
1047 let file = std::fs::File::open(&path)?;
1048 let mut reader = std::io::BufReader::new(file);
1049
1050 let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
1051 let read_error_clone = Arc::clone(&read_error);
1052
1053 let data_iter = std::iter::from_fn(move || {
1054 let mut buffer = vec![0u8; 8192];
1055 match std::io::Read::read(&mut reader, &mut buffer) {
1056 Ok(0) => None,
1057 Ok(n) => {
1058 buffer.truncate(n);
1059 Some(Bytes::from(buffer))
1060 }
1061 Err(e) => {
1062 let mut guard = read_error_clone
1063 .lock()
1064 .unwrap_or_else(|poisoned| poisoned.into_inner());
1065 *guard = Some(e);
1066 None
1067 }
1068 }
1069 });
1070
1071 let mut stream = stream_encrypt(data_size, data_iter)
1072 .map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;
1073
1074 for chunk_result in stream.chunks() {
1075 // Check for captured read errors immediately after each chunk.
1076 // stream_encrypt sees None (EOF) when a read fails, so it stops
1077 // producing chunks. We must detect this before sending the
1078 // partial results to avoid uploading a truncated DataMap.
1079 {
1080 let guard = read_error
1081 .lock()
1082 .unwrap_or_else(|poisoned| poisoned.into_inner());
1083 if let Some(ref e) = *guard {
1084 return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
1085 }
1086 }
1087
1088 let (_hash, content) = chunk_result
1089 .map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
1090 if chunk_tx.blocking_send(content).is_err() {
1091 return Err(Error::Encryption("upload receiver dropped".to_string()));
1092 }
1093 }
1094
1095 // Final check: read error after last chunk (stream saw EOF).
1096 {
1097 let guard = read_error
1098 .lock()
1099 .unwrap_or_else(|poisoned| poisoned.into_inner());
1100 if let Some(ref e) = *guard {
1101 return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
1102 }
1103 }
1104
1105 let datamap = stream
1106 .into_datamap()
1107 .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
1108 if datamap_tx.send(datamap).is_err() {
1109 warn!("DataMap receiver dropped — upload may have been cancelled");
1110 }
1111 Ok(())
1112 });
1113
1114 Ok((chunk_rx, datamap_rx, handle))
1115}
1116
/// Cleanup guard for the staging temp file of a disk download.
///
/// While the guard holds a path, dropping it deletes that file; this also
/// covers a panic unwinding out of the `block_in_place` decrypt loop. The
/// only way to keep the file is [`commit`](Self::commit), which moves it to
/// its final path. Having the guard means the individual error arms no
/// longer need to repeat the cleanup.
struct TempDownload {
    /// `Some` as long as the staging file might still need removing; `None`
    /// after a successful commit.
    path: Option<PathBuf>,
}

impl TempDownload {
    /// Arm a guard for the staging file at `path`.
    fn new(path: PathBuf) -> Self {
        Self { path: Some(path) }
    }

    /// Location of the staging file (valid until `commit`).
    ///
    /// # Panics
    ///
    /// Panics if the guard no longer holds a path.
    fn path(&self) -> &Path {
        match &self.path {
            Some(staged) => staged.as_path(),
            None => panic!("TempDownload::path called after commit"),
        }
    }

    /// Move the staged file to `dest` by renaming it.
    ///
    /// If the rename succeeds the guard is disarmed and dropping it does
    /// nothing. If it fails the error is returned with the guard still armed,
    /// so the orphaned temp file is removed when the guard is dropped.
    fn commit(mut self, dest: &Path) -> std::io::Result<()> {
        let renamed = std::fs::rename(self.path(), dest);
        if renamed.is_ok() {
            // Nothing left to clean up once the file lives at `dest`.
            self.path = None;
        }
        renamed
    }
}
1149
1150impl Drop for TempDownload {
1151 fn drop(&mut self) {
1152 if let Some(path) = self.path.take() {
1153 if let Err(e) = std::fs::remove_file(&path) {
1154 // Absent file is fine (never created / already gone).
1155 if e.kind() != std::io::ErrorKind::NotFound {
1156 warn!(
1157 "Failed to remove temp download file {}: {e}",
1158 path.display()
1159 );
1160 }
1161 }
1162 }
1163 }
1164}
1165
1166impl Client {
1167 /// Upload a file to the network using streaming self-encryption.
1168 ///
1169 /// Automatically selects merkle batch payment for files that produce
1170 /// 64+ chunks (saves gas). Encrypted chunks are spilled to a temp
1171 /// directory so peak memory stays at ~256 MB regardless of file size.
1172 ///
1173 /// # Errors
1174 ///
1175 /// Returns an error if the file cannot be read, encryption fails,
1176 /// or any chunk cannot be stored.
1177 pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
1178 self.file_upload_with_mode(path, PaymentMode::Auto).await
1179 }
1180
1181 /// Estimate the cost of uploading a file without actually uploading.
1182 ///
1183 /// Encrypts the file to determine chunk count and sizes, then requests
1184 /// a single quote from the network for a representative chunk. The
1185 /// per-chunk price is extrapolated to the total chunk count.
1186 ///
1187 /// The estimate is fast (~2-5s) and does not require a wallet. Spilled
1188 /// chunks are cleaned up automatically when the function returns.
1189 ///
1190 /// Gas cost is an advisory heuristic, not a live gas-oracle query. It is
1191 /// derived from realistic per-transaction budgets (`GAS_PER_WAVE_TX`,
1192 /// `GAS_PER_MERKLE_TX`) priced at `ARBITRUM_GAS_PRICE_WEI`. Real gas
1193 /// varies with network conditions.
1194 ///
1195 /// Sampled chunk addresses are spread across the whole file (not the first
1196 /// N) so a shared leading prefix doesn't bias the sample. When a sample
1197 /// returns a live quote the per-chunk price is extrapolated and the result
1198 /// is tagged [`CostEstimateConfidence::PricedSample`].
1199 ///
1200 /// When every sampled chunk is already stored the result is still `Ok`
1201 /// with `storage_cost_atto: "0"`, tagged either
1202 /// [`CostEstimateConfidence::VerifiedAllAlreadyStored`] when the whole file
1203 /// was sampled (exactly free) or
1204 /// [`CostEstimateConfidence::AllSamplesAlreadyStoredIncomplete`] when the
1205 /// tail was unsampled (a best-effort guess that payment reconciles).
1206 ///
1207 /// # Errors
1208 ///
1209 /// Returns an error if the file cannot be read, encryption fails, or the
1210 /// network cannot provide a quote.
1211 pub async fn estimate_upload_cost(
1212 &self,
1213 path: &Path,
1214 mode: PaymentMode,
1215 progress: Option<mpsc::Sender<UploadEvent>>,
1216 ) -> Result<UploadCostEstimate> {
1217 let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
1218
1219 if file_size < 3 {
1220 return Err(Error::InvalidData(
1221 "File too small: self-encryption requires at least 3 bytes".into(),
1222 ));
1223 }
1224
1225 check_disk_space_for_spill(file_size)?;
1226
1227 info!(
1228 "Estimating upload cost for {} ({file_size} bytes)",
1229 path.display()
1230 );
1231
1232 let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1233 let chunk_count = spill.len();
1234
1235 if let Some(ref tx) = progress {
1236 let _ = tx
1237 .send(UploadEvent::Encrypted {
1238 total_chunks: chunk_count,
1239 })
1240 .await;
1241 }
1242
1243 info!("Encrypted into {chunk_count} chunks, requesting quote");
1244 let uses_merkle = should_use_merkle(chunk_count, mode);
1245
1246 // Sample chunk addresses spread evenly across the file (see
1247 // `distributed_sample_indices`) rather than the first N. A single
1248 // AlreadyStored result says nothing about the rest of the file, and a
1249 // positional sample lands on a shared leading prefix in the worst case,
1250 // so we spread the probe and only treat the whole file as "fully
1251 // stored" when every sample comes back stored.
1252 let sample_indices = distributed_sample_indices(spill.addresses.len(), ESTIMATE_SAMPLE_CAP);
1253 let mut sampled = 0usize;
1254 let mut all_already_stored = true;
1255 let mut quotes_opt: Option<Vec<QuoteEntry>> = None;
1256
1257 for &idx in &sample_indices {
1258 let addr = &spill.addresses[idx];
1259 sampled += 1;
1260 let chunk_bytes = spill.read_chunk(addr)?;
1261 let data_size = u64::try_from(chunk_bytes.len())
1262 .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
1263 let result = if uses_merkle {
1264 self.get_store_quotes_with_fault_tolerance(addr, data_size, DATA_TYPE_CHUNK)
1265 .await
1266 } else {
1267 self.get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
1268 .await
1269 };
1270 match result {
1271 Ok(q) => {
1272 quotes_opt = Some(q);
1273 all_already_stored = false;
1274 break;
1275 }
1276 Err(Error::AlreadyStored) => {
1277 debug!(
1278 "Sample chunk {} already stored; trying next address ({sampled}/{})",
1279 hex::encode(addr),
1280 sample_indices.len()
1281 );
1282 continue;
1283 }
1284 Err(e) => return Err(e),
1285 }
1286 }
1287
1288 let quotes = match quotes_opt {
1289 Some(q) => q,
1290 None if all_already_stored && sampled == chunk_count => {
1291 // Every address in the file was sampled and every one is
1292 // already on the network — a zero-cost estimate is exact here.
1293 info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
1294 return Ok(UploadCostEstimate {
1295 file_size,
1296 chunk_count,
1297 storage_cost_atto: "0".into(),
1298 estimated_gas_cost_wei: "0".into(),
1299 payment_mode: if uses_merkle {
1300 PaymentMode::Merkle
1301 } else {
1302 PaymentMode::Single
1303 },
1304 confidence: CostEstimateConfidence::VerifiedAllAlreadyStored,
1305 });
1306 }
1307 None => {
1308 // Every sampled chunk was already stored but the tail was not
1309 // sampled, so there is no live price to extrapolate. The
1310 // estimate is display-only and payment reconciles the true
1311 // cost, so return an optimistic zero flagged as incomplete
1312 // rather than erroring — callers still get a value to show.
1313 info!(
1314 "All {sampled}/{chunk_count} sampled chunks already stored; \
1315 returning incomplete zero-cost estimate"
1316 );
1317 return Ok(UploadCostEstimate {
1318 file_size,
1319 chunk_count,
1320 storage_cost_atto: "0".into(),
1321 estimated_gas_cost_wei: "0".into(),
1322 payment_mode: if uses_merkle {
1323 PaymentMode::Merkle
1324 } else {
1325 PaymentMode::Single
1326 },
1327 confidence: CostEstimateConfidence::AllSamplesAlreadyStoredIncomplete,
1328 });
1329 }
1330 };
1331
1332 // Use the median price × 3 (matches SingleNodePayment::from_quotes
1333 // which pays 3x the median to incentivize reliable storage).
1334 let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
1335 prices.sort();
1336 let median_price = prices
1337 .get(prices.len() / 2)
1338 .copied()
1339 .unwrap_or(Amount::ZERO);
1340 let per_chunk_cost = median_price * Amount::from(3u64);
1341
1342 let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
1343 let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);
1344
1345 // Estimate gas cost from realistic per-transaction budgets rather
1346 // than a flat per-chunk or per-wave number.
1347 //
1348 // - Single mode: `batch_pay` packs up to UPLOAD_WAVE_SIZE chunks'
1349 // close-group quotes into one `pay_for_quotes` call on Arbitrum.
1350 // The dominant cost is one SSTORE per entry plus base tx overhead,
1351 // so we use GAS_PER_WAVE_TX (≈1.5M) as a conservative upper bound
1352 // on a full wave and multiply by the number of waves. The previous
1353 // per-wave figure of 150k was closer to a single-entry transfer
1354 // and understated cost by 5–10x for full waves.
1355 // - Merkle mode: one tx per sub-batch that verifies a merkle tree
1356 // and posts a pool commitment (GAS_PER_MERKLE_TX ≈ 500k each).
1357 //
1358 // Gas is priced at ARBITRUM_GAS_PRICE_WEI (~0.1 gwei, a typical
1359 // Arbitrum baseline). Treat the result as advisory, not a commitment.
1360 let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
1361 let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
1362 let estimated_gas: u128 = if uses_merkle {
1363 merkle_batches
1364 .saturating_mul(GAS_PER_MERKLE_TX)
1365 .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
1366 } else {
1367 waves
1368 .saturating_mul(GAS_PER_WAVE_TX)
1369 .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
1370 };
1371
1372 info!(
1373 "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
1374 );
1375
1376 Ok(UploadCostEstimate {
1377 file_size,
1378 chunk_count,
1379 storage_cost_atto: total_storage.to_string(),
1380 estimated_gas_cost_wei: estimated_gas.to_string(),
1381 payment_mode: if uses_merkle {
1382 PaymentMode::Merkle
1383 } else {
1384 PaymentMode::Single
1385 },
1386 confidence: CostEstimateConfidence::PricedSample,
1387 })
1388 }
1389
1390 /// Phase 1 of external-signer upload: encrypt file and prepare chunks.
1391 ///
1392 /// Equivalent to [`Client::file_prepare_upload_with_visibility`] with
1393 /// [`Visibility::Private`] — see that method for details.
1394 pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
1395 self.file_prepare_upload_with_progress(path, Visibility::Private, None)
1396 .await
1397 }
1398
1399 /// Phase 1 of external-signer upload with explicit [`Visibility`] control.
1400 ///
1401 /// Equivalent to [`Client::file_prepare_upload_with_progress`] with
1402 /// `progress: None` — see that method for details.
1403 pub async fn file_prepare_upload_with_visibility(
1404 &self,
1405 path: &Path,
1406 visibility: Visibility,
1407 ) -> Result<PreparedUpload> {
1408 self.file_prepare_upload_with_progress(path, visibility, None)
1409 .await
1410 }
1411
1412 /// Phase 1 of external-signer upload with progress events.
1413 ///
1414 /// Requires an EVM network (for contract price queries) but NOT a wallet.
1415 /// Returns a [`PreparedUpload`] containing the data map, prepared chunks,
1416 /// and a [`PaymentIntent`] that the external signer uses to construct
1417 /// and submit the on-chain payment transaction.
1418 ///
1419 /// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
1420 /// is bundled into the payment batch as an additional chunk and its
1421 /// address is recorded on the returned [`PreparedUpload`]. After
1422 /// [`Client::finalize_upload`] (or `_merkle`) succeeds, that address is
1423 /// surfaced via [`FileUploadResult::data_map_address`] so the uploader
1424 /// can share a single address from which anyone can retrieve the file.
1425 ///
1426 /// When `progress` is `Some`, [`UploadEvent`]s are emitted on the channel
1427 /// during encryption ([`UploadEvent::Encrypting`] / [`UploadEvent::Encrypted`])
1428 /// and per-chunk quoting ([`UploadEvent::ChunkQuoted`]). Storage events are
1429 /// emitted later by [`Client::finalize_upload_with_progress`] /
1430 /// [`Client::finalize_upload_merkle_with_progress`].
1431 ///
1432 /// **Memory note:** Encryption uses disk spilling for bounded memory, but
1433 /// the returned [`PreparedUpload`] holds all chunk content in memory (each
1434 /// [`PreparedChunk`] contains a `Bytes` with the full chunk data). This is
1435 /// inherent to the two-phase external-signer protocol — the chunks must
1436 /// stay in memory until [`Client::finalize_upload`] stores them. For very
1437 /// large files, prefer [`Client::file_upload`] which streams directly.
1438 ///
1439 /// # Errors
1440 ///
1441 /// Returns an error if there is insufficient disk space, the file cannot
1442 /// be read, encryption fails, or quote collection fails.
1443 pub async fn file_prepare_upload_with_progress(
1444 &self,
1445 path: &Path,
1446 visibility: Visibility,
1447 progress: Option<mpsc::Sender<UploadEvent>>,
1448 ) -> Result<PreparedUpload> {
1449 debug!(
1450 "Preparing file upload for external signing (visibility={visibility:?}): {}",
1451 path.display()
1452 );
1453
1454 let file_size = std::fs::metadata(path)?.len();
1455 check_disk_space_for_spill(file_size)?;
1456
1457 let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
1458
1459 info!(
1460 "Encrypted {} into {} chunks for external signing (spilled to disk)",
1461 path.display(),
1462 spill.len()
1463 );
1464
1465 // Read each chunk from disk and collect quotes concurrently.
1466 // Note: all PreparedChunks accumulate in memory because the external-signer
1467 // protocol requires them for finalize_upload. NOT memory-bounded for large files.
1468 let mut chunk_data: Vec<Bytes> = spill
1469 .addresses
1470 .iter()
1471 .map(|addr| spill.read_chunk(addr))
1472 .collect::<std::result::Result<Vec<_>, _>>()?;
1473
1474 // For public uploads, bundle the serialized DataMap as an extra chunk
1475 // in the same payment batch. This lets the external signer pay for
1476 // the data chunks and the DataMap chunk in one flow, and lets the
1477 // finalize step return the DataMap's chunk address as the shareable
1478 // retrieval address.
1479 let data_map_address = match visibility {
1480 Visibility::Private => None,
1481 Visibility::Public => {
1482 let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
1483 Error::Serialization(format!("Failed to serialize DataMap: {e}"))
1484 })?;
1485 let bytes = Bytes::from(serialized);
1486 let address = compute_address(&bytes);
1487 info!(
1488 "Public upload: bundling DataMap chunk ({} bytes) at address {}",
1489 bytes.len(),
1490 hex::encode(address)
1491 );
1492 chunk_data.push(bytes);
1493 Some(address)
1494 }
1495 };
1496
1497 let chunk_count = chunk_data.len();
1498
1499 if let Some(ref tx) = progress {
1500 let _ = tx
1501 .send(UploadEvent::Encrypted {
1502 total_chunks: chunk_count,
1503 })
1504 .await;
1505 }
1506
1507 let (payment_info, already_stored_addresses) = if should_use_merkle(
1508 chunk_count,
1509 PaymentMode::Auto,
1510 ) {
1511 // Merkle path: build tree, collect candidate pools, return for external payment.
1512 info!("Using merkle batch preparation for {chunk_count} file chunks");
1513
1514 let chunk_entries: Vec<([u8; 32], u64)> = chunk_data
1515 .iter()
1516 .map(|chunk| {
1517 let size = u64::try_from(chunk.len())
1518 .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
1519 Ok((compute_address(chunk), size))
1520 })
1521 .collect::<Result<Vec<_>>>()?;
1522
1523 let merkle_plan = self
1524 .plan_merkle_upload(chunk_entries, DATA_TYPE_CHUNK, progress.as_ref())
1525 .await?;
1526
1527 if merkle_plan.to_upload.is_empty() {
1528 info!("All {chunk_count} file chunks already stored; no external payment needed");
1529 (
1530 ExternalPaymentInfo::WaveBatch {
1531 prepared_chunks: Vec::new(),
1532 payment_intent: PaymentIntent::from_prepared_chunks(&[]),
1533 },
1534 merkle_plan.already_stored,
1535 )
1536 } else {
1537 let chunk_data =
1538 chunk_contents_for_upload_addresses(chunk_data, &merkle_plan.to_upload)?;
1539
1540 if !should_use_merkle(merkle_plan.to_upload.len(), PaymentMode::Auto) {
1541 info!(
1542 "{} file chunks need upload after merkle preflight; preparing wave-batch payment",
1543 merkle_plan.to_upload.len()
1544 );
1545 let (payment_info, mut wave_already_stored) = self
1546 .prepare_wave_batch_external_chunks(
1547 chunk_data,
1548 progress.as_ref(),
1549 chunk_count,
1550 )
1551 .await?;
1552 let mut already_stored = merkle_plan.already_stored;
1553 already_stored.append(&mut wave_already_stored);
1554 (payment_info, already_stored)
1555 } else {
1556 match self
1557 .prepare_merkle_batch_external(
1558 &merkle_plan.to_upload,
1559 DATA_TYPE_CHUNK,
1560 merkle_plan.to_upload_avg_size(),
1561 )
1562 .await
1563 {
1564 Ok(prepared_batch) => {
1565 info!(
1566 "File prepared for external merkle signing: {} chunks, depth={} ({})",
1567 merkle_plan.to_upload.len(),
1568 prepared_batch.depth,
1569 path.display()
1570 );
1571
1572 (
1573 ExternalPaymentInfo::Merkle {
1574 prepared_batch,
1575 chunk_contents: chunk_data,
1576 chunk_addresses: merkle_plan.to_upload,
1577 },
1578 merkle_plan.already_stored,
1579 )
1580 }
1581 Err(Error::InsufficientPeers(ref msg)) => {
1582 info!(
1583 "External merkle preparation needs more peers ({msg}); preparing wave-batch payment"
1584 );
1585 let (payment_info, mut wave_already_stored) = self
1586 .prepare_wave_batch_external_chunks(
1587 chunk_data,
1588 progress.as_ref(),
1589 chunk_count,
1590 )
1591 .await?;
1592 let mut already_stored = merkle_plan.already_stored;
1593 already_stored.append(&mut wave_already_stored);
1594 (payment_info, already_stored)
1595 }
1596 Err(e) => return Err(e),
1597 }
1598 }
1599 }
1600 } else {
1601 self.prepare_wave_batch_external_chunks(chunk_data, progress.as_ref(), chunk_count)
1602 .await?
1603 };
1604
1605 // Surface the "DataMap chunk was already on the network" case
1606 // so debugging "why is data_map_address set but no storage cost
1607 // appears for it?" doesn't require reading the source. See the
1608 // `data_map_address` doc comment for why this is still a valid
1609 // `Some(addr)` outcome.
1610 if let Some(addr) = data_map_address {
1611 let data_map_needs_payment = match &payment_info {
1612 ExternalPaymentInfo::WaveBatch {
1613 prepared_chunks, ..
1614 } => prepared_chunks.iter().any(|c| c.address == addr),
1615 ExternalPaymentInfo::Merkle {
1616 chunk_addresses, ..
1617 } => chunk_addresses.contains(&addr),
1618 };
1619 if !data_map_needs_payment {
1620 info!(
1621 "Public upload: DataMap chunk {} was already stored \
1622 on the network — address is retrievable without a \
1623 new payment",
1624 hex::encode(addr)
1625 );
1626 }
1627 }
1628
1629 Ok(PreparedUpload {
1630 data_map,
1631 payment_info,
1632 data_map_address,
1633 already_stored_addresses,
1634 total_chunks: chunk_count,
1635 })
1636 }
1637
1638 async fn prepare_wave_batch_external_chunks(
1639 &self,
1640 chunk_data: Vec<Bytes>,
1641 progress: Option<&mpsc::Sender<UploadEvent>>,
1642 progress_total: usize,
1643 ) -> Result<(ExternalPaymentInfo, Vec<[u8; 32]>)> {
1644 let chunk_count = chunk_data.len();
1645 let chunks_with_addr: Vec<(Bytes, [u8; 32])> = chunk_data
1646 .into_iter()
1647 .map(|content| {
1648 let address = compute_address(&content);
1649 (content, address)
1650 })
1651 .collect();
1652
1653 // Wave-batch path: collect quotes per chunk concurrently, emitting
1654 // a `ChunkQuoted` event after each completion so callers can drive
1655 // a progress bar through the slow quote phase.
1656 let quote_limiter = self.controller().quote.clone();
1657 let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
1658 let mut quote_stream = stream::iter(chunks_with_addr)
1659 .map(|(content, address)| {
1660 let limiter = quote_limiter.clone();
1661 async move {
1662 let result = observe_op(
1663 &limiter,
1664 || async move { self.prepare_chunk_payment(content).await },
1665 classify_error,
1666 )
1667 .await;
1668 (address, result)
1669 }
1670 })
1671 .buffer_unordered(quote_concurrency);
1672
1673 let mut prepared_chunks = Vec::with_capacity(chunk_count);
1674 let mut already_stored = Vec::new();
1675 let mut quoted = 0usize;
1676 while let Some((address, result)) = quote_stream.next().await {
1677 match result? {
1678 Some(prepared) => prepared_chunks.push(prepared),
1679 None => already_stored.push(address),
1680 }
1681 quoted += 1;
1682 if let Some(tx) = progress {
1683 let _ = tx.try_send(UploadEvent::ChunkQuoted {
1684 quoted,
1685 total: progress_total,
1686 });
1687 }
1688 }
1689
1690 let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
1691 info!(
1692 "Prepared external wave-batch payment: {} chunks, {} already stored, total {} atto",
1693 prepared_chunks.len(),
1694 already_stored.len(),
1695 payment_intent.total_amount,
1696 );
1697
1698 Ok((
1699 ExternalPaymentInfo::WaveBatch {
1700 prepared_chunks,
1701 payment_intent,
1702 },
1703 already_stored,
1704 ))
1705 }
1706
1707 /// Phase 2 of external-signer upload (wave-batch): finalize with externally-signed tx hashes.
1708 ///
1709 /// Takes a [`PreparedUpload`] that used wave-batch payment and a map
1710 /// of `quote_hash -> tx_hash` provided by the external signer after on-chain
1711 /// payment. Builds payment proofs and stores chunks on the network.
1712 ///
1713 /// # Errors
1714 ///
1715 /// Returns an error if the prepared upload used merkle payment (use
1716 /// [`Client::finalize_upload_merkle`] instead), proof construction fails,
1717 /// or any chunk cannot be stored.
1718 pub async fn finalize_upload(
1719 &self,
1720 prepared: PreparedUpload,
1721 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1722 ) -> Result<FileUploadResult> {
1723 self.finalize_upload_with_progress(prepared, tx_hash_map, None)
1724 .await
1725 }
1726
1727 /// Phase 2 of external-signer upload (wave-batch) with progress events.
1728 ///
1729 /// Same as [`Client::finalize_upload`] but emits [`UploadEvent::ChunkStored`]
1730 /// on the provided channel as each chunk is successfully stored.
1731 ///
1732 /// # Errors
1733 ///
1734 /// Same as [`Client::finalize_upload`].
1735 pub async fn finalize_upload_with_progress(
1736 &self,
1737 prepared: PreparedUpload,
1738 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1739 progress: Option<mpsc::Sender<UploadEvent>>,
1740 ) -> Result<FileUploadResult> {
1741 let data_map_address = prepared.data_map_address;
1742 let already_stored_addresses = prepared.already_stored_addresses;
1743 let already_stored_count = already_stored_addresses.len();
1744 let total_chunks = prepared.total_chunks;
1745 match prepared.payment_info {
1746 ExternalPaymentInfo::WaveBatch {
1747 prepared_chunks,
1748 payment_intent,
1749 } => {
1750 let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
1751 let wave_result = self
1752 .store_paid_chunks_with_events(
1753 paid_chunks,
1754 progress.as_ref(),
1755 already_stored_count,
1756 total_chunks,
1757 )
1758 .await;
1759 if !wave_result.failed.is_empty() {
1760 let failed_count = wave_result.failed.len();
1761 let stored_count = already_stored_count + wave_result.stored.len();
1762 let mut stored = already_stored_addresses;
1763 stored.extend(wave_result.stored);
1764 return Err(Error::PartialUpload {
1765 stored,
1766 stored_count,
1767 failed: wave_result.failed,
1768 failed_count,
1769 total_chunks,
1770 // Report the storage spend known from the payment intent
1771 // the external signer was handed. Gas is paid by the
1772 // signer out-of-band, so it stays unknown (0).
1773 spend: Box::new(PartialUploadSpend {
1774 storage_cost_atto: payment_intent.total_amount.to_string(),
1775 gas_cost_wei: 0,
1776 }),
1777 reason: "finalize_upload: chunk storage failed after retries".into(),
1778 });
1779 }
1780 let chunks_stored = already_stored_count + wave_result.stored.len();
1781
1782 info!("External-signer upload finalized: {chunks_stored} chunks stored");
1783
1784 let mut stats = WaveAggregateStats::default();
1785 stats.absorb(&wave_result);
1786
1787 Ok(FileUploadResult {
1788 data_map: prepared.data_map,
1789 chunks_stored,
1790 chunks_failed: 0,
1791 total_chunks,
1792 payment_mode_used: PaymentMode::Single,
1793 // Storage spend is known from the payment intent; gas is
1794 // paid by the external signer out-of-band (unknown here).
1795 storage_cost_atto: payment_intent.total_amount.to_string(),
1796 gas_cost_wei: 0,
1797 data_map_address,
1798 chunk_attempts_total: stats.chunk_attempts_total,
1799 store_durations_ms: stats.store_durations_ms,
1800 retries_histogram: stats.retries_histogram,
1801 })
1802 }
1803 ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
1804 "Cannot finalize merkle upload with wave-batch tx hashes. \
1805 Use finalize_upload_merkle() instead."
1806 .to_string(),
1807 )),
1808 }
1809 }
1810
1811 /// Phase 2 of external-signer upload (merkle): finalize with winner pool hash.
1812 ///
1813 /// Takes a [`PreparedUpload`] that used merkle payment and the `winner_pool_hash`
1814 /// returned by the on-chain merkle payment transaction. Generates proofs and
1815 /// stores chunks on the network.
1816 ///
1817 /// # Errors
1818 ///
1819 /// Returns an error if the prepared upload used wave-batch payment (use
1820 /// [`Client::finalize_upload`] instead), proof generation fails,
1821 /// or any chunk cannot be stored.
1822 pub async fn finalize_upload_merkle(
1823 &self,
1824 prepared: PreparedUpload,
1825 winner_pool_hash: [u8; 32],
1826 ) -> Result<FileUploadResult> {
1827 self.finalize_upload_merkle_with_progress(prepared, winner_pool_hash, None)
1828 .await
1829 }
1830
1831 /// Phase 2 of external-signer upload (merkle) with progress events.
1832 ///
1833 /// Same as [`Client::finalize_upload_merkle`] but emits [`UploadEvent::ChunkStored`]
1834 /// on the provided channel as each chunk is successfully stored.
1835 ///
1836 /// # Errors
1837 ///
1838 /// Same as [`Client::finalize_upload_merkle`].
1839 pub async fn finalize_upload_merkle_with_progress(
1840 &self,
1841 prepared: PreparedUpload,
1842 winner_pool_hash: [u8; 32],
1843 progress: Option<mpsc::Sender<UploadEvent>>,
1844 ) -> Result<FileUploadResult> {
1845 let data_map_address = prepared.data_map_address;
1846 let already_stored_count = prepared.already_stored_addresses.len();
1847 let total_chunks = prepared.total_chunks;
1848 match prepared.payment_info {
1849 ExternalPaymentInfo::Merkle {
1850 prepared_batch,
1851 chunk_contents,
1852 chunk_addresses,
1853 } => {
1854 let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
1855 let outcome = self
1856 .merkle_upload_chunks(
1857 chunk_contents,
1858 chunk_addresses,
1859 &batch_result,
1860 progress.as_ref(),
1861 already_stored_count,
1862 total_chunks,
1863 )
1864 .await?;
1865
1866 info!(
1867 "External-signer merkle upload finalized: {} chunks stored, {} failed",
1868 outcome.stored, outcome.failed
1869 );
1870
1871 Ok(FileUploadResult {
1872 data_map: prepared.data_map,
1873 chunks_stored: outcome.stored,
1874 chunks_failed: outcome.failed,
1875 total_chunks,
1876 payment_mode_used: PaymentMode::Merkle,
1877 storage_cost_atto: "0".into(),
1878 gas_cost_wei: 0,
1879 data_map_address,
1880 chunk_attempts_total: outcome.stats.chunk_attempts_total,
1881 store_durations_ms: outcome.stats.store_durations_ms,
1882 retries_histogram: outcome.stats.retries_histogram,
1883 })
1884 }
1885 ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
1886 "Cannot finalize wave-batch upload with merkle winner hash. \
1887 Use finalize_upload() instead."
1888 .to_string(),
1889 )),
1890 }
1891 }
1892
1893 /// Upload a file with a specific payment mode.
1894 ///
1895 /// Before encryption, checks that the temp directory has enough free
1896 /// disk space for the spilled chunks (~1.1× source file size).
1897 ///
1898 /// Encrypted chunks are spilled to a temp directory during encryption
1899 /// so that only their 32-byte addresses stay in memory. At upload time,
1900 /// chunks are read back one wave at a time (~64 × 4 MB ≈ 256 MB peak).
1901 ///
1902 /// # Errors
1903 ///
1904 /// Returns an error if there is insufficient disk space, the file cannot
1905 /// be read, encryption fails, or any chunk cannot be stored.
1906 #[allow(clippy::too_many_lines)]
1907 pub async fn file_upload_with_mode(
1908 &self,
1909 path: &Path,
1910 mode: PaymentMode,
1911 ) -> Result<FileUploadResult> {
1912 self.file_upload_with_progress(path, mode, None).await
1913 }
1914
1915 /// Upload a file publicly, storing the serialized [`DataMap`] as part of
1916 /// the same upload payment batch.
1917 ///
1918 /// The returned [`FileUploadResult::data_map_address`] can be shared for
1919 /// public downloads via [`Client::data_map_fetch`].
1920 #[allow(clippy::too_many_lines)]
1921 pub async fn file_upload_public_with_mode(
1922 &self,
1923 path: &Path,
1924 mode: PaymentMode,
1925 ) -> Result<FileUploadResult> {
1926 self.file_upload_with_visibility_and_progress(path, mode, Visibility::Public, None)
1927 .await
1928 }
1929
1930 /// Upload a file with progress events sent to the given channel.
1931 ///
1932 /// Same as [`Client::file_upload_with_mode`] but sends [`UploadEvent`]s to the
1933 /// provided channel for UI progress feedback.
1934 #[allow(clippy::too_many_lines)]
1935 pub async fn file_upload_with_progress(
1936 &self,
1937 path: &Path,
1938 mode: PaymentMode,
1939 progress: Option<mpsc::Sender<UploadEvent>>,
1940 ) -> Result<FileUploadResult> {
1941 self.file_upload_with_visibility_and_progress(path, mode, Visibility::Private, progress)
1942 .await
1943 }
1944
1945 /// Public file upload with progress events.
1946 ///
1947 /// Same as [`Client::file_upload_public_with_mode`] but sends
1948 /// [`UploadEvent`]s to the provided channel for UI progress feedback.
1949 #[allow(clippy::too_many_lines)]
1950 pub async fn file_upload_public_with_progress(
1951 &self,
1952 path: &Path,
1953 mode: PaymentMode,
1954 progress: Option<mpsc::Sender<UploadEvent>>,
1955 ) -> Result<FileUploadResult> {
1956 self.file_upload_with_visibility_and_progress(path, mode, Visibility::Public, progress)
1957 .await
1958 }
1959
    /// Shared implementation behind the `file_upload_*` entry points.
    ///
    /// Steps, in order:
    /// 1. Check that there is enough temp disk space for the chunk spill.
    /// 2. Encrypt `path` into an on-disk [`ChunkSpill`]; for
    ///    [`Visibility::Public`] the serialized [`DataMap`] is pushed into the
    ///    same spill as one more chunk and its address is returned in
    ///    [`FileUploadResult::data_map_address`].
    /// 3. Store the chunks, using merkle batch payment when
    ///    `should_use_merkle` says so and per-wave single payments otherwise.
    ///
    /// On the merkle path a cached receipt (keyed by the canonicalized source
    /// path) is reused whenever it covers the chunks that still need storing.
    /// In [`PaymentMode::Auto`], an [`Error::InsufficientPeers`] from merkle
    /// planning or payment falls back to single payments.
    ///
    /// # Errors
    ///
    /// Propagates errors from the disk-space check, file metadata lookup,
    /// encryption, `DataMap` serialization, merkle planning/payment, and chunk
    /// storage.
    #[allow(clippy::too_many_lines)]
    async fn file_upload_with_visibility_and_progress(
        &self,
        path: &Path,
        mode: PaymentMode,
        visibility: Visibility,
        progress: Option<mpsc::Sender<UploadEvent>>,
    ) -> Result<FileUploadResult> {
        debug!(
            "Streaming file upload with mode {mode:?}, visibility {visibility:?}: {}",
            path.display()
        );

        // Pre-flight: verify enough temp disk space for the chunk spill.
        let file_size = std::fs::metadata(path)?.len();
        check_disk_space_for_spill(file_size)?;

        // Phase 1: Encrypt file and spill chunks to temp directory.
        // Only 32-byte addresses stay in memory — chunk data lives on disk.
        let (mut spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;

        // Public uploads add the serialized DataMap to the spill so it is paid
        // for and stored with the file's chunks; private uploads report no
        // DataMap address.
        let data_map_address = match visibility {
            Visibility::Private => None,
            Visibility::Public => {
                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
                })?;
                let address = compute_address(&serialized);
                info!(
                    "Public upload: adding DataMap chunk ({} bytes) at address {} to payment batch",
                    serialized.len(),
                    hex::encode(address)
                );
                spill.push(&serialized)?;
                Some(address)
            }
        };

        // Counted after the optional DataMap push, so it includes that chunk.
        let chunk_count = spill.len();
        info!(
            "Encrypted {} into {chunk_count} chunks (spilled to disk)",
            path.display()
        );
        if let Some(ref tx) = progress {
            let _ = tx
                .send(UploadEvent::Encrypted {
                    total_chunks: chunk_count,
                })
                .await;
        }

        // Phase 2: Decide payment mode and upload in waves from disk.
        //
        // For the merkle path, attempt to resume from a cached
        // receipt before paying again. The cache is keyed by the
        // CANONICAL source path so `./foo`, `/abs/foo`, and any
        // symlink alias all resolve to the same cache entry — a
        // crash-and-retry from a different cwd or via a different
        // alias still hits the receipt. Canonicalize may fail (the
        // file could have been moved between phase 1 and here); we
        // fall back to the display string in that case, which
        // preserves pre-fix behaviour rather than dropping cache
        // resume entirely.
        let file_path_key = std::fs::canonicalize(path)
            .map(|p| p.display().to_string())
            .unwrap_or_else(|_| path.display().to_string());
        let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self
            .should_use_merkle(chunk_count, mode)
        {
            info!("Using merkle batch payment for {chunk_count} file chunks");

            // Only the cached receipt is needed here; the cache file path is dropped.
            let cached_merkle =
                crate::data::client::cached_merkle::try_load_for_file(&file_path_key)
                    .map(|(_cache_path, cached)| cached);

            let merkle_plan = match self
                .plan_merkle_upload(spill.chunk_entries()?, DATA_TYPE_CHUNK, progress.as_ref())
                .await
            {
                Ok(plan) => plan,
                Err(e) => {
                    // Planning failed. First choice: resume from a cached
                    // receipt, but only if it covers every spilled address
                    // (no plan exists to tell us which chunks are already
                    // stored, hence the empty already-stored slice below).
                    if let Some(cached) = cached_merkle
                        .as_ref()
                        .filter(|cached| cached_merkle_covers_addresses(cached, &spill.addresses))
                    {
                        info!(
                            "Merkle preflight failed ({e}); \
                             resuming with cached merkle proofs"
                        );
                        let (stored, sc, gc, stats) = self
                            .upload_waves_merkle(
                                &spill,
                                &spill.addresses,
                                cached,
                                &[],
                                progress.as_ref(),
                            )
                            .await?;
                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                        return Ok(FileUploadResult {
                            data_map,
                            chunks_stored: stored,
                            chunks_failed: 0,
                            total_chunks: chunk_count,
                            payment_mode_used: PaymentMode::Merkle,
                            storage_cost_atto: sc,
                            gas_cost_wei: gc,
                            data_map_address,
                            chunk_attempts_total: stats.chunk_attempts_total,
                            store_durations_ms: stats.store_durations_ms,
                            retries_histogram: stats.retries_histogram,
                        });
                    }
                    // Second choice (Auto mode only): too few peers for merkle
                    // falls back to single payments for the whole spill. Any
                    // other error, or an explicit non-Auto mode, is returned.
                    match &e {
                        Error::InsufficientPeers(msg) if mode == PaymentMode::Auto => {
                            info!(
                                "Merkle preflight needs more peers ({msg}), \
                                 falling back to wave-batch"
                            );
                            let (stored, sc, gc, fb_stats) = self
                                .upload_waves_single(
                                    &spill,
                                    progress.as_ref(),
                                    Some(&file_path_key),
                                )
                                .await?;
                            crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                            return Ok(FileUploadResult {
                                data_map,
                                chunks_stored: stored,
                                chunks_failed: 0,
                                total_chunks: chunk_count,
                                payment_mode_used: PaymentMode::Single,
                                storage_cost_atto: sc,
                                gas_cost_wei: gc,
                                data_map_address,
                                chunk_attempts_total: fb_stats.chunk_attempts_total,
                                store_durations_ms: fb_stats.store_durations_ms,
                                retries_histogram: fb_stats.retries_histogram,
                            });
                        }
                        _ => return Err(e),
                    }
                }
            };

            if merkle_plan.to_upload.is_empty() {
                // Nothing left to pay for or store: drop both kinds of cached
                // receipt and report zero cost.
                info!("All {chunk_count} merkle chunks already stored; skipping payment");
                crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                (
                    chunk_count,
                    PaymentMode::Merkle,
                    "0".to_string(),
                    0,
                    WaveAggregateStats::default(),
                )
            } else if !self.should_use_merkle(merkle_plan.to_upload.len(), mode) {
                // The payment-mode decision is re-evaluated against the chunks
                // that actually remain, which may be fewer than `chunk_count`.
                let remaining_chunks = merkle_plan.to_upload.len();
                if let Some(cached) = cached_merkle
                    .as_ref()
                    .filter(|cached| cached_merkle_covers_addresses(cached, &merkle_plan.to_upload))
                {
                    info!(
                        "{remaining_chunks} chunks remain below merkle threshold; \
                         reusing cached merkle proofs"
                    );
                    let (stored, sc, gc, stats) = self
                        .upload_waves_merkle(
                            &spill,
                            &merkle_plan.to_upload,
                            cached,
                            &merkle_plan.already_stored,
                            progress.as_ref(),
                        )
                        .await?;
                    crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                    (stored, PaymentMode::Merkle, sc, gc, stats)
                } else {
                    if cached_merkle.is_some() {
                        info!(
                            "{remaining_chunks} chunks remain below merkle threshold, \
                             and the cached merkle receipt does not cover them. \
                             Discarding cache and using single-node payment."
                        );
                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                    } else {
                        info!(
                            "{remaining_chunks} chunks need upload after merkle preflight; \
                             using single-node payment"
                        );
                    }
                    let (stored, sc, gc, stats) = self
                        .upload_spill_addresses_single(
                            &spill,
                            &merkle_plan.to_upload,
                            progress.as_ref(),
                            &merkle_plan.already_stored,
                            chunk_count,
                            Some(&file_path_key),
                        )
                        .await?;
                    crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                    (stored, PaymentMode::Single, sc, gc, stats)
                }
            } else {
                // Merkle payment for the remaining chunks: reuse a covering
                // cached receipt, otherwise pay now and cache the new receipt.
                let batch_result = if let Some(cached) = cached_merkle.as_ref() {
                    // Validate the cache against the chunks that still need
                    // storage. Extra proofs are harmless: a previous attempt
                    // may have paid for chunks that are now already stored.
                    if cached_merkle_covers_addresses(cached, &merkle_plan.to_upload) {
                        info!(
                            "Skipping merkle payment phase; resuming with \
                             cached proofs for {} remaining chunks",
                            merkle_plan.to_upload.len()
                        );
                        Ok(cached.clone())
                    } else {
                        info!(
                            "Cached merkle receipt does not cover the current \
                             remaining chunks (cached={}, remaining={}). \
                             Discarding cache and paying fresh.",
                            cached.proofs.len(),
                            merkle_plan.to_upload.len()
                        );
                        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                        self.pay_for_merkle_batch(
                            &merkle_plan.to_upload,
                            DATA_TYPE_CHUNK,
                            merkle_plan.to_upload_avg_size(),
                        )
                        .await
                        .inspect(|result| {
                            crate::data::client::cached_merkle::try_save(&file_path_key, result);
                        })
                    }
                } else {
                    self.pay_for_merkle_batch(
                        &merkle_plan.to_upload,
                        DATA_TYPE_CHUNK,
                        merkle_plan.to_upload_avg_size(),
                    )
                    .await
                    .inspect(|result| {
                        // Save BEFORE the store phase so a crash
                        // mid-upload leaves a resumable receipt.
                        crate::data::client::cached_merkle::try_save(&file_path_key, result);
                    })
                };

                // Payment outcome: in Auto mode a peer shortfall falls back to
                // single payments for the remaining chunks; other errors abort.
                let batch_result = match batch_result {
                    Ok(result) => result,
                    Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
                        info!("Merkle needs more peers ({msg}), falling back to wave-batch");
                        let (stored, sc, gc, fb_stats) = self
                            .upload_spill_addresses_single(
                                &spill,
                                &merkle_plan.to_upload,
                                progress.as_ref(),
                                &merkle_plan.already_stored,
                                chunk_count,
                                Some(&file_path_key),
                            )
                            .await?;
                        crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                        return Ok(FileUploadResult {
                            data_map,
                            chunks_stored: stored,
                            chunks_failed: 0,
                            total_chunks: chunk_count,
                            payment_mode_used: PaymentMode::Single,
                            storage_cost_atto: sc,
                            gas_cost_wei: gc,
                            data_map_address,
                            chunk_attempts_total: fb_stats.chunk_attempts_total,
                            store_durations_ms: fb_stats.store_durations_ms,
                            retries_histogram: fb_stats.retries_histogram,
                        });
                    }
                    Err(e) => return Err(e),
                };

                let (stored, sc, gc, stats) = self
                    .upload_waves_merkle(
                        &spill,
                        &merkle_plan.to_upload,
                        &batch_result,
                        &merkle_plan.already_stored,
                        progress.as_ref(),
                    )
                    .await?;
                // Upload succeeded end-to-end; the cached receipt is
                // no longer needed.
                crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                (stored, PaymentMode::Merkle, sc, gc, stats)
            }
        } else {
            // Merkle not selected for this chunk count/mode: single payments
            // for every spilled chunk.
            let (stored, sc, gc, stats) = self
                .upload_waves_single(&spill, progress.as_ref(), Some(&file_path_key))
                .await?;
            // Full file success: drop any cached single-node receipt.
            crate::data::client::cached_single::try_delete_for_file(&file_path_key);
            (stored, PaymentMode::Single, sc, gc, stats)
        };

        info!(
            "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
            path.display()
        );

        // Reaching this point means every upload helper returned `Ok`, so no
        // chunk is reported as failed.
        Ok(FileUploadResult {
            data_map,
            chunks_stored,
            chunks_failed: 0,
            total_chunks: chunk_count,
            payment_mode_used: actual_mode,
            storage_cost_atto,
            gas_cost_wei,
            data_map_address,
            chunk_attempts_total: stats.chunk_attempts_total,
            store_durations_ms: stats.store_durations_ms,
            retries_histogram: stats.retries_histogram,
        })
    }
2284
2285 /// Encrypt a file and spill chunks to a temp directory.
2286 ///
2287 /// Logs progress every 100 chunks so users get feedback during
2288 /// multi-GB encryptions.
2289 ///
2290 /// Returns the spill buffer (addresses on disk) and the `DataMap`.
2291 async fn encrypt_file_to_spill(
2292 &self,
2293 path: &Path,
2294 progress: Option<&mpsc::Sender<UploadEvent>>,
2295 ) -> Result<(ChunkSpill, DataMap)> {
2296 let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
2297
2298 let mut spill = ChunkSpill::new()?;
2299 while let Some(content) = chunk_rx.recv().await {
2300 spill.push(&content)?;
2301 let chunks_done = spill.len();
2302 if let Some(tx) = progress {
2303 if chunks_done.is_multiple_of(10) {
2304 let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
2305 }
2306 }
2307 if chunks_done % 100 == 0 {
2308 let mb = spill.total_bytes() / (1024 * 1024);
2309 info!(
2310 "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
2311 path.display()
2312 );
2313 }
2314 }
2315
2316 // Await encryption completion to catch errors before paying.
2317 handle
2318 .await
2319 .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
2320 .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
2321
2322 let data_map = datamap_rx
2323 .await
2324 .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
2325
2326 Ok((spill, data_map))
2327 }
2328
2329 /// Upload chunks from a spill using wave-based per-chunk (single) payments.
2330 ///
2331 /// Reads one wave at a time from disk, prepares quotes, pays, and stores.
2332 /// Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE` (~256 MB).
2333 ///
2334 /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)`.
2335 async fn upload_waves_single(
2336 &self,
2337 spill: &ChunkSpill,
2338 progress: Option<&mpsc::Sender<UploadEvent>>,
2339 resume_key: Option<&str>,
2340 ) -> Result<(usize, String, u128, WaveAggregateStats)> {
2341 self.upload_spill_addresses_single(
2342 spill,
2343 &spill.addresses,
2344 progress,
2345 &[],
2346 spill.len(),
2347 resume_key,
2348 )
2349 .await
2350 }
2351
2352 async fn upload_spill_addresses_single(
2353 &self,
2354 spill: &ChunkSpill,
2355 addresses: &[[u8; 32]],
2356 progress: Option<&mpsc::Sender<UploadEvent>>,
2357 already_stored_addresses: &[[u8; 32]],
2358 total_chunks: usize,
2359 resume_key: Option<&str>,
2360 ) -> Result<(usize, String, u128, WaveAggregateStats)> {
2361 let mut total_stored = already_stored_addresses.len();
2362 let mut total_storage = Amount::ZERO;
2363 let mut total_gas: u128 = 0;
2364 let mut agg_stats = WaveAggregateStats::default();
2365 // A wave whose chunks fall short of quorum after retries must not abort
2366 // the file: its failures are accumulated here and surfaced as a single
2367 // `PartialUpload` only after every wave has been attempted, mirroring
2368 // `upload_waves_merkle`. Aborting on the first failed wave (the old `?`)
2369 // discarded all later waves' progress — already self-encrypted, spilled,
2370 // and in some cases already paid for — converting high per-chunk success
2371 // into 0% per-file success.
2372 // Seed with the addresses a preflight already confirmed stored (e.g.
2373 // the merkle-fallback path passes `merkle_plan.already_stored`), so a
2374 // returned `PartialUpload.stored` lists every stored chunk and
2375 // `stored_count == stored.len()` holds for programmatic callers.
2376 let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
2377 let mut failed: Vec<([u8; 32], String)> = Vec::new();
2378 let waves: Vec<&[[u8; 32]]> = addresses.chunks(UPLOAD_WAVE_SIZE).collect();
2379 let wave_count = waves.len();
2380
2381 // Unconditional breadcrumb: lets a clean run confirm the continue-on-
2382 // partial single-node path is in effect (the old path aborted the file
2383 // on the first failed wave instead of continuing across all waves).
2384 info!(
2385 "single-node upload: {} chunk(s) in {wave_count} wave(s) (continue-on-partial)",
2386 addresses.len()
2387 );
2388
2389 for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
2390 let wave_num = wave_idx + 1;
2391 let wave_data: Vec<Bytes> = wave_addrs
2392 .iter()
2393 .map(|addr| spill.read_chunk(addr))
2394 .collect::<Result<Vec<_>>>()?;
2395
2396 info!(
2397 "Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
2398 wave_data.len()
2399 );
2400 if let Some(tx) = progress {
2401 let _ = tx
2402 .send(UploadEvent::QuotingChunks {
2403 wave: wave_num,
2404 total_waves: wave_count,
2405 chunks_in_wave: wave_data.len(),
2406 })
2407 .await;
2408 }
2409 // Fold this wave's result. A quorum shortfall (`PartialUpload`) is
2410 // recoverable and its parts are returned to be recorded here;
2411 // genuinely fatal errors propagate via `?` and abort the file, as in
2412 // `upload_waves_merkle`.
2413 let outcome = fold_single_wave(
2414 self.batch_upload_chunks_with_events(
2415 wave_data,
2416 progress,
2417 total_stored,
2418 total_chunks,
2419 resume_key,
2420 )
2421 .await,
2422 )?;
2423
2424 if !outcome.failed.is_empty() {
2425 warn!(
2426 "Wave {wave_num}/{wave_count}: {} chunk(s) failed to store after retries; \
2427 continuing with remaining waves",
2428 outcome.failed.len()
2429 );
2430 }
2431
2432 total_stored += outcome.stored.len();
2433 stored_addresses.extend(outcome.stored);
2434 failed.extend(outcome.failed);
2435 total_storage += outcome.storage_atto;
2436 total_gas = total_gas.saturating_add(outcome.gas_wei);
2437 // Merge per-wave stats (a quorum-short wave contributes none, since
2438 // `PartialUpload` carries no stats).
2439 agg_stats.chunk_attempts_total = agg_stats
2440 .chunk_attempts_total
2441 .saturating_add(outcome.stats.chunk_attempts_total);
2442 agg_stats
2443 .store_durations_ms
2444 .extend(outcome.stats.store_durations_ms);
2445 for (slot, count) in agg_stats
2446 .retries_histogram
2447 .iter_mut()
2448 .zip(outcome.stats.retries_histogram.iter())
2449 {
2450 *slot = slot.saturating_add(*count);
2451 }
2452
2453 if let Some(tx) = progress {
2454 let _ = tx
2455 .send(UploadEvent::WaveComplete {
2456 wave: wave_num,
2457 total_waves: wave_count,
2458 stored_so_far: total_stored,
2459 total: total_chunks,
2460 })
2461 .await;
2462 }
2463 }
2464
2465 // Any chunk still failed after every wave was attempted means the file
2466 // is not fully stored — surface it as `PartialUpload` (never silently
2467 // succeed with missing chunks), carrying the real on-chain spend.
2468 if !failed.is_empty() {
2469 let failed_count = failed.len();
2470 warn!(
2471 "single-node upload incomplete: {failed_count}/{total_chunks} chunks failed after retries"
2472 );
2473 return Err(Error::PartialUpload {
2474 stored: stored_addresses,
2475 stored_count: total_stored,
2476 failed,
2477 failed_count,
2478 total_chunks,
2479 spend: Box::new(PartialUploadSpend {
2480 storage_cost_atto: total_storage.to_string(),
2481 gas_cost_wei: total_gas,
2482 }),
2483 reason: format!("{failed_count} chunk(s) failed to store after retries"),
2484 });
2485 }
2486
2487 Ok((
2488 total_stored,
2489 total_storage.to_string(),
2490 total_gas,
2491 agg_stats,
2492 ))
2493 }
2494
2495 /// Upload chunks from a spill using pre-computed merkle proofs.
2496 ///
2497 /// Reads one wave at a time from disk, pairs each chunk with its proof,
2498 /// and uploads concurrently. Peak memory: ~`UPLOAD_WAVE_SIZE × MAX_CHUNK_SIZE`.
2499 ///
2500 /// A chunk that is transiently short of quorum (`InsufficientPeers`) does
2501 /// **not** abort the file, nor does it block the pipeline: each wave is
2502 /// stored in a **single pass** (no in-wave backoff barrier), and chunks
2503 /// short of quorum are collected into a file-level deferred set rather than
2504 /// retried in place. After the last wave, [`merkle_deferred_retry`] retries
2505 /// the whole deferred set in concurrent rounds ([`DEFERRED_ROUND_DELAYS_SECS`]
2506 /// delays), re-reading each chunk's body from the spill and reusing its
2507 /// proof. This keeps every wave running at full fan-out instead of parking
2508 /// idle slots behind one slow chunk's backoff, while peak memory stays
2509 /// bounded (bodies are re-read from disk, never pinned). Non-quorum errors
2510 /// (e.g. a missing proof) stay fatal and abort immediately.
2511 ///
2512 /// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei)` on success.
2513 /// Costs come from the `batch_result` which was populated during payment.
2514 ///
2515 /// # Errors
2516 ///
2517 /// Returns [`Error::PartialUpload`] if any chunk is still short of quorum
2518 /// after all retries across every wave (other chunks remain stored), or the
2519 /// underlying error for a non-quorum failure.
2520 async fn upload_waves_merkle(
2521 &self,
2522 spill: &ChunkSpill,
2523 addresses: &[[u8; 32]],
2524 batch_result: &MerkleBatchPaymentResult,
2525 already_stored_addresses: &[[u8; 32]],
2526 progress: Option<&mpsc::Sender<UploadEvent>>,
2527 ) -> Result<(usize, String, u128, WaveAggregateStats)> {
2528 let mut total_stored = already_stored_addresses.len();
2529 let total_chunks = total_stored + addresses.len();
2530 let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
2531 let mut failed: Vec<([u8; 32], String)> = Vec::new();
2532 // Chunks short of quorum on their single wave pass are collected here and
2533 // retried after the last wave (see `merkle_deferred_retry`), so a slow
2534 // chunk never holds its wave's other slots idle behind a backoff.
2535 let mut deferred: Vec<([u8; 32], String)> = Vec::new();
2536 let mut agg_stats = WaveAggregateStats::default();
2537
2538 // Chunks without a merkle proof were never paid for: a partial
2539 // `pay_for_merkle_multi_batch` result carries proofs only for the
2540 // sub-batches whose on-chain payment succeeded. Such a chunk cannot be
2541 // stored, so record it as failed (surfaced via `PartialUpload` once the
2542 // storable chunks have been attempted) rather than letting its
2543 // "missing proof" error abort the whole file and discard every other
2544 // chunk's progress.
2545 let (to_store, missing_proof) =
2546 partition_addresses_by_proof(addresses, &batch_result.proofs);
2547 if !missing_proof.is_empty() {
2548 warn!(
2549 "{} chunk(s) lack a merkle proof (partial payment); reporting them as failed",
2550 missing_proof.len()
2551 );
2552 for addr in &missing_proof {
2553 failed.push((
2554 *addr,
2555 format!("Missing merkle proof for chunk {}", hex::encode(addr)),
2556 ));
2557 }
2558 }
2559
2560 let waves: Vec<&[[u8; 32]]> = to_store.chunks(UPLOAD_WAVE_SIZE).collect();
2561 let wave_count = waves.len();
2562
2563 let store_limiter = self.controller().store.clone();
2564
2565 // Store one chunk to its (freshly re-collected) close group, reusing the
2566 // chunk's merkle proof. Shared across every retry round so a converged
2567 // routing table yields a fresh group. Only `InsufficientPeers` is
2568 // recoverable; a missing proof stays fatal. Mirrors the external-signer
2569 // path's closure in `merkle_upload_chunks`.
2570 let store_one = |addr: [u8; 32], content: Bytes| {
2571 let limiter = store_limiter.clone();
2572 let proof_bytes = batch_result.proofs.get(&addr).cloned();
2573 async move {
2574 let started = std::time::Instant::now();
2575 let proof = proof_bytes.ok_or_else(|| {
2576 Error::Payment(format!(
2577 "Missing merkle proof for chunk {}",
2578 hex::encode(addr)
2579 ))
2580 })?;
2581 let peers = self.put_target_peers(&addr).await?;
2582 observe_op(
2583 &limiter,
2584 || async move { self.chunk_put_to_close_group(content, proof, &peers).await },
2585 classify_error,
2586 )
2587 .await
2588 .map(|_| started)
2589 }
2590 };
2591
2592 for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
2593 let wave_num = wave_idx + 1;
2594 let wave = spill.read_wave(wave_addrs)?;
2595
2596 info!(
2597 "Wave {wave_num}/{wave_count}: storing {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
2598 wave.len()
2599 );
2600
2601 // Clamp fan-out to wave size — partial last wave should
2602 // not pay for extra slots (see PERF-RESULTS.md).
2603 let store_concurrency = store_limiter.current().min(wave.len().max(1));
2604 let chunks: Vec<([u8; 32], Bytes)> = wave
2605 .into_iter()
2606 .map(|(content, addr)| (addr, content))
2607 .collect();
2608
2609 // Store the wave in a SINGLE pass (`max_attempts = 1`, no backoff):
2610 // quorum-short chunks are collected and deferred to a post-wave
2611 // concurrent retry rather than parking this wave's other slots
2612 // behind a backoff. `stored_offset` is the running cumulative count
2613 // so the progress events the driver emits stay correctly numbered
2614 // across waves.
2615 let outcome = merkle_store_with_retry(
2616 chunks,
2617 store_concurrency,
2618 1,
2619 std::time::Duration::ZERO,
2620 progress,
2621 total_stored,
2622 total_chunks,
2623 &store_one,
2624 )
2625 .await?;
2626
2627 // Record this wave's confirmed stores from the explicit set the
2628 // store helper reports. Using that set (rather than inferring
2629 // "wave chunks minus failed") keeps `stored_addresses` correct even
2630 // when a fatal abort leaves some of the wave neither stored nor
2631 // reported short of quorum.
2632 stored_addresses.extend(&outcome.stored_addresses);
2633 total_stored = outcome.stored;
2634
2635 // Merge per-wave stats (durations, attempts, per-round histogram).
2636 agg_stats.chunk_attempts_total = agg_stats
2637 .chunk_attempts_total
2638 .saturating_add(outcome.stats.chunk_attempts_total);
2639 agg_stats
2640 .store_durations_ms
2641 .extend(outcome.stats.store_durations_ms);
2642 for (slot, count) in agg_stats
2643 .retries_histogram
2644 .iter_mut()
2645 .zip(outcome.stats.retries_histogram.iter())
2646 {
2647 *slot = slot.saturating_add(*count);
2648 }
2649
2650 if let Some(e) = outcome.fatal {
2651 // A non-quorum store error is fatal (missing proofs were
2652 // filtered out above, so this is a genuine network/store
2653 // failure). Preserve every chunk stored so far — including this
2654 // wave's same-pass successes — and report every not-stored chunk
2655 // as failed, so the `PartialUpload` counts are accurate rather
2656 // than omitting same-wave stores and under-counting failures.
2657 warn!("merkle wave {wave_num}/{wave_count} aborted: {e}");
2658 // Best per-chunk messages we already have: missing-proof, this
2659 // wave's shortfalls, and earlier waves' deferred shortfalls.
2660 let mut known_failed = failed;
2661 known_failed.extend(outcome.failed_addresses);
2662 known_failed.extend(std::mem::take(&mut deferred));
2663 return Err(partial_upload_after_fatal(
2664 addresses,
2665 stored_addresses,
2666 total_stored,
2667 total_chunks,
2668 known_failed,
2669 PartialUploadSpend {
2670 storage_cost_atto: batch_result.storage_cost_atto.clone(),
2671 gas_cost_wei: batch_result.gas_cost_wei,
2672 },
2673 format!("merkle chunk store aborted: {e}"),
2674 ));
2675 }
2676
2677 // Non-fatal: this wave's quorum-short chunks are deferred (not failed
2678 // yet) for the post-wave concurrent retry. A deferred chunk joins
2679 // `stored_addresses` only if/when a later round stores it.
2680 deferred.extend(outcome.failed_addresses);
2681
2682 if let Some(tx) = progress {
2683 let _ = tx
2684 .send(UploadEvent::WaveComplete {
2685 wave: wave_num,
2686 total_waves: wave_count,
2687 stored_so_far: total_stored,
2688 total: total_chunks,
2689 })
2690 .await;
2691 }
2692 }
2693
2694 // The wave passes never blocked on backoff; now retry the whole
2695 // file-level deferred set in concurrent rounds. Bodies are re-read from
2696 // the spill at retry time (peak RAM unchanged) and proofs are re-attached
2697 // by `store_one`. Chunks still short after the final round become
2698 // `failed`; a non-quorum error aborts as `PartialUpload`.
2699 if !deferred.is_empty() {
2700 info!(
2701 "Deferring {} merkle chunk(s) short of quorum for concurrent retry after final wave",
2702 deferred.len()
2703 );
2704 let dr = merkle_deferred_retry(
2705 deferred,
2706 &DEFERRED_ROUND_DELAYS_SECS,
2707 // Read and store at most one wave's worth of bodies at a time so
2708 // the deferred path keeps the wave path's ~256 MB peak-memory
2709 // bound regardless of how many chunks were deferred file-wide.
2710 UPLOAD_WAVE_SIZE,
2711 |addrs: &[[u8; 32]]| {
2712 spill.read_wave(addrs).map(|wave| {
2713 wave.into_iter()
2714 .map(|(content, addr)| (addr, content))
2715 .collect()
2716 })
2717 },
2718 |n: usize| store_limiter.current().min(n.max(1)),
2719 progress,
2720 total_stored,
2721 total_chunks,
2722 &store_one,
2723 )
2724 .await?;
2725
2726 stored_addresses.extend(dr.stored_addresses);
2727 total_stored = dr.stored;
2728
2729 // Merge the deferred pass's stats — its histogram is already mapped
2730 // to the right per-round slots — into the file aggregate.
2731 agg_stats.chunk_attempts_total = agg_stats
2732 .chunk_attempts_total
2733 .saturating_add(dr.stats.chunk_attempts_total);
2734 agg_stats
2735 .store_durations_ms
2736 .extend(dr.stats.store_durations_ms);
2737 for (slot, count) in agg_stats
2738 .retries_histogram
2739 .iter_mut()
2740 .zip(dr.stats.retries_histogram.iter())
2741 {
2742 *slot = slot.saturating_add(*count);
2743 }
2744
2745 if let Some(reason) = dr.fatal {
2746 // A non-quorum store error during a deferred round is fatal, the
2747 // same as in the wave path: preserve everything stored so far and
2748 // report every not-stored chunk as failed.
2749 warn!("merkle deferred retry aborted: {reason}");
2750 let mut known_failed = failed;
2751 known_failed.extend(dr.failed_addresses);
2752 return Err(partial_upload_after_fatal(
2753 addresses,
2754 stored_addresses,
2755 total_stored,
2756 total_chunks,
2757 known_failed,
2758 PartialUploadSpend {
2759 storage_cost_atto: batch_result.storage_cost_atto.clone(),
2760 gas_cost_wei: batch_result.gas_cost_wei,
2761 },
2762 format!("merkle chunk store aborted: {reason}"),
2763 ));
2764 }
2765 failed.extend(dr.failed_addresses);
2766 }
2767
2768 // A file with any permanently-failed chunk is not fully stored — surface
2769 // it as `PartialUpload`, but only after the single wave pass and every
2770 // deferred retry round are exhausted (never silently succeed with
2771 // missing chunks).
2772 if !failed.is_empty() {
2773 let failed_count = failed.len();
2774 let total_attempts = 1 + DEFERRED_ROUND_DELAYS_SECS.len();
2775 warn!(
2776 "merkle upload incomplete: {failed_count}/{total_chunks} chunks short of quorum after retries"
2777 );
2778 return Err(Error::PartialUpload {
2779 stored: stored_addresses,
2780 stored_count: total_stored,
2781 failed,
2782 failed_count,
2783 total_chunks,
2784 spend: Box::new(PartialUploadSpend {
2785 storage_cost_atto: batch_result.storage_cost_atto.clone(),
2786 gas_cost_wei: batch_result.gas_cost_wei,
2787 }),
2788 reason: format!(
2789 "{failed_count} chunk(s) short of quorum after {total_attempts} attempts"
2790 ),
2791 });
2792 }
2793
2794 Ok((
2795 total_stored,
2796 batch_result.storage_cost_atto.clone(),
2797 batch_result.gas_cost_wei,
2798 agg_stats,
2799 ))
2800 }
2801
2802 /// Download and decrypt a file from the network, writing it to disk.
2803 ///
2804 /// Uses `streaming_decrypt` so that only one batch of chunks lives in
2805 /// memory at a time, avoiding OOM on large files. Chunks are fetched
2806 /// concurrently within each batch, then decrypted data is written to
2807 /// disk incrementally.
2808 ///
2809 /// Returns the number of bytes written.
2810 ///
2811 /// # Panics
2812 ///
2813 /// Requires a multi-threaded Tokio runtime (`flavor = "multi_thread"`).
2814 /// Will panic if called from a `current_thread` runtime because
2815 /// `streaming_decrypt` takes a synchronous callback that must bridge
2816 /// back to async via `block_in_place`.
2817 ///
2818 /// # Errors
2819 ///
2820 /// Returns an error if any chunk cannot be retrieved, decryption fails,
2821 /// or the file cannot be written.
2822 pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
2823 self.file_download_with_progress(data_map, output, None)
2824 .await
2825 }
2826
2827 /// Download and decrypt a file, trying the requested number of
2828 /// closest peers for every chunk fetch.
2829 ///
2830 /// Returns the number of bytes written.
2831 ///
2832 /// # Errors
2833 ///
2834 /// Returns an error if any chunk cannot be retrieved, decryption fails,
2835 /// or the file cannot be written.
2836 pub async fn file_download_from_closest_peers(
2837 &self,
2838 data_map: &DataMap,
2839 output: &Path,
2840 peer_count: NonZeroUsize,
2841 ) -> Result<u64> {
2842 self.file_download_with_progress_using_peer_count(data_map, output, None, peer_count.get())
2843 .await
2844 }
2845
2846 /// Download and decrypt a file with progress events, trying the
2847 /// requested number of closest peers for every chunk fetch.
2848 ///
2849 /// Same as [`Client::file_download_from_closest_peers`] but sends
2850 /// [`DownloadEvent`]s for UI feedback.
2851 ///
2852 /// # Errors
2853 ///
2854 /// Returns an error if any chunk cannot be retrieved, decryption fails,
2855 /// or the file cannot be written.
2856 pub async fn file_download_with_progress_from_closest_peers(
2857 &self,
2858 data_map: &DataMap,
2859 output: &Path,
2860 progress: Option<mpsc::Sender<DownloadEvent>>,
2861 peer_count: NonZeroUsize,
2862 ) -> Result<u64> {
2863 self.file_download_with_progress_using_peer_count(
2864 data_map,
2865 output,
2866 progress,
2867 peer_count.get(),
2868 )
2869 .await
2870 }
2871
2872 /// Download and decrypt a file with peer-health diagnostics.
2873 ///
2874 /// Each file chunk is fetched by querying every selected closest peer,
2875 /// not by returning after the first successful peer. The returned report
2876 /// records which peers had each chunk and which did not. DataMap
2877 /// resolution still uses the normal early-return fetch path; diagnostics
2878 /// are for file chunks only.
2879 ///
2880 /// # Errors
2881 ///
2882 /// Returns an error if any chunk cannot be retrieved, decryption fails,
2883 /// or the file cannot be written.
2884 pub async fn file_download_with_peer_report_from_closest_peers(
2885 &self,
2886 data_map: &DataMap,
2887 output: &Path,
2888 progress: Option<mpsc::Sender<DownloadEvent>>,
2889 peer_count: NonZeroUsize,
2890 ) -> Result<FileDownloadWithPeerReport> {
2891 let chunk_reports = Arc::new(Mutex::new(Vec::new()));
2892 let bytes_written = self
2893 .file_download_with_progress_using_peer_count_and_reports(
2894 data_map,
2895 output,
2896 progress,
2897 peer_count.get(),
2898 Some(chunk_reports.clone()),
2899 )
2900 .await?;
2901
2902 let chunk_reports = chunk_reports
2903 .lock()
2904 .map_err(|_| Error::Storage("file chunk peer report lock poisoned".to_string()))?
2905 .clone();
2906 let chunk_reports = file_chunk_reports_from_recorded_sweeps(chunk_reports);
2907
2908 Ok(FileDownloadWithPeerReport {
2909 bytes_written,
2910 chunk_reports,
2911 })
2912 }
2913
2914 async fn download_fetch_file_chunk(
2915 &self,
2916 idx: usize,
2917 hash: XorName,
2918 context: FileDownloadFetchContext,
2919 is_deferred_retry: bool,
2920 attempt: usize,
2921 ) -> std::result::Result<DownloadBatchEntry, self_encryption::Error> {
2922 let addr = hash.0;
2923 let addr_hex = hex::encode(addr);
2924
2925 let chunk_content = if let Some(peer_reports) = context.peer_reports {
2926 match self
2927 .chunk_get_from_closest_peer_group(&addr, context.peer_count)
2928 .await
2929 {
2930 Ok(results) => {
2931 let (content, sweep) = file_chunk_sweep_report_from_peer_results(
2932 attempt,
2933 is_deferred_retry,
2934 &results,
2935 );
2936 peer_reports
2937 .lock()
2938 .map_err(|_| {
2939 self_encryption::Error::Generic(
2940 "file chunk peer report lock poisoned".to_string(),
2941 )
2942 })?
2943 .push(RecordedFileChunkPeerSweep {
2944 index: idx + 1,
2945 address: addr,
2946 sweep,
2947 });
2948 content
2949 }
2950 Err(e) => {
2951 if is_deferred_retry {
2952 info!(
2953 "Deferred all-peer retry for {addr_hex} hit transient error: {e}; re-deferring"
2954 );
2955 } else {
2956 info!("First-pass all-peer fetch error for {addr_hex}: {e}; deferring");
2957 }
2958 peer_reports
2959 .lock()
2960 .map_err(|_| {
2961 self_encryption::Error::Generic(
2962 "file chunk peer report lock poisoned".to_string(),
2963 )
2964 })?
2965 .push(RecordedFileChunkPeerSweep {
2966 index: idx + 1,
2967 address: addr,
2968 sweep: file_chunk_sweep_report_from_error(
2969 attempt,
2970 is_deferred_retry,
2971 &e,
2972 ),
2973 });
2974 None
2975 }
2976 }
2977 } else {
2978 match self
2979 .chunk_get_observed_from_closest_peers(&addr, context.peer_count)
2980 .await
2981 {
2982 Ok(Some(chunk)) => Some(chunk.content),
2983 Ok(None) => None,
2984 Err(e) => {
2985 if is_deferred_retry {
2986 info!(
2987 "Deferred retry for {addr_hex} hit transient error: {e}; re-deferring"
2988 );
2989 } else {
2990 info!("First-pass fetch error for {addr_hex}: {e}; deferring");
2991 }
2992 None
2993 }
2994 }
2995 };
2996
2997 let Some(content) = chunk_content else {
2998 return Ok((idx, Err(hash)));
2999 };
3000
3001 let fetched = context
3002 .fetched_ref
3003 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
3004 + 1;
3005 if is_deferred_retry {
3006 info!(
3007 "Downloaded {fetched}/{} (deferred retry)",
3008 context.total_chunks
3009 );
3010 } else {
3011 let total_chunks = context.total_chunks;
3012 info!("Downloaded {fetched}/{total_chunks}");
3013 }
3014 if let Some(ref tx) = context.progress_ref {
3015 let _ = tx.try_send(DownloadEvent::ChunksFetched {
3016 fetched,
3017 total: context.total_chunks,
3018 });
3019 }
3020
3021 Ok((idx, Ok(content)))
3022 }
3023
3024 /// Shared download core: resolve the DataMap, then fetch + streaming-decrypt
3025 /// the file one batch at a time, handing each decrypted plaintext segment
3026 /// (in order) to `on_chunk`. Constant memory — only one decrypt batch is
3027 /// resident at a time. Returns the total plaintext bytes produced.
3028 ///
3029 /// `on_chunk` is async so a sink can apply backpressure (e.g. a bounded
3030 /// channel). Driving the decrypt iterator runs the batched chunk fetch via
3031 /// `block_in_place`, so this requires a multi-threaded Tokio runtime.
3032 ///
3033 /// Every chunk fetch tries `peer_count` closest peers.
3034 ///
3035 /// Progress reporting (via `progress`):
3036 /// 1. Resolves hierarchical DataMaps to the root level first (sends
3037 /// `ResolvingDataMap`, then `MapChunkFetched` as each map chunk arrives)
3038 /// 2. Once the root DataMap is known, sends `DataMapResolved` with the accurate `total_chunks`
3039 /// 3. Fetches data chunks with accurate `fetched/total` progress (`ChunksFetched`)
///
/// All progress events are best-effort: they are sent with `try_send` and a
/// failed send is ignored.
///
/// `peer_reports`, when `Some`, is passed to every file-chunk fetch through
/// `FileDownloadFetchContext`; DataMap resolution (Phase 1) never uses it.
///
/// # Errors
///
/// Returns `Error::Encryption` if DataMap resolution fails, the decrypt
/// stream cannot be created, or a segment fails to decrypt (a chunk still
/// missing after the deferred retry rounds surfaces this way). Any error
/// returned by `on_chunk` is propagated unchanged and stops the download.
3040 async fn download_decrypted_chunks<F, Fut>(
3041 &self,
3042 data_map: &DataMap,
3043 progress: Option<mpsc::Sender<DownloadEvent>>,
3044 peer_count: usize,
3045 peer_reports: Option<Arc<Mutex<Vec<RecordedFileChunkPeerSweep>>>>,
3046 mut on_chunk: F,
3047 ) -> Result<u64>
3048 where
3049 F: FnMut(Bytes) -> Fut,
3050 Fut: std::future::Future<Output = Result<()>>,
3051 {
// Captured once so the synchronous fetch callbacks below can re-enter
// the async runtime with `handle.block_on`.
3052 let handle = Handle::current();
3053
3054 // Phase 1: Resolve hierarchical DataMap to root level.
3055 // This fetches child DataMap chunks (typically 3) to discover the real chunk count.
3056 let root_map = if data_map.is_child() {
3057 let dm_chunks = data_map.len();
3058 if let Some(ref tx) = progress {
3059 let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
3060 total_map_chunks: dm_chunks,
3061 });
3062 }
3063
3064 let resolve_progress = progress.clone();
3065 let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
3066
3067 let resolved = tokio::task::block_in_place(|| {
3068 let counter_ref = resolve_counter.clone();
3069 let progress_ref = resolve_progress.clone();
3070 let fetch_limiter = self.controller().fetch.clone();
3071 let fetch = |batch: &[(usize, XorName)]| {
3072 let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
3073 let counter = counter_ref.clone();
3074 let prog = progress_ref.clone();
3075 let limiter = fetch_limiter.clone();
3076 handle.block_on(async {
3077 // Use rebucketed_unordered so the in-flight cap
3078 // is re-read from the limiter as each slot frees.
3079 // `buffer_unordered` snapshots the cap once at
3080 // pipeline build, which means observe_op
3081 // signals from inside chunk_get cannot reduce
3082 // concurrency on the current batch — exactly
3083 // the case where load-shedding is needed.
3084 let mut results = rebucketed_unordered(
3085 &limiter,
3086 batch_owned,
3087 |(idx, hash): (usize, XorName)| {
3088 let counter = counter.clone();
3089 let prog = prog.clone();
3090 async move {
3091 let addr = hash.0;
3092 // chunk_get_observed feeds the
3093 // adaptive fetch limiter once per
3094 // call via chunk_get_outcome
3095 // (Ok(None) -> Timeout is the
3096 // load-shedding signal for
3097 // sustained close-group exhaustion).
3098 let chunk = self
3099 .chunk_get_observed_from_closest_peers(&addr, peer_count)
3100 .await
3101 .map_err(|e| {
3102 self_encryption::Error::Generic(format!(
3103 "DataMap resolution failed: {e}"
3104 ))
3105 })?
3106 .ok_or_else(|| {
3107 self_encryption::Error::Generic(format!(
3108 "DataMap chunk not found: {}",
3109 hex::encode(addr)
3110 ))
3111 })?;
3112 let fetched = counter
3113 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
3114 + 1;
3115 if let Some(ref tx) = prog {
3116 let _ =
3117 tx.try_send(DownloadEvent::MapChunkFetched { fetched });
3118 }
3119 Ok::<_, self_encryption::Error>((idx, chunk.content))
3120 }
3121 },
3122 )
3123 .await?;
3124 // CRITICAL: self_encryption::get_root_data_map_parallel
3125 // pairs the returned Vec POSITIONALLY with the input
3126 // hashes via .zip() and discards our idx field.
3127 // rebucketed_unordered preserves first-completion
3128 // order, so sort by idx to restore input order
3129 // before returning.
3130 results.sort_by_key(|(idx, _)| *idx);
3131 Ok(results)
3132 })
3133 };
3134 get_root_data_map_parallel(data_map.clone(), &fetch)
3135 })
3136 .map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;
3137
3138 info!(
3139 "Resolved hierarchical DataMap: {} data chunks",
3140 resolved.len()
3141 );
3142 resolved
3143 } else {
3144 data_map.clone()
3145 };
3146
3147 // Phase 2: Now we know the real chunk count.
3148 let total_chunks = root_map.len();
3149 if let Some(ref tx) = progress {
3150 let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
3151 }
3152
3153 // Phase 3: Fetch and decrypt data chunks with accurate progress.
3154 let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
3155 let fetched_for_closure = fetched_counter.clone();
3156 let progress_for_closure = progress.clone();
3157 let peer_reports_for_closure = peer_reports.clone();
3158
// The decrypt batch size is chosen once, up front, from the current
// fetch cap, the configured floor and the usable-memory reading; it is
// not re-evaluated while the stream is being driven.
3159 let fetch_limiter_outer = self.controller().fetch.clone();
3160 let usable_memory = usable_memory_bytes();
3161 let configured_batch_floor = stream_decrypt_batch_size();
3162 let fetch_cap = fetch_limiter_outer.current();
3163 let decrypt_batch_size = adaptive_stream_decrypt_batch_size(
3164 total_chunks,
3165 fetch_cap,
3166 configured_batch_floor,
3167 usable_memory,
3168 );
3169 info!(
3170 total_chunks,
3171 fetch_cap,
3172 configured_batch_floor,
3173 ?usable_memory,
3174 decrypt_batch_size,
3175 "Selected adaptive stream decrypt batch size"
3176 );
3177
3178 let stream = streaming_decrypt_with_batch_size(
3179 &root_map,
3180 |batch: &[(usize, XorName)]| {
3181 let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
3182 let fetch_context = FileDownloadFetchContext {
3183 total_chunks,
3184 peer_count,
3185 fetched_ref: fetched_for_closure.clone(),
3186 progress_ref: progress_for_closure.clone(),
3187 peer_reports: peer_reports_for_closure.clone(),
3188 };
3189 let fetch_limiter = fetch_limiter_outer.clone();
3190
3191 tokio::task::block_in_place(|| {
3192 handle.block_on(async {
3193 // First pass: try every chunk in the batch. Normal mode
3194 // uses chunk_get_observed (early-return after a found
3195 // peer); diagnostic mode asks every selected closest
3196 // peer and records that sweep before returning bytes.
3197 // Any missing chunk or transient fetch error is encoded
3198 // as Err(hash), so one noisy chunk does not abort the
3199 // whole batch before the deferred retry rounds run.
3200 let first_fetch_context = fetch_context.clone();
3201 let raw: Vec<DownloadBatchEntry> = rebucketed_unordered(
3202 &fetch_limiter,
3203 batch_owned,
3204 |(idx, hash): (usize, XorName)| {
3205 let fetch_context = first_fetch_context.clone();
3206 async move {
3207 self.download_fetch_file_chunk(
3208 idx,
3209 hash,
3210 fetch_context,
3211 false,
3212 FIRST_DIAGNOSTIC_FETCH_ATTEMPT,
3213 )
3214 .await
3215 }
3216 },
3217 )
3218 .await?;
3219
3220 // Partition: things we already have vs the
3221 // deferred set we need to retry.
3222 let mut results: Vec<(usize, bytes::Bytes)> = Vec::new();
3223 let mut deferred: Vec<(usize, XorName)> = Vec::new();
3224 for (idx, inner) in raw {
3225 match inner {
3226 Ok(bytes) => results.push((idx, bytes)),
3227 Err(hash) => deferred.push((idx, hash)),
3228 }
3229 }
3230
3231 // Deferred retry pass: retry the deferred chunks
3232 // in CONCURRENT rounds (reusing the fetch
3233 // limiter's cap), not serially. The first round
3234 // fires immediately — most deferrals on a
3235 // healthy-but-lossy link are peer-side noise
3236 // that clears in well under a second, and
3237 // serializing them behind mandatory multi-second
3238 // sleeps was the single biggest throughput sink
3239 // on such links (a batch deferring ~20 chunks
3240 // burned minutes of near-zero throughput even
3241 // though every chunk succeeded on its first
3242 // retry). Only chunks that survive a round get a
3243 // longer back-off before the next, so genuine
3244 // saturation still gets time to settle.
3245 if !deferred.is_empty() {
3246 // Round delays in seconds. Round 0 is
3247 // immediate; later rounds back off to ride
3248 // out sustained saturation.
// NOTE: this block-local const shadows the
// `DEFERRED_ROUND_DELAYS_SECS` imported from the merkle
// module at the top of the file; inside this block the
// name refers to the download schedule declared here.
3249 const DEFERRED_ROUND_DELAYS_SECS: [u64; 3] = [0, 15, 45];
3250 info!(
3251 "Deferring {} chunk(s) for concurrent retry after batch settles",
3252 deferred.len()
3253 );
3254 let mut remaining = deferred;
3255 for (round, &delay_secs) in
3256 DEFERRED_ROUND_DELAYS_SECS.iter().enumerate()
3257 {
3258 if remaining.is_empty() {
3259 break;
3260 }
3261 if delay_secs > 0 {
3262 tokio::time::sleep(std::time::Duration::from_secs(delay_secs))
3263 .await;
3264 }
3265 info!(
3266 "Deferred retry round {}/{}: {} chunk(s)",
3267 round + 1,
3268 DEFERRED_ROUND_DELAYS_SECS.len(),
3269 remaining.len(),
3270 );
// `take` leaves `remaining` empty so it can collect
// only the chunks that miss again in this round.
3271 let round_input = std::mem::take(&mut remaining);
3272 let retry_fetch_context = fetch_context.clone();
3273 let round_results: Vec<DownloadBatchEntry> = rebucketed_unordered(
3274 &fetch_limiter,
3275 round_input,
3276 |(idx, hash): (usize, XorName)| {
3277 let fetch_context = retry_fetch_context.clone();
3278 async move {
3279 self.download_fetch_file_chunk(
3280 idx,
3281 hash,
3282 fetch_context,
3283 true,
3284 round + DEFERRED_RETRY_ATTEMPT_OFFSET,
3285 )
3286 .await
3287 }
3288 },
3289 )
3290 .await?;
3291 for (idx, inner) in round_results {
3292 match inner {
3293 Ok(bytes) => results.push((idx, bytes)),
3294 Err(hash) => remaining.push((idx, hash)),
3295 }
3296 }
3297 }
// Anything still missing after the last round fails the
// whole batch; only the first missing hash is named.
3298 if let Some((_, hash)) = remaining.first() {
3299 return Err(self_encryption::Error::Generic(format!(
3300 "Chunk not found after {} deferred retry rounds: {}",
3301 DEFERRED_ROUND_DELAYS_SECS.len(),
3302 hex::encode(hash.0),
3303 )));
3304 }
3305 }
3306
3307 // streaming_decrypt itself sort_by_keys before
3308 // zipping, but the same closure is also passed
3309 // through get_root_data_map_parallel internally
3310 // (see self_encryption::stream_decrypt.rs::new), and
3311 // THAT path zips positionally without sorting. Sort
3312 // here so both consumers see input order.
3313 results.sort_by_key(|(idx, _)| *idx);
3314 Ok(results)
3315 })
3316 })
3317 },
3318 decrypt_batch_size,
3319 )
3320 .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;
3321
3322 // Drive the iterator (each `next()` runs the batched fetch via
3323 // block_in_place) and hand each decrypted segment to the sink in
3324 // order. Awaiting the sink between items yields back to the runtime so
3325 // a bounded sink can apply backpressure.
3326 let mut bytes_total = 0u64;
3327 for chunk_result in stream {
3328 let chunk: Bytes =
3329 chunk_result.map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
3330 bytes_total += chunk.len() as u64;
3331 on_chunk(chunk).await?;
3332 }
3333 Ok(bytes_total)
3334 }
3335
3336 /// Download and decrypt a file to disk, with optional progress events.
3337 ///
3338 /// Same as [`Client::file_download`] but sends [`DownloadEvent`]s for UI
3339 /// feedback. Streams to a temp file (one decrypt batch resident at a time)
3340 /// and renames atomically on success. A `TempDownload` guard removes the
3341 /// staging file on any error path, including a panic.
3342 pub async fn file_download_with_progress(
3343 &self,
3344 data_map: &DataMap,
3345 output: &Path,
3346 progress: Option<mpsc::Sender<DownloadEvent>>,
3347 ) -> Result<u64> {
3348 self.file_download_with_progress_using_peer_count(
3349 data_map,
3350 output,
3351 progress,
3352 self.config().close_group_size,
3353 )
3354 .await
3355 }
3356
3357 /// Download and decrypt a file to disk with progress events, trying
3358 /// `peer_count` closest peers for every chunk fetch.
3359 ///
3360 /// Streams to a temp file (one decrypt batch resident at a time) and
3361 /// renames atomically on success.
3362 async fn file_download_with_progress_using_peer_count(
3363 &self,
3364 data_map: &DataMap,
3365 output: &Path,
3366 progress: Option<mpsc::Sender<DownloadEvent>>,
3367 peer_count: usize,
3368 ) -> Result<u64> {
3369 self.file_download_with_progress_using_peer_count_and_reports(
3370 data_map, output, progress, peer_count, None,
3371 )
3372 .await
3373 }
3374
3375 async fn file_download_with_progress_using_peer_count_and_reports(
3376 &self,
3377 data_map: &DataMap,
3378 output: &Path,
3379 progress: Option<mpsc::Sender<DownloadEvent>>,
3380 peer_count: usize,
3381 peer_reports: Option<Arc<Mutex<Vec<RecordedFileChunkPeerSweep>>>>,
3382 ) -> Result<u64> {
3383 debug!("Downloading file to {}", output.display());
3384
3385 let parent = output.parent().unwrap_or_else(|| Path::new("."));
3386 let unique: u64 = rand::random();
3387 let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));
3388
3389 // Guard removes the staging file on any early return OR a panic unwind
3390 // out of the `block_in_place` decrypt loop; defused only by a
3391 // successful commit(). Centralizes what used to be three duplicated
3392 // cleanup arms.
3393 let tmp = TempDownload::new(tmp_path);
3394 let mut file = std::fs::File::create(tmp.path())?;
3395
3396 let bytes_written = self
3397 .download_decrypted_chunks(data_map, progress, peer_count, peer_reports, |bytes| {
3398 let r = file.write_all(&bytes).map_err(Error::from);
3399 std::future::ready(r)
3400 })
3401 .await?;
3402 file.flush()?;
3403 drop(file); // close the handle before rename (Windows won't rename an open file)
3404
3405 tmp.commit(output)?;
3406 info!(
3407 "File downloaded: {bytes_written} bytes written to {}",
3408 output.display()
3409 );
3410 Ok(bytes_written)
3411 }
3412
3413 /// Download and decrypt a file, streaming the plaintext to `sink` instead
3414 /// of writing to disk.
3415 ///
3416 /// Constant memory (one decrypt batch resident at a time); the caller
3417 /// receives bytes progressively as each batch decrypts, suitable for
3418 /// forwarding to an HTTP chunked body or a gRPC response stream. The
3419 /// bounded `sink` applies backpressure. If the receiver is dropped (e.g.
3420 /// the client disconnected) the download stops early and returns
3421 /// [`Error::Cancelled`].
3422 ///
3423 /// The channel item type is `Result<Bytes, Error>`, so the caller sets up:
3424 ///
3425 /// ```ignore
3426 /// let (tx, rx) = tokio::sync::mpsc::channel::<Result<Bytes, Error>>(8);
3427 /// ```
3428 ///
3429 /// Typically the caller `tokio::spawn`s this and converts the matching
3430 /// `Receiver` into its response stream. Requires a multi-threaded Tokio
3431 /// runtime (the decrypt iterator uses `block_in_place`).
3432 pub async fn file_download_to_sender(
3433 &self,
3434 data_map: &DataMap,
3435 sink: mpsc::Sender<std::result::Result<Bytes, Error>>,
3436 progress: Option<mpsc::Sender<DownloadEvent>>,
3437 ) -> Result<u64> {
3438 let peer_count = self.config().close_group_size;
3439 self.download_decrypted_chunks(data_map, progress, peer_count, None, |bytes| {
3440 let sink = sink.clone();
3441 async move {
3442 sink.send(Ok(bytes))
3443 .await
3444 .map_err(|_| Error::Cancelled("download stream receiver dropped".into()))
3445 }
3446 })
3447 .await
3448 }
3449}
3450
3451#[cfg(test)]
3452#[allow(clippy::unwrap_used)]
3453mod tests {
3454 use super::*;
3455
#[test]
fn distributed_sample_indices_spreads_across_large_file() {
    // Sampling 5 of 100 chunks must include both ends (0 and 99) and space
    // the remaining picks evenly between them.
    let sampled = distributed_sample_indices(100, 5);
    let expected = vec![0, 24, 49, 74, 99];
    assert_eq!(sampled, expected);
}
3461
#[test]
fn distributed_sample_indices_covers_whole_small_file() {
    // When the chunk count does not exceed the cap, every index comes back.
    // estimate_upload_cost relies on this to detect "whole file sampled".
    for total in [3usize, 5] {
        let every_index: Vec<usize> = (0..total).collect();
        assert_eq!(distributed_sample_indices(total, 5), every_index);
    }
}
3469
#[test]
fn distributed_sample_indices_is_in_range_and_increasing() {
    // Degenerate sizes: nothing to sample, and a single chunk.
    assert!(distributed_sample_indices(0, 5).is_empty());
    assert_eq!(distributed_sample_indices(1, 5), vec![0]);

    // For every size up to 199 the sample starts at the first chunk, ends at
    // the last one, stays in range, and is strictly increasing.
    for total in 1..200usize {
        let picks = distributed_sample_indices(total, 5);
        assert_eq!(picks.first().copied().unwrap(), 0);
        assert_eq!(picks.last().copied().unwrap(), total - 1);
        assert!(picks.iter().all(|&pick| pick < total));
        assert!(picks.windows(2).all(|pair| pair[0] < pair[1]));
    }
}
3482
#[test]
fn disk_space_check_passes_for_small_file() {
    // Spilling a 1 KB file should fit on any machine running the tests.
    let outcome = check_disk_space_for_spill(1024);
    outcome.unwrap();
}
3488
#[test]
fn disk_space_check_fails_for_absurd_size() {
    // `u64::MAX / 2` bytes is roughly 8 EiB, so the check must reject it on
    // any real system.
    let outcome = check_disk_space_for_spill(u64::MAX / 2);
    assert!(outcome.is_err());

    let err = outcome.unwrap_err();
    let is_disk_space_error = matches!(err, Error::InsufficientDiskSpace(_));
    assert!(
        is_disk_space_error,
        "expected InsufficientDiskSpace, got: {err}"
    );
}
3500
#[test]
fn adaptive_stream_decrypt_batch_size_tracks_fetch_headroom() {
    // With plenty of chunks and effectively unlimited memory, the batch is
    // the fetch cap scaled by the fetch multiplier.
    let fetch_cap = 64;
    let chosen = adaptive_stream_decrypt_batch_size(1_000, fetch_cap, 10, Some(u64::MAX));
    let expected = fetch_cap * DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER;
    assert_eq!(chosen, expected);
}
3507
#[test]
fn adaptive_stream_decrypt_batch_size_caps_to_total_chunks() {
    // A 12-chunk file never needs a batch larger than 12, whatever the
    // fetch cap and memory allow.
    let total_chunks = 12;
    let chosen = adaptive_stream_decrypt_batch_size(total_chunks, 64, 10, Some(u64::MAX));
    assert_eq!(chosen, 12);
}
3514
#[test]
fn adaptive_stream_decrypt_batch_size_honours_configured_floor() {
    // A fetch cap of 1 with no memory reading must still yield the
    // configured floor of 32.
    let configured_floor = 32;
    let chosen = adaptive_stream_decrypt_batch_size(1_000, 1, configured_floor, None);
    assert_eq!(chosen, 32);
}
3521
#[test]
fn adaptive_stream_decrypt_batch_size_does_not_expand_without_memory_reading() {
    // Without a usable-memory reading the batch stays at the configured
    // floor (10) even though the fetch cap (64) would allow more.
    let no_memory_reading = None;
    let chosen = adaptive_stream_decrypt_batch_size(1_000, 64, 10, no_memory_reading);
    assert_eq!(chosen, 10);
}
3528
#[test]
fn adaptive_stream_decrypt_batch_size_caps_to_memory_budget() {
    // Build a usable-memory figure whose budget share covers exactly 16
    // chunks at the estimated per-chunk footprint.
    let per_chunk_estimate = (self_encryption::MAX_CHUNK_SIZE as u64)
        .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
        .max(1);
    let sixteen_chunks = per_chunk_estimate.saturating_mul(16);
    let usable_memory = sixteen_chunks.saturating_mul(DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR);

    // The fetch cap (256) would allow far more; memory must win.
    let chosen = adaptive_stream_decrypt_batch_size(1_000, 256, 10, Some(usable_memory));
    assert_eq!(chosen, 16);
}
3541
#[test]
fn adaptive_stream_decrypt_batch_size_keeps_one_chunk_when_memory_is_tight() {
    // Even with a single usable byte the batch never drops below one chunk.
    let one_byte = Some(1);
    let chosen = adaptive_stream_decrypt_batch_size(1_000, 64, 10, one_byte);
    assert_eq!(chosen, 1);
}
3548
#[test]
fn cached_merkle_covers_only_when_all_addresses_have_proofs() {
    let covered = compute_address(&Bytes::from_static(b"covered"));
    let extra = compute_address(&Bytes::from_static(b"extra"));
    let missing = compute_address(&Bytes::from_static(b"missing"));

    // The cached payment holds proofs for `covered` and `extra` only.
    let mut proofs = HashMap::new();
    proofs.insert(covered, vec![1]);
    proofs.insert(extra, vec![2]);
    let cached = MerkleBatchPaymentResult {
        proofs,
        chunk_count: 2,
        storage_cost_atto: String::from("0"),
        gas_cost_wei: 0,
        merkle_payment_timestamp: 0,
    };

    // A subset and the full proven set are both covered.
    let covers_subset = cached_merkle_covers_addresses(&cached, &[covered]);
    assert!(covers_subset);
    let covers_all_proven = cached_merkle_covers_addresses(&cached, &[covered, extra]);
    assert!(covers_all_proven);

    // One address without a proof is enough to lose coverage.
    let covers_with_missing = cached_merkle_covers_addresses(&cached, &[covered, missing]);
    assert!(!covers_with_missing);
}
3569
/// After a partial merkle payment some addresses have no proof. The
/// partition must separate them — so `upload_waves_merkle` can report them as
/// failed (`PartialUpload`) rather than aborting the whole file — while
/// keeping each group in the addresses' original order.
#[test]
fn partition_addresses_by_proof_splits_paid_and_unpaid() {
    let [paid_a, unpaid_b, paid_c, unpaid_d] = [[1u8; 32], [2u8; 32], [3u8; 32], [4u8; 32]];

    // Only `paid_a` and `paid_c` received a proof.
    let mut proofs: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
    proofs.insert(paid_a, vec![0xaa]);
    proofs.insert(paid_c, vec![0xcc]);

    let interleaved = [paid_a, unpaid_b, paid_c, unpaid_d];
    let (to_store, missing) = partition_addresses_by_proof(&interleaved, &proofs);

    assert_eq!(to_store, vec![paid_a, paid_c]);
    assert_eq!(missing, vec![unpaid_b, unpaid_d]);
}
3589
/// An `Ok` wave result passes straight through: its stored chunks, its parsed
/// storage cost, its gas cost and its stats all land in the outcome, and the
/// failed list stays empty.
#[test]
fn fold_single_wave_keeps_ok_wave() {
    let stored = vec![[1u8; 32], [2u8; 32]];
    let stats = WaveAggregateStats {
        chunk_attempts_total: 7,
        ..WaveAggregateStats::default()
    };
    let wave_result = Ok((stored.clone(), String::from("100"), 9, stats));

    let outcome = fold_single_wave(wave_result).unwrap();

    assert_eq!(outcome.stored, stored);
    assert!(outcome.failed.is_empty());
    assert_eq!(outcome.storage_atto.to_string(), "100");
    assert_eq!(outcome.gas_wei, 9);
    assert_eq!(outcome.stats.chunk_attempts_total, 7);
}
3608
/// The central V2-461 behaviour: a wave that falls short of quorum
/// (`PartialUpload`) is treated as recoverable. Its stored chunks, failed
/// chunks and on-chain spend are folded into the outcome so the caller can
/// move on to the next wave instead of aborting the whole file.
#[test]
fn fold_single_wave_folds_partial_upload() {
    let stored = vec![[3u8; 32]];
    let failed = vec![([4u8; 32], "short of quorum".to_string())];

    // Spend that the partial wave already incurred.
    let spend = Box::new(PartialUploadSpend {
        storage_cost_atto: "250".to_string(),
        gas_cost_wei: 11,
    });
    let partial = Error::PartialUpload {
        stored: stored.clone(),
        stored_count: 1,
        failed: failed.clone(),
        failed_count: 1,
        total_chunks: 2,
        spend,
        reason: "wave store failed after retries".to_string(),
    };

    let outcome = fold_single_wave(Err(partial)).unwrap();

    assert_eq!(outcome.stored, stored);
    assert_eq!(outcome.failed, failed);
    assert_eq!(outcome.storage_atto.to_string(), "250");
    assert_eq!(outcome.gas_wei, 11);
    // The `PartialUpload` built above has no stats field, so the folded
    // outcome is expected to report zero chunk attempts.
    assert_eq!(outcome.stats.chunk_attempts_total, 0);
}
3639
/// Errors other than `PartialUpload` (for example a wallet or
/// payment-infrastructure failure) are fatal: they must be returned to the
/// caller so the file aborts, not folded into the failed set.
#[test]
fn fold_single_wave_propagates_fatal_error() {
    let fatal = Error::Payment("wallet unavailable".to_string());
    let result = fold_single_wave(Err(fatal));

    let propagated = matches!(result, Err(Error::Payment(_)));
    assert!(
        propagated,
        "fatal payment error must propagate, got: {result:?}"
    );
}
3651
/// Covers the two extremes of `partition_addresses_by_proof`: with no proofs
/// every address is reported missing, and with a proof for every address
/// nothing is.
#[test]
fn partition_addresses_by_proof_handles_all_or_nothing() {
    let first = [5u8; 32];
    let second = [6u8; 32];
    let addresses = [first, second];

    // Empty proof map: nothing can be stored, everything is missing.
    let no_proofs: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
    let (storable, unpaid) = partition_addresses_by_proof(&addresses, &no_proofs);
    assert!(storable.is_empty());
    assert_eq!(unpaid, vec![first, second]);

    // A proof for every address: everything is storable, nothing is missing.
    let mut all_proofs: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
    all_proofs.insert(first, vec![1]);
    all_proofs.insert(second, vec![2]);
    let (storable, unpaid) = partition_addresses_by_proof(&addresses, &all_proofs);
    assert_eq!(storable, vec![first, second]);
    assert!(unpaid.is_empty());
}
3669
/// Pushes two distinct chunks into a `ChunkSpill`, then checks the reported
/// counts and byte totals, reads both chunks back unchanged, and confirms
/// that splitting the addresses into one-chunk waves yields two waves.
#[test]
fn chunk_spill_round_trip() {
    let mut spill = ChunkSpill::new().unwrap();
    let first_payload = vec![0xAA; 1024];
    let second_payload = vec![0xBB; 2048];

    spill.push(&first_payload).unwrap();
    spill.push(&second_payload).unwrap();

    // Bookkeeping: chunk count, running byte total, and per-entry sizes.
    assert_eq!(spill.len(), 2);
    assert_eq!(spill.total_bytes(), 1024 + 2048);
    let entries = spill.chunk_entries().unwrap();
    let entry_total: u64 = entries.iter().map(|entry| entry.1).sum();
    assert_eq!(entry_total, 1024 + 2048);

    // Each stored chunk reads back as exactly the bytes that were pushed.
    let first_address = spill.addresses.first().unwrap();
    let first_read = spill.read_chunk(first_address).unwrap();
    assert_eq!(&first_read[..], &first_payload[..]);

    let second_address = spill.addresses.get(1).unwrap();
    let second_read = spill.read_chunk(second_address).unwrap();
    assert_eq!(&second_read[..], &second_payload[..]);

    // A wave size of one puts each address in its own wave.
    let wave_count = spill.addresses.chunks(1).count();
    assert_eq!(wave_count, 2);
}
3696
/// The spill directory exists while the `ChunkSpill` is alive and is expected
/// to be gone once the value has been dropped.
#[test]
fn chunk_spill_cleanup_on_drop() {
    let spill = ChunkSpill::new().unwrap();
    let dir = spill.dir.clone();
    assert!(dir.exists());

    // Drop explicitly, then confirm the directory was cleaned up.
    drop(spill);
    assert!(!dir.exists(), "spill dir should be removed on drop");
}
3708
/// Pushing the same bytes repeatedly must leave a single entry whose size is
/// counted once; pushing different bytes afterwards must still add an entry.
#[test]
fn chunk_spill_deduplicates_identical_content() {
    let mut spill = ChunkSpill::new().unwrap();
    let repeated = vec![0xCC; 512];

    // The first push stores the chunk; the next two are expected to be skipped.
    for _ in 0..3 {
        spill.push(&repeated).unwrap();
    }

    assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
    assert_eq!(
        spill.total_bytes(),
        512,
        "total_bytes should count unique only"
    );

    // Content that differs from what is already stored is accepted.
    let distinct = vec![0xDD; 256];
    spill.push(&distinct).unwrap();
    assert_eq!(spill.len(), 2);
    assert_eq!(spill.total_bytes(), 512 + 256);
}
3731}
3732
/// Compile-time checks that the futures returned by `Client`'s file methods
/// are `Send`.
///
/// None of the functions below is called from within this module; each one
/// only has to type-check, which requires the `Send` bound on `_assert_send`
/// to hold for the future it is handed.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Accepts a reference to any `Send` value and does nothing with it.
    fn _assert_send<T>(_: &T)
    where
        T: Send,
    {
    }

    /// Requires the future returned by `Client::file_upload` to be `Send`.
    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_is_send(client: &Client) {
        let source = Path::new("/dev/null");
        let upload = client.file_upload(source);
        _assert_send(&upload);
    }

    /// Requires the future returned by `Client::file_upload_with_mode` to be
    /// `Send`.
    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_with_mode_is_send(client: &Client) {
        let source = Path::new("/dev/null");
        let upload = client.file_upload_with_mode(source, PaymentMode::Auto);
        _assert_send(&upload);
    }

    /// Requires the future returned by `Client::file_download` to be `Send`.
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _file_download_is_send(client: &Client) {
        // `todo!()` stands in for a `DataMap` value; only the type matters
        // for this check.
        let data_map: DataMap = todo!();
        let destination = Path::new("/dev/null");
        let download = client.file_download(&data_map, destination);
        _assert_send(&download);
    }
}