1use crate::data::client::adaptive::observe_op;
14use crate::data::client::batch::{
15 finalize_batch_payment, PaymentIntent, PreparedChunk, WaveAggregateStats,
16};
17use crate::data::client::classify_error;
18use crate::data::client::merkle::{
19 finalize_merkle_batch, should_use_merkle, MerkleBatchPaymentResult, PaymentMode,
20 PreparedMerkleBatch,
21};
22use crate::data::client::Client;
23use crate::data::error::{Error, Result};
24use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
25use ant_protocol::transport::{MultiAddr, PeerId};
26use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
27use bytes::Bytes;
28use fs2::FileExt;
29use futures::stream::{self, StreamExt};
30use self_encryption::{
31 get_root_data_map_parallel, stream_decrypt_batch_size, stream_encrypt,
32 streaming_decrypt_with_batch_size, DataMap,
33};
34use std::collections::{HashMap, HashSet};
35use std::io::Write;
36use std::path::{Path, PathBuf};
37use std::sync::{Arc, Mutex};
38use tokio::runtime::Handle;
39use tokio::sync::mpsc;
40use tracing::{debug, info, warn};
41use xor_name::XorName;
42
/// Progress notifications sent over an optional `mpsc` channel during uploads.
///
/// Sends are best-effort: the call sites in this file discard the send result
/// (`let _ = tx.send(..)` / `tx.try_send(..)`), so a dropped or full receiver
/// never aborts an upload.
#[derive(Debug, Clone)]
pub enum UploadEvent {
    /// Encryption progress: unique chunks spilled to disk so far (sent by
    /// `encrypt_file_to_spill` every 10 chunks).
    Encrypting { chunks_done: usize },
    /// Encryption finished; `total_chunks` is the number of chunks to handle.
    Encrypted { total_chunks: usize },
    /// A wave of chunks is about to be quoted.
    QuotingChunks {
        /// 1-based index of the current wave.
        wave: usize,
        /// Total number of waves in this upload.
        total_waves: usize,
        /// Number of chunks in this wave.
        chunks_in_wave: usize,
    },
    /// A chunk quote request completed (`quoted` of `total`).
    ChunkQuoted { quoted: usize, total: usize },
    /// A chunk was stored. NOTE(review): emitted by storage helpers not shown
    /// here; presumably `stored` is a running count out of `total`.
    ChunkStored { stored: usize, total: usize },
    /// A wave finished storing. NOTE(review): emitted outside this part of the
    /// file; field meanings inferred from names — confirm.
    WaveComplete {
        wave: usize,
        total_waves: usize,
        stored_so_far: usize,
        total: usize,
    },
}
69
/// Progress notifications for downloads.
///
/// NOTE(review): the code that emits these is not in this part of the file;
/// the per-variant descriptions below are inferred from names — confirm
/// against the download path.
#[derive(Debug, Clone)]
pub enum DownloadEvent {
    /// Presumably: resolving a DataMap that itself spans `total_map_chunks` chunks.
    ResolvingDataMap { total_map_chunks: usize },
    /// Presumably: running count of DataMap chunks fetched so far.
    MapChunkFetched { fetched: usize },
    /// Presumably: DataMap resolved; `total_chunks` data chunks remain to fetch.
    DataMapResolved { total_chunks: usize },
    /// Presumably: `fetched` of `total` data chunks retrieved.
    ChunksFetched { fetched: usize, total: usize },
}
82
/// One peer's store quote: the peer, its addresses, the quote itself, and the
/// quoted price (the fourth element is read as the price when estimating).
type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);

/// Number of chunks per upload wave (see `ChunkSpill::waves`).
const UPLOAD_WAVE_SIZE: usize = 64;

/// Stream-decrypt batch target = fetch concurrency cap × this multiplier,
/// before the memory cap and total-chunk clamp are applied.
const DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER: usize = 4;

/// Only `1 / DIVISOR` of detected usable memory is budgeted for a decrypt batch.
const DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR: u64 = 4;

/// Assumed memory cost per in-flight chunk, as a multiple of `MAX_CHUNK_SIZE`.
const DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER: u64 = 3;

/// Maximum number of chunk addresses sampled for a store quote when
/// estimating upload cost.
const ESTIMATE_SAMPLE_CAP: usize = 5;

/// Assumed gas units per wave-batch payment transaction (estimates only).
const GAS_PER_WAVE_TX: u128 = 1_500_000;

/// Assumed gas units per merkle-batch payment transaction (estimates only).
const GAS_PER_MERKLE_TX: u128 = 500_000;

/// Assumed gas price for estimates: 100_000_000 wei (0.1 gwei).
const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;

/// Headroom, in percent of the file size, that must be free on top of the
/// file size before spilling.
const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;

/// Spill directories younger than this many seconds are never removed by
/// stale-spill cleanup, whether or not they hold a lockfile.
const SPILL_STALE_GRACE_SECS: u64 = 30;

/// Name prefix of per-upload spill directories (`spill_<unix_secs>_<random>`).
const SPILL_DIR_PREFIX: &str = "spill_";

/// Lockfile inside each spill directory, exclusively locked while in use.
const SPILL_LOCK_NAME: &str = ".lock";
167
/// On-disk staging area for the encrypted chunks of one upload.
///
/// Each instance owns a unique directory under the spill root. Chunks are
/// stored one file per chunk, named by the hex-encoded chunk address, and the
/// directory is removed when the spill is dropped.
struct ChunkSpill {
    /// Per-upload directory holding the chunk files and the lockfile.
    dir: PathBuf,
    /// Exclusively-locked lockfile handle, held for the spill's lifetime so
    /// that stale-spill cleanup skips this directory.
    _lock: std::fs::File,
    /// Unique chunk addresses in first-seen order.
    addresses: Vec<[u8; 32]>,
    /// Addresses already written, used to skip duplicate chunks.
    seen: HashSet<[u8; 32]>,
    /// Sum of the sizes of all unique chunks written.
    total_bytes: u64,
}
180
181impl ChunkSpill {
182 fn spill_root() -> Result<PathBuf> {
184 use crate::config;
185 let root = config::data_dir()
186 .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
187 .join("spill");
188 Ok(root)
189 }
190
191 fn new() -> Result<Self> {
197 let root = Self::spill_root()?;
198 std::fs::create_dir_all(&root)?;
199
200 Self::cleanup_stale(&root);
202
203 let now = std::time::SystemTime::now()
204 .duration_since(std::time::UNIX_EPOCH)
205 .unwrap_or_default()
206 .as_secs();
207 let unique: u64 = rand::random();
208 let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
209 std::fs::create_dir(&dir)?;
210
211 let lock_path = dir.join(SPILL_LOCK_NAME);
214 let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
215 Error::Io(std::io::Error::new(
216 e.kind(),
217 format!("failed to create spill lockfile: {e}"),
218 ))
219 })?;
220 lock_file.try_lock_exclusive().map_err(|e| {
221 Error::Io(std::io::Error::new(
222 e.kind(),
223 format!("failed to lock spill lockfile: {e}"),
224 ))
225 })?;
226
227 Ok(Self {
228 dir,
229 _lock: lock_file,
230 addresses: Vec::new(),
231 seen: HashSet::new(),
232 total_bytes: 0,
233 })
234 }
235
236 fn cleanup_stale(root: &Path) {
251 let now = std::time::SystemTime::now()
252 .duration_since(std::time::UNIX_EPOCH)
253 .unwrap_or_default()
254 .as_secs();
255
256 if now == 0 {
257 warn!("System clock before Unix epoch, skipping spill cleanup");
260 return;
261 }
262
263 let entries = match std::fs::read_dir(root) {
264 Ok(entries) => entries,
265 Err(_) => return,
266 };
267
268 for entry in entries.flatten() {
269 let name = entry.file_name();
270 let name_str = name.to_string_lossy();
271
272 let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
274 Some(s) => s,
275 None => continue,
276 };
277
278 let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
280 Some(ts) => ts,
281 None => continue,
282 };
283
284 if now.saturating_sub(timestamp) < SPILL_STALE_GRACE_SECS {
285 continue;
286 }
287
288 let file_type = match entry.file_type() {
290 Ok(ft) => ft,
291 Err(_) => continue,
292 };
293 if !file_type.is_dir() {
294 continue;
295 }
296
297 let path = entry.path();
298
299 let lock_path = path.join(SPILL_LOCK_NAME);
301 if let Ok(lock_file) = std::fs::File::open(&lock_path) {
302 use fs2::FileExt;
303 if lock_file.try_lock_exclusive().is_err() {
304 debug!("Skipping active spill dir: {}", path.display());
306 continue;
307 }
308 drop(lock_file);
311 }
312
313 info!("Cleaning up stale spill dir: {}", path.display());
314 if let Err(e) = std::fs::remove_dir_all(&path) {
315 warn!("Failed to clean up stale spill dir {}: {e}", path.display());
316 }
317 }
318 }
319
320 #[allow(dead_code)]
322 pub(crate) fn run_cleanup() {
323 if let Ok(root) = Self::spill_root() {
324 Self::cleanup_stale(&root);
325 }
326 }
327
328 fn push(&mut self, content: &[u8]) -> Result<()> {
334 let address = compute_address(content);
335 if !self.seen.insert(address) {
336 return Ok(());
337 }
338 let path = self.dir.join(hex::encode(address));
339 std::fs::write(&path, content)?;
340 self.total_bytes += content.len() as u64;
341 self.addresses.push(address);
342 Ok(())
343 }
344
345 fn len(&self) -> usize {
347 self.addresses.len()
348 }
349
350 fn total_bytes(&self) -> u64 {
352 self.total_bytes
353 }
354
355 fn avg_chunk_size(&self) -> u64 {
357 if self.addresses.is_empty() {
358 return 0;
359 }
360 self.total_bytes / self.addresses.len() as u64
361 }
362
363 fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
365 let path = self.dir.join(hex::encode(address));
366 let data = std::fs::read(&path).map_err(|e| {
367 Error::Io(std::io::Error::new(
368 e.kind(),
369 format!("reading spilled chunk {}: {e}", hex::encode(address)),
370 ))
371 })?;
372 Ok(Bytes::from(data))
373 }
374
375 fn waves(&self) -> std::slice::Chunks<'_, [u8; 32]> {
377 self.addresses.chunks(UPLOAD_WAVE_SIZE)
378 }
379
380 fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
382 let mut out = Vec::with_capacity(wave_addrs.len());
383 for addr in wave_addrs {
384 let content = self.read_chunk(addr)?;
385 out.push((content, *addr));
386 }
387 Ok(out)
388 }
389
390 fn cleanup(&self) {
392 if let Err(e) = std::fs::remove_dir_all(&self.dir) {
393 warn!(
394 "Failed to clean up chunk spill dir {}: {e}",
395 self.dir.display()
396 );
397 }
398 }
399}
400
401impl Drop for ChunkSpill {
402 fn drop(&mut self) {
403 self.cleanup();
404 }
405}
406
407fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
412 let spill_root = ChunkSpill::spill_root()?;
413
414 std::fs::create_dir_all(&spill_root)?;
416
417 let available = fs2::available_space(&spill_root).map_err(|e| {
418 Error::Io(std::io::Error::new(
419 e.kind(),
420 format!(
421 "failed to query disk space on {}: {e}",
422 spill_root.display()
423 ),
424 ))
425 })?;
426
427 let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
429 let required = file_size.saturating_add(headroom);
430
431 if available < required {
432 let avail_mb = available / (1024 * 1024);
433 let req_mb = required / (1024 * 1024);
434 return Err(Error::InsufficientDiskSpace(format!(
435 "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
436 spill_root.display()
437 )));
438 }
439
440 debug!(
441 "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
442 spill_root.display()
443 );
444 Ok(())
445}
446
447fn usable_memory_bytes() -> Option<u64> {
448 let mut system = sysinfo::System::new();
449 system.refresh_memory();
450
451 let available_memory = system.available_memory();
452 let free_memory = system.free_memory();
453 let used_memory = system.used_memory();
454 let total_memory = system.total_memory();
455 let unused_memory = total_memory.saturating_sub(used_memory);
456
457 let mut usable = [available_memory, free_memory, unused_memory]
458 .into_iter()
459 .filter(|bytes| *bytes > 0)
460 .max();
461
462 let cgroup_free_memory = system
463 .cgroup_limits()
464 .filter(|limits| limits.total_memory > 0)
465 .map(|limits| limits.free_memory);
466 if let Some(cgroup_free_memory) = cgroup_free_memory {
467 usable = Some(usable.unwrap_or(u64::MAX).min(cgroup_free_memory));
468 }
469
470 debug!(
471 available_memory,
472 free_memory,
473 used_memory,
474 total_memory,
475 cgroup_free_memory,
476 usable_memory = ?usable,
477 "Detected usable memory for stream decrypt batch sizing"
478 );
479
480 usable
481}
482
483fn stream_decrypt_batch_memory_cap(usable_memory_bytes: u64) -> usize {
484 let budget = usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR;
485 let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
486 .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
487 .max(1);
488 let cap = (budget / estimated_bytes_per_chunk).max(1);
489
490 usize::try_from(cap).unwrap_or(usize::MAX)
491}
492
493fn adaptive_stream_decrypt_batch_size(
494 total_chunks: usize,
495 fetch_cap: usize,
496 configured_batch_floor: usize,
497 usable_memory_bytes: Option<u64>,
498) -> usize {
499 let fetch_target = fetch_cap
500 .max(1)
501 .saturating_mul(DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
502 let requested = match usable_memory_bytes {
503 Some(bytes) => {
504 let memory_cap = stream_decrypt_batch_memory_cap(bytes);
505 configured_batch_floor
506 .max(fetch_target)
507 .max(1)
508 .min(memory_cap)
509 }
510 None => configured_batch_floor.max(1),
511 };
512
513 requested.min(total_chunks.max(1)).max(1)
514}
515
/// Whether a prepared upload publishes its DataMap alongside the chunks.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Visibility {
    /// Default. The DataMap is only returned to the caller; no DataMap chunk
    /// is added and `data_map_address` is `None`.
    #[default]
    Private,
    /// The serialized DataMap is bundled as an extra chunk, and its address is
    /// returned as `data_map_address` (see `file_prepare_upload_with_progress`).
    Public,
}
532
/// Result of `Client::estimate_upload_cost`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct UploadCostEstimate {
    /// Size of the source file in bytes.
    pub file_size: u64,
    /// Number of unique encrypted chunks the file produced.
    pub chunk_count: usize,
    /// Estimated storage cost in atto, as a decimal string: the median sampled
    /// quote price × 3 × `chunk_count`, or "0" if every chunk is already stored.
    pub storage_cost_atto: String,
    /// Rough gas cost in wei, as a decimal string, derived from fixed
    /// per-transaction gas and gas-price assumptions.
    pub estimated_gas_cost_wei: String,
    /// Payment mode the upload would use (`Merkle` or `Single`).
    pub payment_mode: PaymentMode,
}
550
/// Outcome of a completed file upload.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FileUploadResult {
    /// DataMap required to retrieve and decrypt the file.
    pub data_map: DataMap,
    /// Number of chunks stored, as reported by the storage helpers.
    pub chunks_stored: usize,
    /// Failed chunks. The upload paths in this file report failures as errors
    /// and set 0 here on success.
    pub chunks_failed: usize,
    /// Chunk total: the file's chunk count for direct uploads, and the stored
    /// count for externally-finalized uploads.
    pub total_chunks: usize,
    /// Payment mode actually used; it may differ from the requested mode after
    /// a merkle → wave-batch fallback.
    pub payment_mode_used: PaymentMode,
    /// Storage cost paid, in atto, as a decimal string ("0" when payment was
    /// made by an external signer).
    pub storage_cost_atto: String,
    /// Gas cost paid in wei (0 when payment was made by an external signer).
    pub gas_cost_wei: u128,
    /// Address of the bundled DataMap chunk for public uploads, else `None`.
    pub data_map_address: Option<[u8; 32]>,
    /// Store attempts summed across chunks, as aggregated by
    /// `WaveAggregateStats`.
    pub chunk_attempts_total: usize,
    /// Per-store durations in milliseconds, as aggregated by
    /// `WaveAggregateStats`.
    pub store_durations_ms: Vec<u64>,
    /// Retry-count histogram. NOTE(review): bucket boundaries are defined by
    /// `WaveAggregateStats` (presumably 0, 1, 2, 3+ retries) — confirm.
    pub retries_histogram: [usize; 4],
}
596
/// Payment data handed to an external signer by `file_prepare_upload*`.
#[derive(Debug)]
pub enum ExternalPaymentInfo {
    /// Per-chunk quotes to pay in wave-batch transactions; finalize with
    /// `Client::finalize_upload`.
    WaveBatch {
        /// Chunks for which `prepare_chunk_payment` returned `Some`
        /// (presumably those not already stored on the network).
        prepared_chunks: Vec<PreparedChunk>,
        /// Aggregated payment request built from `prepared_chunks`.
        payment_intent: PaymentIntent,
    },
    /// A merkle batch payment; finalize with `Client::finalize_upload_merkle`.
    Merkle {
        /// Prepared merkle batch awaiting external payment.
        prepared_batch: PreparedMerkleBatch,
        /// Chunk contents, index-aligned with `chunk_addresses`.
        chunk_contents: Vec<Bytes>,
        /// Content address of each entry in `chunk_contents`.
        chunk_addresses: Vec<[u8; 32]>,
    },
}
617
/// An encrypted and quoted file awaiting external payment.
///
/// Depending on `payment_info`, pass it to `Client::finalize_upload`
/// (wave-batch) or `Client::finalize_upload_merkle` (merkle).
#[derive(Debug)]
#[non_exhaustive]
pub struct PreparedUpload {
    /// DataMap of the encrypted file.
    pub data_map: DataMap,
    /// What the external signer must pay.
    pub payment_info: ExternalPaymentInfo,
    /// Address of the bundled DataMap chunk for `Visibility::Public`, else `None`.
    pub data_map_address: Option<[u8; 32]>,
}
645
/// Handles returned by `spawn_file_encryption`:
/// - a receiver of encrypted chunk contents;
/// - a one-shot receiver for the final DataMap;
/// - the join handle of the blocking encryption task.
type EncryptionChannels = (
    tokio::sync::mpsc::Receiver<Bytes>,
    tokio::sync::oneshot::Receiver<DataMap>,
    tokio::task::JoinHandle<Result<()>>,
);
652
653fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
655 let metadata = std::fs::metadata(&path)?;
656 let data_size = usize::try_from(metadata.len())
657 .map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;
658
659 let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
660 let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();
661
662 let handle = tokio::task::spawn_blocking(move || {
663 let file = std::fs::File::open(&path)?;
664 let mut reader = std::io::BufReader::new(file);
665
666 let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
667 let read_error_clone = Arc::clone(&read_error);
668
669 let data_iter = std::iter::from_fn(move || {
670 let mut buffer = vec![0u8; 8192];
671 match std::io::Read::read(&mut reader, &mut buffer) {
672 Ok(0) => None,
673 Ok(n) => {
674 buffer.truncate(n);
675 Some(Bytes::from(buffer))
676 }
677 Err(e) => {
678 let mut guard = read_error_clone
679 .lock()
680 .unwrap_or_else(|poisoned| poisoned.into_inner());
681 *guard = Some(e);
682 None
683 }
684 }
685 });
686
687 let mut stream = stream_encrypt(data_size, data_iter)
688 .map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;
689
690 for chunk_result in stream.chunks() {
691 {
696 let guard = read_error
697 .lock()
698 .unwrap_or_else(|poisoned| poisoned.into_inner());
699 if let Some(ref e) = *guard {
700 return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
701 }
702 }
703
704 let (_hash, content) = chunk_result
705 .map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
706 if chunk_tx.blocking_send(content).is_err() {
707 return Err(Error::Encryption("upload receiver dropped".to_string()));
708 }
709 }
710
711 {
713 let guard = read_error
714 .lock()
715 .unwrap_or_else(|poisoned| poisoned.into_inner());
716 if let Some(ref e) = *guard {
717 return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
718 }
719 }
720
721 let datamap = stream
722 .into_datamap()
723 .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
724 if datamap_tx.send(datamap).is_err() {
725 warn!("DataMap receiver dropped — upload may have been cancelled");
726 }
727 Ok(())
728 });
729
730 Ok((chunk_rx, datamap_rx, handle))
731}
732
733impl Client {
734 pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
745 self.file_upload_with_mode(path, PaymentMode::Auto).await
746 }
747
748 pub async fn estimate_upload_cost(
776 &self,
777 path: &Path,
778 mode: PaymentMode,
779 progress: Option<mpsc::Sender<UploadEvent>>,
780 ) -> Result<UploadCostEstimate> {
781 let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
782
783 if file_size < 3 {
784 return Err(Error::InvalidData(
785 "File too small: self-encryption requires at least 3 bytes".into(),
786 ));
787 }
788
789 check_disk_space_for_spill(file_size)?;
790
791 info!(
792 "Estimating upload cost for {} ({file_size} bytes)",
793 path.display()
794 );
795
796 let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
797 let chunk_count = spill.len();
798
799 if let Some(ref tx) = progress {
800 let _ = tx
801 .send(UploadEvent::Encrypted {
802 total_chunks: chunk_count,
803 })
804 .await;
805 }
806
807 info!("Encrypted into {chunk_count} chunks, requesting quote");
808
809 let sample_limit = spill.addresses.len().min(ESTIMATE_SAMPLE_CAP);
815 let mut sampled = 0usize;
816 let mut all_already_stored = true;
817 let mut quotes_opt: Option<Vec<QuoteEntry>> = None;
818
819 for addr in spill.addresses.iter().take(sample_limit) {
820 sampled += 1;
821 let chunk_bytes = spill.read_chunk(addr)?;
822 let data_size = u64::try_from(chunk_bytes.len())
823 .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
824 match self
825 .get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
826 .await
827 {
828 Ok(q) => {
829 quotes_opt = Some(q);
830 all_already_stored = false;
831 break;
832 }
833 Err(Error::AlreadyStored) => {
834 debug!(
835 "Sample chunk {} already stored; trying next address ({sampled}/{sample_limit})",
836 hex::encode(addr)
837 );
838 continue;
839 }
840 Err(e) => return Err(e),
841 }
842 }
843
844 let uses_merkle = should_use_merkle(chunk_count, mode);
845
846 let quotes = match quotes_opt {
847 Some(q) => q,
848 None if all_already_stored && sampled == chunk_count => {
849 info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
853 return Ok(UploadCostEstimate {
854 file_size,
855 chunk_count,
856 storage_cost_atto: "0".into(),
857 estimated_gas_cost_wei: "0".into(),
858 payment_mode: if uses_merkle {
859 PaymentMode::Merkle
860 } else {
861 PaymentMode::Single
862 },
863 });
864 }
865 None => {
866 return Err(Error::CostEstimationInconclusive(format!(
867 "sampled {sampled} chunk addresses out of {chunk_count} and every \
868 one reported AlreadyStored; cannot infer a representative price \
869 for the remaining chunks"
870 )));
871 }
872 };
873
874 let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
877 prices.sort();
878 let median_price = prices
879 .get(prices.len() / 2)
880 .copied()
881 .unwrap_or(Amount::ZERO);
882 let per_chunk_cost = median_price * Amount::from(3u64);
883
884 let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
885 let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);
886
887 let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
903 let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
904 let estimated_gas: u128 = if uses_merkle {
905 merkle_batches
906 .saturating_mul(GAS_PER_MERKLE_TX)
907 .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
908 } else {
909 waves
910 .saturating_mul(GAS_PER_WAVE_TX)
911 .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
912 };
913
914 info!(
915 "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
916 );
917
918 Ok(UploadCostEstimate {
919 file_size,
920 chunk_count,
921 storage_cost_atto: total_storage.to_string(),
922 estimated_gas_cost_wei: estimated_gas.to_string(),
923 payment_mode: if uses_merkle {
924 PaymentMode::Merkle
925 } else {
926 PaymentMode::Single
927 },
928 })
929 }
930
931 pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
936 self.file_prepare_upload_with_progress(path, Visibility::Private, None)
937 .await
938 }
939
940 pub async fn file_prepare_upload_with_visibility(
945 &self,
946 path: &Path,
947 visibility: Visibility,
948 ) -> Result<PreparedUpload> {
949 self.file_prepare_upload_with_progress(path, visibility, None)
950 .await
951 }
952
/// Encrypt the file at `path` and prepare payment data for an external
/// signer, without paying or storing anything.
///
/// For `Visibility::Public`, the DataMap is serialized with `rmp_serde` and
/// appended as an extra chunk, and its address is returned in
/// `data_map_address`.
///
/// Payment preparation depends on
/// `should_use_merkle(chunk_count, PaymentMode::Auto)`:
/// - If it holds, a merkle batch is prepared and all chunk contents are
///   returned for `finalize_upload_merkle`.
/// - Otherwise every chunk is quoted concurrently, bounded by the adaptive
///   quote limiter. The resulting prepared chunks are returned with an
///   aggregated `PaymentIntent` for `finalize_upload`.
///
/// Note: every spilled chunk is read back into memory.
///
/// # Errors
/// Propagates:
/// - metadata, disk-space, encryption and spill-read errors;
/// - `Error::Serialization` if the DataMap cannot be serialized;
/// - merkle-preparation and quoting errors.
pub async fn file_prepare_upload_with_progress(
    &self,
    path: &Path,
    visibility: Visibility,
    progress: Option<mpsc::Sender<UploadEvent>>,
) -> Result<PreparedUpload> {
    debug!(
        "Preparing file upload for external signing (visibility={visibility:?}): {}",
        path.display()
    );

    let file_size = std::fs::metadata(path)?.len();
    check_disk_space_for_spill(file_size)?;

    let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;

    info!(
        "Encrypted {} into {} chunks for external signing (spilled to disk)",
        path.display(),
        spill.len()
    );

    // Load every spilled chunk into memory: the returned `PreparedUpload`
    // outlives `spill`, whose directory is removed when it drops.
    let mut chunk_data: Vec<Bytes> = spill
        .addresses
        .iter()
        .map(|addr| spill.read_chunk(addr))
        .collect::<std::result::Result<Vec<_>, _>>()?;

    let data_map_address = match visibility {
        Visibility::Private => None,
        Visibility::Public => {
            // Publish the DataMap as one more content-addressed chunk.
            let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
                Error::Serialization(format!("Failed to serialize DataMap: {e}"))
            })?;
            let bytes = Bytes::from(serialized);
            let address = compute_address(&bytes);
            info!(
                "Public upload: bundling DataMap chunk ({} bytes) at address {}",
                bytes.len(),
                hex::encode(address)
            );
            chunk_data.push(bytes);
            Some(address)
        }
    };

    // Includes the DataMap chunk for public uploads.
    let chunk_count = chunk_data.len();

    if let Some(ref tx) = progress {
        let _ = tx
            .send(UploadEvent::Encrypted {
                total_chunks: chunk_count,
            })
            .await;
    }

    let payment_info = if should_use_merkle(chunk_count, PaymentMode::Auto) {
        info!("Using merkle batch preparation for {chunk_count} file chunks");

        let addresses: Vec<[u8; 32]> = chunk_data.iter().map(|c| compute_address(c)).collect();

        // Guard against division by zero for an empty chunk list.
        let avg_size =
            chunk_data.iter().map(bytes::Bytes::len).sum::<usize>() / chunk_count.max(1);
        let avg_size_u64 = u64::try_from(avg_size).unwrap_or(0);

        let prepared_batch = self
            .prepare_merkle_batch_external(&addresses, DATA_TYPE_CHUNK, avg_size_u64)
            .await?;

        info!(
            "File prepared for external merkle signing: {} chunks, depth={} ({})",
            chunk_count,
            prepared_batch.depth,
            path.display()
        );

        ExternalPaymentInfo::Merkle {
            prepared_batch,
            chunk_contents: chunk_data,
            chunk_addresses: addresses,
        }
    } else {
        // Quote concurrency: the adaptive limiter's current level, but never
        // more than the number of chunks.
        let quote_limiter = self.controller().quote.clone();
        let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
        let mut quote_stream = stream::iter(chunk_data)
            .map(|content| {
                let limiter = quote_limiter.clone();
                async move {
                    observe_op(
                        &limiter,
                        || async move { self.prepare_chunk_payment(content).await },
                        classify_error,
                    )
                    .await
                }
            })
            .buffer_unordered(quote_concurrency);

        let mut prepared_chunks = Vec::with_capacity(spill.len());
        let mut quoted = 0usize;
        while let Some(result) = quote_stream.next().await {
            // `None` means no payment was prepared for this chunk
            // (presumably already stored — see the DataMap note below).
            if let Some(prepared) = result? {
                prepared_chunks.push(prepared);
            }
            quoted += 1;
            // Non-blocking: with a full progress channel the event is dropped
            // rather than stalling quoting.
            if let Some(ref tx) = progress {
                let _ = tx.try_send(UploadEvent::ChunkQuoted {
                    quoted,
                    total: chunk_count,
                });
            }
        }

        if let Some(addr) = data_map_address {
            // The DataMap chunk got no prepared payment; log that its address
            // is still retrievable.
            if !prepared_chunks.iter().any(|c| c.address == addr) {
                info!(
                    "Public upload: DataMap chunk {} was already stored \
                     on the network — address is retrievable without a \
                     new payment",
                    hex::encode(addr)
                );
            }
        }

        let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);

        info!(
            "File prepared for external signing: {} chunks, total {} atto ({})",
            prepared_chunks.len(),
            payment_intent.total_amount,
            path.display()
        );

        ExternalPaymentInfo::WaveBatch {
            prepared_chunks,
            payment_intent,
        }
    };

    Ok(PreparedUpload {
        data_map,
        payment_info,
        data_map_address,
    })
}
1148
1149 pub async fn finalize_upload(
1161 &self,
1162 prepared: PreparedUpload,
1163 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1164 ) -> Result<FileUploadResult> {
1165 self.finalize_upload_with_progress(prepared, tx_hash_map, None)
1166 .await
1167 }
1168
1169 pub async fn finalize_upload_with_progress(
1178 &self,
1179 prepared: PreparedUpload,
1180 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1181 progress: Option<mpsc::Sender<UploadEvent>>,
1182 ) -> Result<FileUploadResult> {
1183 let data_map_address = prepared.data_map_address;
1184 match prepared.payment_info {
1185 ExternalPaymentInfo::WaveBatch {
1186 prepared_chunks,
1187 payment_intent: _,
1188 } => {
1189 let total_chunks = prepared_chunks.len();
1190 let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
1191 let wave_result = self
1192 .store_paid_chunks_with_events(paid_chunks, progress.as_ref(), 0, total_chunks)
1193 .await;
1194 if !wave_result.failed.is_empty() {
1195 let failed_count = wave_result.failed.len();
1196 let stored_count = wave_result.stored.len();
1197 return Err(Error::PartialUpload {
1198 stored: wave_result.stored.clone(),
1199 stored_count,
1200 failed: wave_result.failed,
1201 failed_count,
1202 total_chunks: stored_count + failed_count,
1203 reason: "finalize_upload: chunk storage failed after retries".into(),
1204 });
1205 }
1206 let chunks_stored = wave_result.stored.len();
1207
1208 info!("External-signer upload finalized: {chunks_stored} chunks stored");
1209
1210 let mut stats = WaveAggregateStats::default();
1211 stats.absorb(&wave_result);
1212
1213 Ok(FileUploadResult {
1214 data_map: prepared.data_map,
1215 chunks_stored,
1216 chunks_failed: 0,
1217 total_chunks: chunks_stored,
1218 payment_mode_used: PaymentMode::Single,
1219 storage_cost_atto: "0".into(),
1220 gas_cost_wei: 0,
1221 data_map_address,
1222 chunk_attempts_total: stats.chunk_attempts_total,
1223 store_durations_ms: stats.store_durations_ms,
1224 retries_histogram: stats.retries_histogram,
1225 })
1226 }
1227 ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
1228 "Cannot finalize merkle upload with wave-batch tx hashes. \
1229 Use finalize_upload_merkle() instead."
1230 .to_string(),
1231 )),
1232 }
1233 }
1234
1235 pub async fn finalize_upload_merkle(
1247 &self,
1248 prepared: PreparedUpload,
1249 winner_pool_hash: [u8; 32],
1250 ) -> Result<FileUploadResult> {
1251 self.finalize_upload_merkle_with_progress(prepared, winner_pool_hash, None)
1252 .await
1253 }
1254
1255 pub async fn finalize_upload_merkle_with_progress(
1264 &self,
1265 prepared: PreparedUpload,
1266 winner_pool_hash: [u8; 32],
1267 progress: Option<mpsc::Sender<UploadEvent>>,
1268 ) -> Result<FileUploadResult> {
1269 let data_map_address = prepared.data_map_address;
1270 match prepared.payment_info {
1271 ExternalPaymentInfo::Merkle {
1272 prepared_batch,
1273 chunk_contents,
1274 chunk_addresses,
1275 } => {
1276 let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
1277 let (chunks_stored, stats) = self
1278 .merkle_upload_chunks(
1279 chunk_contents,
1280 chunk_addresses,
1281 &batch_result,
1282 progress.as_ref(),
1283 )
1284 .await?;
1285
1286 info!("External-signer merkle upload finalized: {chunks_stored} chunks stored");
1287
1288 Ok(FileUploadResult {
1289 data_map: prepared.data_map,
1290 chunks_stored,
1291 chunks_failed: 0,
1292 total_chunks: chunks_stored,
1293 payment_mode_used: PaymentMode::Merkle,
1294 storage_cost_atto: "0".into(),
1295 gas_cost_wei: 0,
1296 data_map_address,
1297 chunk_attempts_total: stats.chunk_attempts_total,
1298 store_durations_ms: stats.store_durations_ms,
1299 retries_histogram: stats.retries_histogram,
1300 })
1301 }
1302 ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
1303 "Cannot finalize wave-batch upload with merkle winner hash. \
1304 Use finalize_upload() instead."
1305 .to_string(),
1306 )),
1307 }
1308 }
1309
1310 #[allow(clippy::too_many_lines)]
1324 pub async fn file_upload_with_mode(
1325 &self,
1326 path: &Path,
1327 mode: PaymentMode,
1328 ) -> Result<FileUploadResult> {
1329 self.file_upload_with_progress(path, mode, None).await
1330 }
1331
/// Encrypt the file at `path` to a disk spill, then pay for and store every
/// chunk using `mode`.
///
/// Merkle path (when `self.should_use_merkle(chunk_count, mode)`):
/// - A cached merkle receipt for this file (keyed by the canonicalized path,
///   falling back to the path as given) is reused only if its proofs cover
///   exactly this file's chunk addresses.
/// - Otherwise the cache is discarded, a fresh merkle payment is made, and it
///   is cached.
/// - In `PaymentMode::Auto`, `Error::InsufficientPeers` from the merkle
///   payment falls back to wave-batch (`Single`) upload.
///
/// Otherwise chunks are paid and stored in waves (`Single`).
///
/// The relevant payment cache is deleted only after a successful upload. On
/// error it is left in place (presumably so a retry can resume — confirm).
/// `data_map_address` is always `None` here: this path does not publish the
/// DataMap.
///
/// # Errors
/// Propagates metadata, disk-space, encryption, payment and storage errors.
#[allow(clippy::too_many_lines)]
pub async fn file_upload_with_progress(
    &self,
    path: &Path,
    mode: PaymentMode,
    progress: Option<mpsc::Sender<UploadEvent>>,
) -> Result<FileUploadResult> {
    debug!(
        "Streaming file upload with mode {mode:?}: {}",
        path.display()
    );

    let file_size = std::fs::metadata(path)?.len();
    check_disk_space_for_spill(file_size)?;

    let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;

    let chunk_count = spill.len();
    info!(
        "Encrypted {} into {chunk_count} chunks (spilled to disk)",
        path.display()
    );
    if let Some(ref tx) = progress {
        let _ = tx
            .send(UploadEvent::Encrypted {
                total_chunks: chunk_count,
            })
            .await;
    }

    // Key for the resumable payment caches; fall back to the given path if it
    // cannot be canonicalized.
    let file_path_key = std::fs::canonicalize(path)
        .map(|p| p.display().to_string())
        .unwrap_or_else(|_| path.display().to_string());
    let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self
        .should_use_merkle(chunk_count, mode)
    {
        info!("Using merkle batch payment for {chunk_count} file chunks");

        let batch_result = if let Some((_cache_path, cached)) =
            crate::data::client::cached_merkle::try_load_for_file(&file_path_key)
        {
            // Reuse the cached receipt only if it covers exactly this
            // file's chunks (the file may have changed since it was cached).
            let addresses_match = spill
                .addresses
                .iter()
                .all(|addr| cached.proofs.contains_key(addr));
            if addresses_match && cached.proofs.len() == chunk_count {
                info!(
                    "Skipping merkle payment phase; resuming with \
                     cached proofs ({} chunks)",
                    cached.proofs.len()
                );
                Ok(cached)
            } else {
                info!(
                    "Cached merkle receipt does not match current file \
                     content (cached={}, file={chunk_count}). \
                     Discarding cache and paying fresh.",
                    cached.proofs.len()
                );
                crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
                self.pay_for_merkle_batch(
                    &spill.addresses,
                    DATA_TYPE_CHUNK,
                    spill.avg_chunk_size(),
                )
                .await
                .inspect(|result| {
                    crate::data::client::cached_merkle::try_save(&file_path_key, result);
                })
            }
        } else {
            // Cache a successful payment so a failed upload can resume
            // without paying again.
            self.pay_for_merkle_batch(&spill.addresses, DATA_TYPE_CHUNK, spill.avg_chunk_size())
                .await
                .inspect(|result| {
                    crate::data::client::cached_merkle::try_save(&file_path_key, result);
                })
        };

        let batch_result = match batch_result {
            Ok(result) => result,
            // Auto mode only: not enough peers for merkle, so upload in waves.
            Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
                info!("Merkle needs more peers ({msg}), falling back to wave-batch");
                let (stored, sc, gc, fb_stats) = self
                    .upload_waves_single(&spill, progress.as_ref(), Some(&file_path_key))
                    .await?;
                crate::data::client::cached_single::try_delete_for_file(&file_path_key);
                return Ok(FileUploadResult {
                    data_map,
                    chunks_stored: stored,
                    chunks_failed: 0,
                    total_chunks: chunk_count,
                    payment_mode_used: PaymentMode::Single,
                    storage_cost_atto: sc,
                    gas_cost_wei: gc,
                    data_map_address: None,
                    chunk_attempts_total: fb_stats.chunk_attempts_total,
                    store_durations_ms: fb_stats.store_durations_ms,
                    retries_histogram: fb_stats.retries_histogram,
                });
            }
            Err(e) => return Err(e),
        };

        let (stored, sc, gc, stats) = self
            .upload_waves_merkle(&spill, &batch_result, progress.as_ref())
            .await?;
        crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
        (stored, PaymentMode::Merkle, sc, gc, stats)
    } else {
        let (stored, sc, gc, stats) = self
            .upload_waves_single(&spill, progress.as_ref(), Some(&file_path_key))
            .await?;
        crate::data::client::cached_single::try_delete_for_file(&file_path_key);
        (stored, PaymentMode::Single, sc, gc, stats)
    };

    info!(
        "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
        path.display()
    );

    Ok(FileUploadResult {
        data_map,
        chunks_stored,
        chunks_failed: 0,
        total_chunks: chunk_count,
        payment_mode_used: actual_mode,
        storage_cost_atto,
        gas_cost_wei,
        data_map_address: None,
        chunk_attempts_total: stats.chunk_attempts_total,
        store_durations_ms: stats.store_durations_ms,
        retries_histogram: stats.retries_histogram,
    })
}
1499
1500 async fn encrypt_file_to_spill(
1507 &self,
1508 path: &Path,
1509 progress: Option<&mpsc::Sender<UploadEvent>>,
1510 ) -> Result<(ChunkSpill, DataMap)> {
1511 let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
1512
1513 let mut spill = ChunkSpill::new()?;
1514 while let Some(content) = chunk_rx.recv().await {
1515 spill.push(&content)?;
1516 let chunks_done = spill.len();
1517 if let Some(tx) = progress {
1518 if chunks_done.is_multiple_of(10) {
1519 let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
1520 }
1521 }
1522 if chunks_done % 100 == 0 {
1523 let mb = spill.total_bytes() / (1024 * 1024);
1524 info!(
1525 "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
1526 path.display()
1527 );
1528 }
1529 }
1530
1531 handle
1533 .await
1534 .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
1535 .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
1536
1537 let data_map = datamap_rx
1538 .await
1539 .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
1540
1541 Ok((spill, data_map))
1542 }
1543
1544 async fn upload_waves_single(
1551 &self,
1552 spill: &ChunkSpill,
1553 progress: Option<&mpsc::Sender<UploadEvent>>,
1554 resume_key: Option<&str>,
1555 ) -> Result<(usize, String, u128, WaveAggregateStats)> {
1556 let mut total_stored = 0usize;
1557 let mut total_storage = Amount::ZERO;
1558 let mut total_gas: u128 = 0;
1559 let mut agg_stats = WaveAggregateStats::default();
1560 let total_chunks = spill.len();
1561 let waves: Vec<&[[u8; 32]]> = spill.waves().collect();
1562 let wave_count = waves.len();
1563
1564 for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
1565 let wave_num = wave_idx + 1;
1566 let wave_data: Vec<Bytes> = wave_addrs
1567 .iter()
1568 .map(|addr| spill.read_chunk(addr))
1569 .collect::<Result<Vec<_>>>()?;
1570
1571 info!(
1572 "Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
1573 wave_data.len()
1574 );
1575 if let Some(tx) = progress {
1576 let _ = tx
1577 .send(UploadEvent::QuotingChunks {
1578 wave: wave_num,
1579 total_waves: wave_count,
1580 chunks_in_wave: wave_data.len(),
1581 })
1582 .await;
1583 }
1584 let (addresses, wave_storage, wave_gas, wave_stats) = self
1585 .batch_upload_chunks_with_events(
1586 wave_data,
1587 progress,
1588 total_stored,
1589 total_chunks,
1590 resume_key,
1591 )
1592 .await?;
1593 total_stored += addresses.len();
1594 if let Ok(cost) = wave_storage.parse::<Amount>() {
1595 total_storage += cost;
1596 }
1597 total_gas = total_gas.saturating_add(wave_gas);
1598 agg_stats.chunk_attempts_total = agg_stats
1601 .chunk_attempts_total
1602 .saturating_add(wave_stats.chunk_attempts_total);
1603 agg_stats
1604 .store_durations_ms
1605 .extend(wave_stats.store_durations_ms);
1606 for (slot, count) in agg_stats
1607 .retries_histogram
1608 .iter_mut()
1609 .zip(wave_stats.retries_histogram.iter())
1610 {
1611 *slot = slot.saturating_add(*count);
1612 }
1613 if let Some(tx) = progress {
1614 let _ = tx
1615 .send(UploadEvent::WaveComplete {
1616 wave: wave_num,
1617 total_waves: wave_count,
1618 stored_so_far: total_stored,
1619 total: total_chunks,
1620 })
1621 .await;
1622 }
1623 }
1624
1625 Ok((
1626 total_stored,
1627 total_storage.to_string(),
1628 total_gas,
1629 agg_stats,
1630 ))
1631 }
1632
/// Stores the spilled chunks wave by wave using proofs from an already-paid merkle batch.
///
/// This function makes no payment calls itself. Each chunk is sent to its close group
/// (`close_group_peers`) together with its proof looked up in `batch_result.proofs`. Within
/// a wave, stores run concurrently up to the store limiter's current value, capped at the
/// wave size. When `progress` is `Some`, an [`UploadEvent::ChunkStored`] is sent per stored
/// chunk and an [`UploadEvent::WaveComplete`] after each wave.
///
/// Returns `(chunks_stored, storage_cost_atto, gas_cost_wei, aggregate_stats)`; both costs
/// are copied from `batch_result`.
///
/// # Errors
///
/// Returns the error from reading a wave back from the spill. On the first chunk that
/// fails (including a chunk with no proof in `batch_result`), returns
/// [`Error::PartialUpload`] listing every address stored so far and the failing chunk.
/// Uploads still in flight for that wave are dropped, and later waves are not attempted.
async fn upload_waves_merkle(
    &self,
    spill: &ChunkSpill,
    batch_result: &MerkleBatchPaymentResult,
    progress: Option<&mpsc::Sender<UploadEvent>>,
) -> Result<(usize, String, u128, WaveAggregateStats)> {
    let mut total_stored = 0usize;
    let total_chunks = spill.len();
    let waves: Vec<&[[u8; 32]]> = spill.waves().collect();
    let wave_count = waves.len();
    // Addresses stored so far; reported back through `Error::PartialUpload` on failure.
    let mut stored_addresses: Vec<[u8; 32]> = Vec::new();
    let mut agg_stats = WaveAggregateStats::default();

    for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
        let wave_num = wave_idx + 1;
        let wave = spill.read_wave(wave_addrs)?;

        info!(
            "Wave {wave_num}/{wave_count}: storing {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
            wave.len()
        );

        let store_limiter = self.controller().store.clone();
        // Concurrency follows the limiter's current value, capped at the wave size (min 1).
        let store_concurrency = store_limiter.current().min(wave.len().max(1));
        // Every future yields the chunk address and its start time in both the Ok and the
        // Err case, so the consumer below can attribute results and durations.
        let mut upload_stream = stream::iter(wave.into_iter().map(|(content, addr)| {
            let proof_bytes = batch_result.proofs.get(&addr).cloned();
            let limiter = store_limiter.clone();
            async move {
                let started = std::time::Instant::now();
                let proof = proof_bytes.ok_or_else(|| {
                    (
                        addr,
                        Error::Payment(format!(
                            "Missing merkle proof for chunk {}",
                            hex::encode(addr)
                        )),
                        started,
                    )
                })?;
                let peers = self
                    .close_group_peers(&addr)
                    .await
                    .map_err(|e| (addr, e, started))?;
                // The store is reported to the adaptive limiter via `observe_op`.
                observe_op(
                    &limiter,
                    || async move {
                        self.chunk_put_to_close_group(content, proof, &peers).await
                    },
                    classify_error,
                )
                .await
                .map(|_| (addr, started))
                .map_err(|e| (addr, e, started))
            }
        }))
        .buffer_unordered(store_concurrency);

        while let Some(result) = upload_stream.next().await {
            match result {
                Ok((addr, started)) => {
                    // Measured from when this chunk's future started running, so it covers
                    // the close-group lookup as well as the store itself.
                    let duration_ms =
                        u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
                    agg_stats.store_durations_ms.push(duration_ms);
                    // Each success is recorded as a single attempt with zero retries
                    // (histogram bucket 0).
                    agg_stats.chunk_attempts_total =
                        agg_stats.chunk_attempts_total.saturating_add(1);
                    agg_stats.retries_histogram[0] =
                        agg_stats.retries_histogram[0].saturating_add(1);
                    stored_addresses.push(addr);
                    total_stored += 1;
                    info!("Stored {total_stored}/{total_chunks}");
                    if let Some(tx) = progress {
                        let _ = tx
                            .send(UploadEvent::ChunkStored {
                                stored: total_stored,
                                total: total_chunks,
                            })
                            .await;
                    }
                }
                Err((addr, e, _started)) => {
                    // Fail fast: returning drops `upload_stream` and with it any uploads
                    // still in flight for this wave.
                    warn!("merkle upload failed for chunk {}: {e}", hex::encode(addr));
                    return Err(Error::PartialUpload {
                        stored: stored_addresses,
                        stored_count: total_stored,
                        failed: vec![(addr, e.to_string())],
                        failed_count: 1,
                        total_chunks,
                        reason: format!("merkle chunk upload failed: {e}"),
                    });
                }
            }
        }

        if let Some(tx) = progress {
            let _ = tx
                .send(UploadEvent::WaveComplete {
                    wave: wave_num,
                    total_waves: wave_count,
                    stored_so_far: total_stored,
                    total: total_chunks,
                })
                .await;
        }
    }

    Ok((
        total_stored,
        batch_result.storage_cost_atto.clone(),
        batch_result.gas_cost_wei,
        agg_stats,
    ))
}
1754
1755 #[allow(clippy::unused_async)]
1776 pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
1777 self.file_download_with_progress(data_map, output, None)
1778 .await
1779 }
1780
1781 #[allow(clippy::unused_async)]
1791 pub async fn file_download_with_progress(
1792 &self,
1793 data_map: &DataMap,
1794 output: &Path,
1795 progress: Option<mpsc::Sender<DownloadEvent>>,
1796 ) -> Result<u64> {
1797 debug!("Downloading file to {}", output.display());
1798
1799 let handle = Handle::current();
1800
1801 let root_map = if data_map.is_child() {
1804 let dm_chunks = data_map.len();
1805 if let Some(ref tx) = progress {
1806 let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
1807 total_map_chunks: dm_chunks,
1808 });
1809 }
1810
1811 let resolve_progress = progress.clone();
1812 let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
1813
1814 let resolved = tokio::task::block_in_place(|| {
1815 let counter_ref = resolve_counter.clone();
1816 let progress_ref = resolve_progress.clone();
1817 let fetch_limiter = self.controller().fetch.clone();
1818 let fetch = |batch: &[(usize, XorName)]| {
1819 let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
1820 let counter = counter_ref.clone();
1821 let prog = progress_ref.clone();
1822 let limiter = fetch_limiter.clone();
1823 handle.block_on(async {
1824 let cap = limiter.current().min(batch_owned.len().max(1));
1829 let mut stream = futures::stream::iter(batch_owned)
1830 .map(|(idx, hash)| {
1831 let addr = hash.0;
1832 let limiter = limiter.clone();
1833 async move {
1834 let result = observe_op(
1835 &limiter,
1836 || async move { self.chunk_get(&addr).await },
1837 classify_error,
1838 )
1839 .await;
1840 (idx, hash, result)
1841 }
1842 })
1843 .buffer_unordered(cap);
1844 let mut results = Vec::new();
1845 while let Some((idx, hash, result)) =
1846 futures::StreamExt::next(&mut stream).await
1847 {
1848 let chunk = result
1849 .map_err(|e| {
1850 self_encryption::Error::Generic(format!(
1851 "DataMap resolution failed: {e}"
1852 ))
1853 })?
1854 .ok_or_else(|| {
1855 self_encryption::Error::Generic(format!(
1856 "DataMap chunk not found: {}",
1857 hex::encode(hash.0)
1858 ))
1859 })?;
1860 results.push((idx, chunk.content));
1861 let fetched =
1862 counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
1863 if let Some(ref tx) = prog {
1864 let _ = tx.try_send(DownloadEvent::MapChunkFetched { fetched });
1865 }
1866 }
1867 results.sort_by_key(|(idx, _)| *idx);
1876 Ok(results)
1877 })
1878 };
1879 get_root_data_map_parallel(data_map.clone(), &fetch)
1880 })
1881 .map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;
1882
1883 info!(
1884 "Resolved hierarchical DataMap: {} data chunks",
1885 resolved.len()
1886 );
1887 resolved
1888 } else {
1889 data_map.clone()
1890 };
1891
1892 let total_chunks = root_map.len();
1894 if let Some(ref tx) = progress {
1895 let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
1896 }
1897
1898 let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
1900 let fetched_for_closure = fetched_counter.clone();
1901 let progress_for_closure = progress.clone();
1902
1903 let fetch_limiter_outer = self.controller().fetch.clone();
1904 let usable_memory = usable_memory_bytes();
1905 let configured_batch_floor = stream_decrypt_batch_size();
1906 let fetch_cap = fetch_limiter_outer.current();
1907 let decrypt_batch_size = adaptive_stream_decrypt_batch_size(
1908 total_chunks,
1909 fetch_cap,
1910 configured_batch_floor,
1911 usable_memory,
1912 );
1913 info!(
1914 total_chunks,
1915 fetch_cap,
1916 configured_batch_floor,
1917 ?usable_memory,
1918 decrypt_batch_size,
1919 "Selected adaptive stream decrypt batch size"
1920 );
1921
1922 let stream = streaming_decrypt_with_batch_size(
1923 &root_map,
1924 |batch: &[(usize, XorName)]| {
1925 let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
1926 let fetched_ref = fetched_for_closure.clone();
1927 let progress_ref = progress_for_closure.clone();
1928 let fetch_limiter = fetch_limiter_outer.clone();
1929
1930 tokio::task::block_in_place(|| {
1931 handle.block_on(async {
1932 let cap = fetch_limiter.current().min(batch_owned.len().max(1));
1934 let mut stream = futures::stream::iter(batch_owned)
1935 .map(|(idx, hash)| {
1936 let addr = hash.0;
1937 let limiter = fetch_limiter.clone();
1938 async move {
1939 let result = observe_op(
1940 &limiter,
1941 || async move { self.chunk_get(&addr).await },
1942 classify_error,
1943 )
1944 .await;
1945 (idx, hash, result)
1946 }
1947 })
1948 .buffer_unordered(cap);
1949
1950 let mut results = Vec::new();
1951 while let Some((idx, hash, result)) =
1952 futures::StreamExt::next(&mut stream).await
1953 {
1954 let addr_hex = hex::encode(hash.0);
1955 let chunk = result
1956 .map_err(|e| {
1957 self_encryption::Error::Generic(format!(
1958 "Network fetch failed for {addr_hex}: {e}"
1959 ))
1960 })?
1961 .ok_or_else(|| {
1962 self_encryption::Error::Generic(format!(
1963 "Chunk not found: {addr_hex}"
1964 ))
1965 })?;
1966 results.push((idx, chunk.content));
1967 let fetched =
1968 fetched_ref.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
1969 info!("Downloaded {fetched}/{total_chunks}");
1970 if let Some(ref tx) = progress_ref {
1971 let _ = tx.try_send(DownloadEvent::ChunksFetched {
1972 fetched,
1973 total: total_chunks,
1974 });
1975 }
1976 }
1977 results.sort_by_key(|(idx, _)| *idx);
1984 Ok(results)
1985 })
1986 })
1987 },
1988 decrypt_batch_size,
1989 )
1990 .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;
1991
1992 let parent = output.parent().unwrap_or_else(|| Path::new("."));
1994 let unique: u64 = rand::random();
1995 let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));
1996
1997 let write_result = (|| -> Result<u64> {
1998 let mut file = std::fs::File::create(&tmp_path)?;
1999 let mut bytes_written = 0u64;
2000 for chunk_result in stream {
2001 let chunk_bytes = chunk_result
2002 .map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
2003 file.write_all(&chunk_bytes)?;
2004 bytes_written += chunk_bytes.len() as u64;
2005 }
2006 file.flush()?;
2007 Ok(bytes_written)
2008 })();
2009
2010 match write_result {
2011 Ok(bytes_written) => match std::fs::rename(&tmp_path, output) {
2012 Ok(()) => {
2013 info!(
2014 "File downloaded: {bytes_written} bytes written to {}",
2015 output.display()
2016 );
2017 Ok(bytes_written)
2018 }
2019 Err(rename_err) => {
2020 if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
2021 warn!(
2022 "Failed to remove temp download file {}: {cleanup_err}",
2023 tmp_path.display()
2024 );
2025 }
2026 Err(rename_err.into())
2027 }
2028 },
2029 Err(e) => {
2030 if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
2031 warn!(
2032 "Failed to remove temp download file {}: {cleanup_err}",
2033 tmp_path.display()
2034 );
2035 }
2036 Err(e)
2037 }
2038 }
2039 }
2040}
2041
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// A 1 KiB spill should pass the free-space check.
    #[test]
    fn disk_space_check_passes_for_small_file() {
        check_disk_space_for_spill(1024).unwrap();
    }

    /// An absurdly large spill must be rejected with `InsufficientDiskSpace`.
    #[test]
    fn disk_space_check_fails_for_absurd_size() {
        let result = check_disk_space_for_spill(u64::MAX / 2);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            matches!(err, Error::InsufficientDiskSpace(_)),
            "expected InsufficientDiskSpace, got: {err}"
        );
    }

    /// With unlimited memory, the batch size follows fetch concurrency times the multiplier.
    #[test]
    fn adaptive_stream_decrypt_batch_size_tracks_fetch_headroom() {
        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(u64::MAX));

        assert_eq!(batch_size, 64 * DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
    }

    /// The batch never exceeds the number of chunks in the file.
    #[test]
    fn adaptive_stream_decrypt_batch_size_caps_to_total_chunks() {
        let batch_size = adaptive_stream_decrypt_batch_size(12, 64, 10, Some(u64::MAX));

        assert_eq!(batch_size, 12);
    }

    /// A configured floor larger than the fetch-derived size wins.
    #[test]
    fn adaptive_stream_decrypt_batch_size_honours_configured_floor() {
        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 1, 32, None);

        assert_eq!(batch_size, 32);
    }

    /// Without a memory reading, the batch stays at the configured floor.
    #[test]
    fn adaptive_stream_decrypt_batch_size_does_not_expand_without_memory_reading() {
        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, None);

        assert_eq!(batch_size, 10);
    }

    /// A memory budget sized for exactly 16 chunks caps the batch at 16.
    #[test]
    fn adaptive_stream_decrypt_batch_size_caps_to_memory_budget() {
        let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
            .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
            .max(1);
        let usable_memory = estimated_bytes_per_chunk
            .saturating_mul(16)
            .saturating_mul(DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR);
        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 256, 10, Some(usable_memory));

        assert_eq!(batch_size, 16);
    }

    /// Even with almost no memory, at least one chunk per batch is allowed.
    #[test]
    fn adaptive_stream_decrypt_batch_size_keeps_one_chunk_when_memory_is_tight() {
        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(1));

        assert_eq!(batch_size, 1);
    }

    /// Chunks pushed into a spill can be read back byte-for-byte by address.
    #[test]
    fn chunk_spill_round_trip() {
        let mut spill = ChunkSpill::new().unwrap();
        let data1 = vec![0xAA; 1024];
        let data2 = vec![0xBB; 2048];

        spill.push(&data1).unwrap();
        spill.push(&data2).unwrap();

        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 1024 + 2048);
        assert_eq!(spill.avg_chunk_size(), (1024 + 2048) / 2);

        let chunk1 = spill.read_chunk(spill.addresses.first().unwrap()).unwrap();
        assert_eq!(&chunk1[..], &data1[..]);

        let chunk2 = spill.read_chunk(spill.addresses.get(1).unwrap()).unwrap();
        assert_eq!(&chunk2[..], &data2[..]);

        let waves: Vec<_> = spill.addresses.chunks(1).collect();
        assert_eq!(waves.len(), 2);
    }

    /// Dropping a spill removes its backing directory.
    #[test]
    fn chunk_spill_cleanup_on_drop() {
        let dir;
        {
            let spill = ChunkSpill::new().unwrap();
            dir = spill.dir.clone();
            assert!(dir.exists());
        }
        assert!(!dir.exists(), "spill dir should be removed on drop");
    }

    /// Identical content is stored once and counted once.
    #[test]
    fn chunk_spill_deduplicates_identical_content() {
        let mut spill = ChunkSpill::new().unwrap();
        let data = vec![0xCC; 512];

        // Push the same content three times; only the first copy should be kept.
        spill.push(&data).unwrap();
        spill.push(&data).unwrap();
        spill.push(&data).unwrap();
        assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
        assert_eq!(
            spill.total_bytes(),
            512,
            "total_bytes should count unique only"
        );

        // Distinct content is still accepted after duplicates.
        let data2 = vec![0xDD; 256];
        spill.push(&data2).unwrap();
        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 512 + 256);
    }
}
2173
/// Compile-time checks that the public upload/download futures are `Send`.
///
/// Nothing here is ever executed: each function only has to type-check, which fails if
/// the future it builds is not `Send`.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Accepts a reference to any value, but only type-checks when that value is `Send`.
    fn _assert_send<F>(_future: &F)
    where
        F: Send,
    {
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_is_send(client: &Client) {
        let upload_future = client.file_upload(Path::new("/dev/null"));
        _assert_send(&upload_future);
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_with_mode_is_send(client: &Client) {
        let upload_future = client.file_upload_with_mode(Path::new("/dev/null"), PaymentMode::Auto);
        _assert_send(&upload_future);
    }

    // `todo!()` stands in for a DataMap that is never built: only the type matters here.
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _file_download_is_send(client: &Client) {
        let data_map: DataMap = todo!();
        let download_future = client.file_download(&data_map, Path::new("/dev/null"));
        _assert_send(&download_future);
    }
}