1use crate::data::client::batch::{finalize_batch_payment, PaymentIntent, PreparedChunk};
14use crate::data::client::merkle::{
15 finalize_merkle_batch, should_use_merkle, MerkleBatchPaymentResult, PaymentMode,
16 PreparedMerkleBatch,
17};
18use crate::data::client::Client;
19use crate::data::error::{Error, Result};
20use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
21use ant_protocol::transport::{MultiAddr, PeerId};
22use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
23use bytes::Bytes;
24use fs2::FileExt;
25use futures::stream::{self, StreamExt};
26use self_encryption::{get_root_data_map_parallel, stream_encrypt, streaming_decrypt, DataMap};
27use std::collections::{HashMap, HashSet};
28use std::io::Write;
29use std::path::{Path, PathBuf};
30use std::sync::{Arc, Mutex};
31use tokio::runtime::Handle;
32use tokio::sync::mpsc;
33use tracing::{debug, info, warn};
34use xor_name::XorName;
35
/// Progress events emitted over an optional channel during a file upload.
#[derive(Debug, Clone)]
pub enum UploadEvent {
    /// Self-encryption in progress; `chunks_done` chunks produced so far
    /// (emitted every 10 chunks).
    Encrypting { chunks_done: usize },
    /// Encryption finished; the upload will consist of `total_chunks` chunks.
    Encrypted { total_chunks: usize },
    /// Quoting has started for one wave of chunks.
    QuotingChunks {
        /// 1-based wave number.
        wave: usize,
        /// Total number of waves in this upload.
        total_waves: usize,
        /// How many chunks are in this wave.
        chunks_in_wave: usize,
    },
    /// A chunk has been quoted (`quoted` of `total`).
    ChunkQuoted { quoted: usize, total: usize },
    /// A chunk has been stored on the network (`stored` of `total`).
    ChunkStored { stored: usize, total: usize },
    /// A full wave of chunks has completed.
    WaveComplete {
        /// 1-based wave number.
        wave: usize,
        /// Total number of waves in this upload.
        total_waves: usize,
        /// Chunks stored across all waves so far.
        stored_so_far: usize,
        /// Total chunks in the upload.
        total: usize,
    },
}
62
/// Progress events emitted over an optional channel during a file download.
#[derive(Debug, Clone)]
pub enum DownloadEvent {
    /// A hierarchical (child) DataMap is being resolved; `total_map_chunks`
    /// map chunks must be fetched before data chunks can be downloaded.
    ResolvingDataMap { total_map_chunks: usize },
    /// One DataMap chunk has been fetched during resolution.
    MapChunkFetched { fetched: usize },
    /// The root DataMap is known; the file consists of `total_chunks` chunks.
    DataMapResolved { total_chunks: usize },
    /// Data chunks fetched so far (`fetched` of `total`).
    ChunksFetched { fetched: usize, total: usize },
}
75
/// One storage quote: the quoting peer, its multiaddresses, the quote
/// itself, and the quoted price.
type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);
80
/// Number of chunks quoted, paid, and stored per upload wave.
const UPLOAD_WAVE_SIZE: usize = 64;

/// Maximum number of chunk addresses sampled when estimating upload cost.
const ESTIMATE_SAMPLE_CAP: usize = 5;

/// Rough gas units assumed for one wave-batch payment transaction
/// (estimation only).
const GAS_PER_WAVE_TX: u128 = 1_500_000;

/// Rough gas units assumed for one merkle batch payment transaction
/// (estimation only).
const GAS_PER_MERKLE_TX: u128 = 500_000;

/// Assumed gas price in wei (0.1 gwei) used for cost estimates.
const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;

/// Extra free-disk headroom, as a percentage of the file size, required
/// before spilling encrypted chunks to disk.
const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;

/// Spill directories older than this are considered stale and eligible for
/// cleanup.
const SPILL_MAX_AGE_SECS: u64 = 24 * 60 * 60;

/// Filename prefix for per-upload spill directories under the spill root.
/// (Was jammed onto the same line as `SPILL_MAX_AGE_SECS`; split for
/// readability.)
const SPILL_DIR_PREFIX: &str = "spill_";

/// Name of the advisory lockfile created inside each spill directory.
const SPILL_LOCK_NAME: &str = ".lock";
138
/// On-disk staging area for encrypted chunks, so a large file never has to
/// be held fully in memory between encryption and upload.
struct ChunkSpill {
    /// Directory holding one file per spilled chunk, named by hex address.
    dir: PathBuf,
    /// Exclusive advisory lock held for the lifetime of this spill; lets
    /// concurrent stale-cleanup passes tell this directory is in use.
    _lock: std::fs::File,
    /// Chunk addresses in the order they were spilled.
    addresses: Vec<[u8; 32]>,
    /// Addresses already spilled, used to deduplicate identical chunks.
    seen: HashSet<[u8; 32]>,
    /// Total bytes written across all spilled chunks.
    total_bytes: u64,
}
151
impl ChunkSpill {
    /// Root directory for all spill directories: `<data_dir>/spill`.
    fn spill_root() -> Result<PathBuf> {
        use crate::config;
        let root = config::data_dir()
            .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
            .join("spill");
        Ok(root)
    }

    /// Create a new, uniquely-named, exclusively-locked spill directory.
    ///
    /// Stale directories left by previous runs are cleaned up first
    /// (best-effort). The advisory lock taken here is held until this
    /// [`ChunkSpill`] is dropped, which is how concurrent cleanups detect a
    /// live spill.
    ///
    /// # Errors
    /// Fails if the directory cannot be created, or the lockfile cannot be
    /// created or locked.
    fn new() -> Result<Self> {
        let root = Self::spill_root()?;
        std::fs::create_dir_all(&root)?;

        // Opportunistic, best-effort cleanup of abandoned spill dirs.
        Self::cleanup_stale(&root);

        // Name: <prefix><unix-secs>_<random-u64> — unique, and the embedded
        // timestamp is what cleanup_stale later parses for staleness.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let unique: u64 = rand::random();
        let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
        std::fs::create_dir(&dir)?;

        // Take the exclusive lock before any chunks are written, so the
        // directory is never visible as both populated and unlocked.
        let lock_path = dir.join(SPILL_LOCK_NAME);
        let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
            Error::Io(std::io::Error::new(
                e.kind(),
                format!("failed to create spill lockfile: {e}"),
            ))
        })?;
        lock_file.try_lock_exclusive().map_err(|e| {
            Error::Io(std::io::Error::new(
                e.kind(),
                format!("failed to lock spill lockfile: {e}"),
            ))
        })?;

        Ok(Self {
            dir,
            _lock: lock_file,
            addresses: Vec::new(),
            seen: HashSet::new(),
            total_bytes: 0,
        })
    }

    /// Remove spill directories older than [`SPILL_MAX_AGE_SECS`] whose
    /// lockfile is not held by a live process. All failures are logged and
    /// ignored — cleanup is strictly best-effort.
    fn cleanup_stale(root: &Path) {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        // `unwrap_or_default` above maps a pre-epoch clock to 0; treat that
        // as "time unknown" and skip rather than misjudge staleness.
        if now == 0 {
            warn!("System clock before Unix epoch, skipping spill cleanup");
            return;
        }

        let entries = match std::fs::read_dir(root) {
            Ok(entries) => entries,
            Err(_) => return,
        };

        for entry in entries.flatten() {
            let name = entry.file_name();
            let name_str = name.to_string_lossy();

            // Only consider directories we created ourselves.
            let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
                Some(s) => s,
                None => continue,
            };

            // Creation time is the first '_'-separated field of the name.
            let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
                Some(ts) => ts,
                None => continue,
            };

            if now.saturating_sub(timestamp) <= SPILL_MAX_AGE_SECS {
                continue;
            }

            let file_type = match entry.file_type() {
                Ok(ft) => ft,
                Err(_) => continue,
            };
            if !file_type.is_dir() {
                continue;
            }

            let path = entry.path();

            // Probe the lockfile: if another process still holds the lock,
            // the spill is live — skip it. If the lockfile can't be opened
            // at all, fall through and delete.
            let lock_path = path.join(SPILL_LOCK_NAME);
            if let Ok(lock_file) = std::fs::File::open(&lock_path) {
                use fs2::FileExt;
                if lock_file.try_lock_exclusive().is_err() {
                    debug!("Skipping active spill dir: {}", path.display());
                    continue;
                }
                // Release our probe lock before removing the directory.
                drop(lock_file);
            }

            info!("Cleaning up stale spill dir: {}", path.display());
            if let Err(e) = std::fs::remove_dir_all(&path) {
                warn!("Failed to clean up stale spill dir {}: {e}", path.display());
            }
        }
    }

    /// Run stale-spill cleanup without creating a new spill (e.g. from
    /// maintenance paths).
    #[allow(dead_code)]
    pub(crate) fn run_cleanup() {
        if let Ok(root) = Self::spill_root() {
            Self::cleanup_stale(&root);
        }
    }

    /// Spill one chunk to disk, keyed by its content address.
    ///
    /// Duplicate content (same address) is skipped silently, so
    /// `addresses` stays deduplicated.
    fn push(&mut self, content: &[u8]) -> Result<()> {
        let address = compute_address(content);
        if !self.seen.insert(address) {
            return Ok(());
        }
        let path = self.dir.join(hex::encode(address));
        std::fs::write(&path, content)?;
        self.total_bytes += content.len() as u64;
        self.addresses.push(address);
        Ok(())
    }

    /// Number of distinct chunks spilled so far.
    fn len(&self) -> usize {
        self.addresses.len()
    }

    /// Total bytes written across all spilled chunks.
    fn total_bytes(&self) -> u64 {
        self.total_bytes
    }

    /// Average spilled chunk size in bytes; 0 when nothing has been spilled.
    fn avg_chunk_size(&self) -> u64 {
        if self.addresses.is_empty() {
            return 0;
        }
        self.total_bytes / self.addresses.len() as u64
    }

    /// Read one spilled chunk back from disk.
    fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
        let path = self.dir.join(hex::encode(address));
        let data = std::fs::read(&path).map_err(|e| {
            Error::Io(std::io::Error::new(
                e.kind(),
                format!("reading spilled chunk {}: {e}", hex::encode(address)),
            ))
        })?;
        Ok(Bytes::from(data))
    }

    /// Iterate over the chunk addresses in waves of [`UPLOAD_WAVE_SIZE`].
    fn waves(&self) -> std::slice::Chunks<'_, [u8; 32]> {
        self.addresses.chunks(UPLOAD_WAVE_SIZE)
    }

    /// Load one wave's chunk contents from disk, paired with their
    /// addresses.
    fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
        let mut out = Vec::with_capacity(wave_addrs.len());
        for addr in wave_addrs {
            let content = self.read_chunk(addr)?;
            out.push((content, *addr));
        }
        Ok(out)
    }

    /// Delete the spill directory. Failures are logged, never propagated —
    /// this also runs from `Drop`.
    fn cleanup(&self) {
        if let Err(e) = std::fs::remove_dir_all(&self.dir) {
            warn!(
                "Failed to clean up chunk spill dir {}: {e}",
                self.dir.display()
            );
        }
    }
}
366
impl Drop for ChunkSpill {
    /// Best-effort removal of the spill directory when the handle goes out
    /// of scope; errors are logged by `cleanup`, not propagated.
    fn drop(&mut self) {
        self.cleanup();
    }
}
372
373fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
378 let spill_root = ChunkSpill::spill_root()?;
379
380 std::fs::create_dir_all(&spill_root)?;
382
383 let available = fs2::available_space(&spill_root).map_err(|e| {
384 Error::Io(std::io::Error::new(
385 e.kind(),
386 format!(
387 "failed to query disk space on {}: {e}",
388 spill_root.display()
389 ),
390 ))
391 })?;
392
393 let headroom = file_size / DISK_SPACE_HEADROOM_PERCENT;
395 let required = file_size.saturating_add(headroom);
396
397 if available < required {
398 let avail_mb = available / (1024 * 1024);
399 let req_mb = required / (1024 * 1024);
400 return Err(Error::InsufficientDiskSpace(format!(
401 "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
402 spill_root.display()
403 )));
404 }
405
406 debug!(
407 "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
408 spill_root.display()
409 );
410 Ok(())
411}
412
/// Whether an upload's DataMap is bundled onto the network.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Visibility {
    /// The DataMap stays with the uploader; only holders of it can fetch
    /// the file.
    #[default]
    Private,
    /// The serialized DataMap is uploaded as an extra chunk, so anyone with
    /// its address can fetch the file.
    Public,
}
429
/// Pre-flight cost estimate for an upload, extrapolated from a small sample
/// of per-chunk storage quotes.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct UploadCostEstimate {
    /// Size of the source file in bytes.
    pub file_size: u64,
    /// Number of encrypted chunks the file splits into.
    pub chunk_count: usize,
    /// Estimated total storage cost in atto tokens, as a decimal string.
    pub storage_cost_atto: String,
    /// Estimated total gas cost in wei, as a decimal string.
    pub estimated_gas_cost_wei: String,
    /// Payment mode the estimate assumes (merkle or single/wave-batch).
    pub payment_mode: PaymentMode,
}
447
/// Outcome of a completed file upload.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FileUploadResult {
    /// DataMap needed to download and decrypt the file again.
    pub data_map: DataMap,
    /// Number of chunks successfully stored.
    pub chunks_stored: usize,
    /// Number of chunks that failed to store.
    pub chunks_failed: usize,
    /// Total number of chunks in the upload.
    pub total_chunks: usize,
    /// Payment mode actually used — may differ from the requested mode when
    /// a merkle attempt falls back to wave-batch.
    pub payment_mode_used: PaymentMode,
    /// Total storage cost paid in atto tokens, as a decimal string.
    pub storage_cost_atto: String,
    /// Total gas cost paid in wei.
    pub gas_cost_wei: u128,
    /// Address of the bundled DataMap chunk for public uploads; `None` for
    /// private uploads.
    pub data_map_address: Option<[u8; 32]>,
}
482
/// Payment data handed to an external signer between a prepare call and the
/// matching finalize call.
#[derive(Debug)]
pub enum ExternalPaymentInfo {
    /// Per-chunk quotes to be paid individually; finalize with
    /// [`Client::finalize_upload`].
    WaveBatch {
        /// Quoted chunks awaiting their payment transaction hashes.
        prepared_chunks: Vec<PreparedChunk>,
        /// Aggregate payment summary for the external signer.
        payment_intent: PaymentIntent,
    },
    /// One merkle batch payment covering all chunks; finalize with
    /// [`Client::finalize_upload_merkle`].
    Merkle {
        /// The prepared (unsigned) merkle batch from
        /// `prepare_merkle_batch_external`.
        prepared_batch: PreparedMerkleBatch,
        /// Chunk contents, parallel to `chunk_addresses`.
        chunk_contents: Vec<Bytes>,
        /// Chunk addresses, parallel to `chunk_contents`.
        chunk_addresses: Vec<[u8; 32]>,
    },
}
503
/// A file that has been encrypted and quoted, awaiting an external payment
/// before its chunks can be stored.
#[derive(Debug)]
#[non_exhaustive]
pub struct PreparedUpload {
    /// DataMap for the encrypted file; returned to the caller on finalize.
    pub data_map: DataMap,
    /// Payment details for the external signer.
    pub payment_info: ExternalPaymentInfo,
    /// Address of the bundled DataMap chunk for public uploads; `None` for
    /// private uploads.
    pub data_map_address: Option<[u8; 32]>,
}
531
/// Handles returned by [`spawn_file_encryption`]: the encrypted-chunk
/// stream, a oneshot for the final [`DataMap`], and the join handle of the
/// blocking encryption task (whose result carries read/encryption errors).
type EncryptionChannels = (
    tokio::sync::mpsc::Receiver<Bytes>,
    tokio::sync::oneshot::Receiver<DataMap>,
    tokio::task::JoinHandle<Result<()>>,
);
538
/// Spawn a blocking task that self-encrypts the file at `path`, streaming
/// each encrypted chunk through a bounded channel (capacity 2, so
/// encryption cannot run far ahead of the uploader/consumer).
///
/// # Errors
/// Fails up-front if the file's metadata cannot be read or its size does
/// not fit in `usize` on this platform. Read and encryption failures inside
/// the task surface through the returned join handle.
fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
    let metadata = std::fs::metadata(&path)?;
    let data_size = usize::try_from(metadata.len())
        .map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;

    let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
    let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();

    let handle = tokio::task::spawn_blocking(move || {
        let file = std::fs::File::open(&path)?;
        let mut reader = std::io::BufReader::new(file);

        // `stream_encrypt` consumes an infallible iterator, so read errors
        // are stashed in this shared slot and re-checked after every chunk.
        let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
        let read_error_clone = Arc::clone(&read_error);

        let data_iter = std::iter::from_fn(move || {
            let mut buffer = vec![0u8; 8192];
            match std::io::Read::read(&mut reader, &mut buffer) {
                // EOF — end the iterator normally.
                Ok(0) => None,
                Ok(n) => {
                    buffer.truncate(n);
                    Some(Bytes::from(buffer))
                }
                Err(e) => {
                    // Record the error and stop; the loop below notices the
                    // slot and aborts instead of silently truncating input.
                    let mut guard = read_error_clone
                        .lock()
                        .unwrap_or_else(|poisoned| poisoned.into_inner());
                    *guard = Some(e);
                    None
                }
            }
        });

        let mut stream = stream_encrypt(data_size, data_iter)
            .map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;

        for chunk_result in stream.chunks() {
            // Abort as soon as a read error has been recorded, before
            // forwarding any chunk derived from truncated input.
            {
                let guard = read_error
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                if let Some(ref e) = *guard {
                    return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
                }
            }

            let (_hash, content) = chunk_result
                .map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
            // Send failure means the receiver is gone — upload cancelled.
            if chunk_tx.blocking_send(content).is_err() {
                return Err(Error::Encryption("upload receiver dropped".to_string()));
            }
        }

        // Final check: a read error on the last buffer would otherwise go
        // unnoticed once the chunk loop has finished.
        {
            let guard = read_error
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            if let Some(ref e) = *guard {
                return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
            }
        }

        let datamap = stream
            .into_datamap()
            .ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
        // A dropped receiver here is not an error: the task has already done
        // all of its work.
        if datamap_tx.send(datamap).is_err() {
            warn!("DataMap receiver dropped — upload may have been cancelled");
        }
        Ok(())
    });

    Ok((chunk_rx, datamap_rx, handle))
}
618
619impl Client {
    /// Upload a file with [`PaymentMode::Auto`].
    ///
    /// Convenience wrapper around [`Self::file_upload_with_mode`].
    pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
        self.file_upload_with_mode(path, PaymentMode::Auto).await
    }
633
    /// Estimate the cost of uploading `path` without paying or storing
    /// anything.
    ///
    /// The file is fully encrypted (spilled to disk) to learn the real
    /// chunk count, then up to [`ESTIMATE_SAMPLE_CAP`] chunk addresses are
    /// quoted; the median price of the first successful quote set is
    /// extrapolated to all chunks. Gas is estimated from the number of
    /// payment transactions the chosen mode would need.
    ///
    /// # Errors
    /// * [`Error::InvalidData`] for files under 3 bytes.
    /// * [`Error::InsufficientDiskSpace`] when the spill area is too small.
    /// * [`Error::CostEstimationInconclusive`] when every sampled address
    ///   reports `AlreadyStored` but unsampled chunks remain.
    pub async fn estimate_upload_cost(
        &self,
        path: &Path,
        mode: PaymentMode,
        progress: Option<mpsc::Sender<UploadEvent>>,
    ) -> Result<UploadCostEstimate> {
        let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();

        // self-encryption cannot operate on fewer than 3 bytes.
        if file_size < 3 {
            return Err(Error::InvalidData(
                "File too small: self-encryption requires at least 3 bytes".into(),
            ));
        }

        check_disk_space_for_spill(file_size)?;

        info!(
            "Estimating upload cost for {} ({file_size} bytes)",
            path.display()
        );

        let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
        let chunk_count = spill.len();

        if let Some(ref tx) = progress {
            let _ = tx
                .send(UploadEvent::Encrypted {
                    total_chunks: chunk_count,
                })
                .await;
        }

        info!("Encrypted into {chunk_count} chunks, requesting quote");

        // Sample a handful of addresses; the first quotable one stands in
        // for all chunks. Already-stored addresses are skipped.
        let sample_limit = spill.addresses.len().min(ESTIMATE_SAMPLE_CAP);
        let mut sampled = 0usize;
        let mut all_already_stored = true;
        let mut quotes_opt: Option<Vec<QuoteEntry>> = None;

        for addr in spill.addresses.iter().take(sample_limit) {
            sampled += 1;
            let chunk_bytes = spill.read_chunk(addr)?;
            let data_size = u64::try_from(chunk_bytes.len())
                .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
            match self
                .get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
                .await
            {
                Ok(q) => {
                    quotes_opt = Some(q);
                    all_already_stored = false;
                    break;
                }
                Err(Error::AlreadyStored) => {
                    debug!(
                        "Sample chunk {} already stored; trying next address ({sampled}/{sample_limit})",
                        hex::encode(addr)
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }
        }

        let uses_merkle = should_use_merkle(chunk_count, mode);

        let quotes = match quotes_opt {
            Some(q) => q,
            // Every chunk was sampled and all are already stored: the
            // upload would cost nothing.
            None if all_already_stored && sampled == chunk_count => {
                info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
                return Ok(UploadCostEstimate {
                    file_size,
                    chunk_count,
                    storage_cost_atto: "0".into(),
                    estimated_gas_cost_wei: "0".into(),
                    payment_mode: if uses_merkle {
                        PaymentMode::Merkle
                    } else {
                        PaymentMode::Single
                    },
                });
            }
            // Only a subset was sampled and all of those were stored; no
            // representative price exists for the remaining chunks.
            None => {
                return Err(Error::CostEstimationInconclusive(format!(
                    "sampled {sampled} chunk addresses out of {chunk_count} and every \
                     one reported AlreadyStored; cannot infer a representative price \
                     for the remaining chunks"
                )));
            }
        };

        // Median quoted price × 3 per chunk — presumably one payment per
        // payee in the storing group; confirm against the payment flow.
        let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
        prices.sort();
        let median_price = prices
            .get(prices.len() / 2)
            .copied()
            .unwrap_or(Amount::ZERO);
        let per_chunk_cost = median_price * Amount::from(3u64);

        let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
        let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);

        // Gas scales with the number of payment transactions: one per wave
        // in single mode, one per MAX_LEAVES-sized batch in merkle mode.
        let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
        let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
        let estimated_gas: u128 = if uses_merkle {
            merkle_batches
                .saturating_mul(GAS_PER_MERKLE_TX)
                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
        } else {
            waves
                .saturating_mul(GAS_PER_WAVE_TX)
                .saturating_mul(ARBITRUM_GAS_PRICE_WEI)
        };

        info!(
            "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
        );

        Ok(UploadCostEstimate {
            file_size,
            chunk_count,
            storage_cost_atto: total_storage.to_string(),
            estimated_gas_cost_wei: estimated_gas.to_string(),
            payment_mode: if uses_merkle {
                PaymentMode::Merkle
            } else {
                PaymentMode::Single
            },
        })
    }
816
    /// Prepare a private upload for external signing.
    ///
    /// Equivalent to [`Self::file_prepare_upload_with_visibility`] with
    /// [`Visibility::Private`].
    pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
        self.file_prepare_upload_with_visibility(path, Visibility::Private)
            .await
    }
825
    /// Encrypt a file and prepare all payment data so an external signer
    /// can pay for storage; nothing is paid or stored here.
    ///
    /// For [`Visibility::Public`] the serialized DataMap is bundled as one
    /// extra chunk and its address is returned in the result. The merkle or
    /// wave-batch flow is chosen via `should_use_merkle` with
    /// [`PaymentMode::Auto`].
    ///
    /// # Errors
    /// Fails on I/O, disk-space, encryption, serialization, or quoting
    /// errors.
    pub async fn file_prepare_upload_with_visibility(
        &self,
        path: &Path,
        visibility: Visibility,
    ) -> Result<PreparedUpload> {
        debug!(
            "Preparing file upload for external signing (visibility={visibility:?}): {}",
            path.display()
        );

        let file_size = std::fs::metadata(path)?.len();
        check_disk_space_for_spill(file_size)?;

        let (spill, data_map) = self.encrypt_file_to_spill(path, None).await?;

        info!(
            "Encrypted {} into {} chunks for external signing (spilled to disk)",
            path.display(),
            spill.len()
        );

        // Load all chunks back into memory: both preparation flows below
        // need the full contents.
        let mut chunk_data: Vec<Bytes> = spill
            .addresses
            .iter()
            .map(|addr| spill.read_chunk(addr))
            .collect::<std::result::Result<Vec<_>, _>>()?;

        // Public uploads append the serialized DataMap as an extra chunk so
        // the file becomes fetchable from that address alone.
        let data_map_address = match visibility {
            Visibility::Private => None,
            Visibility::Public => {
                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
                })?;
                let bytes = Bytes::from(serialized);
                let address = compute_address(&bytes);
                info!(
                    "Public upload: bundling DataMap chunk ({} bytes) at address {}",
                    bytes.len(),
                    hex::encode(address)
                );
                chunk_data.push(bytes);
                Some(address)
            }
        };

        let chunk_count = chunk_data.len();

        let payment_info = if should_use_merkle(chunk_count, PaymentMode::Auto) {
            info!("Using merkle batch preparation for {chunk_count} file chunks");

            let addresses: Vec<[u8; 32]> = chunk_data.iter().map(|c| compute_address(c)).collect();

            // The merkle preparation sizes quotes by an average chunk size.
            let avg_size =
                chunk_data.iter().map(bytes::Bytes::len).sum::<usize>() / chunk_count.max(1);
            let avg_size_u64 = u64::try_from(avg_size).unwrap_or(0);

            let prepared_batch = self
                .prepare_merkle_batch_external(&addresses, DATA_TYPE_CHUNK, avg_size_u64)
                .await?;

            info!(
                "File prepared for external merkle signing: {} chunks, depth={} ({})",
                chunk_count,
                prepared_batch.depth,
                path.display()
            );

            ExternalPaymentInfo::Merkle {
                prepared_batch,
                chunk_contents: chunk_data,
                chunk_addresses: addresses,
            }
        } else {
            // Quote each chunk concurrently. `None` results are dropped —
            // presumably chunks that need no payment (already stored);
            // confirm against `prepare_chunk_payment`.
            let quote_concurrency = self.config().quote_concurrency;
            let results: Vec<Result<Option<PreparedChunk>>> = stream::iter(chunk_data)
                .map(|content| async move { self.prepare_chunk_payment(content).await })
                .buffer_unordered(quote_concurrency)
                .collect()
                .await;

            let mut prepared_chunks = Vec::with_capacity(spill.len());
            for result in results {
                if let Some(prepared) = result? {
                    prepared_chunks.push(prepared);
                }
            }

            // Informational only: a public DataMap chunk that is already on
            // the network needs no new payment but stays retrievable.
            if let Some(addr) = data_map_address {
                if !prepared_chunks.iter().any(|c| c.address == addr) {
                    info!(
                        "Public upload: DataMap chunk {} was already stored \
                         on the network — address is retrievable without a \
                         new payment",
                        hex::encode(addr)
                    );
                }
            }

            let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);

            info!(
                "File prepared for external signing: {} chunks, total {} atto ({})",
                prepared_chunks.len(),
                payment_intent.total_amount,
                path.display()
            );

            ExternalPaymentInfo::WaveBatch {
                prepared_chunks,
                payment_intent,
            }
        };

        Ok(PreparedUpload {
            data_map,
            payment_info,
            data_map_address,
        })
    }
985
986 pub async fn finalize_upload(
998 &self,
999 prepared: PreparedUpload,
1000 tx_hash_map: &HashMap<QuoteHash, TxHash>,
1001 ) -> Result<FileUploadResult> {
1002 let data_map_address = prepared.data_map_address;
1003 match prepared.payment_info {
1004 ExternalPaymentInfo::WaveBatch {
1005 prepared_chunks,
1006 payment_intent: _,
1007 } => {
1008 let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
1009 let wave_result = self.store_paid_chunks(paid_chunks).await;
1010 if !wave_result.failed.is_empty() {
1011 let failed_count = wave_result.failed.len();
1012 let stored_count = wave_result.stored.len();
1013 return Err(Error::PartialUpload {
1014 stored: wave_result.stored.clone(),
1015 stored_count,
1016 failed: wave_result.failed,
1017 failed_count,
1018 total_chunks: stored_count + failed_count,
1019 reason: "finalize_upload: chunk storage failed after retries".into(),
1020 });
1021 }
1022 let chunks_stored = wave_result.stored.len();
1023
1024 info!("External-signer upload finalized: {chunks_stored} chunks stored");
1025
1026 Ok(FileUploadResult {
1027 data_map: prepared.data_map,
1028 chunks_stored,
1029 chunks_failed: 0,
1030 total_chunks: chunks_stored,
1031 payment_mode_used: PaymentMode::Single,
1032 storage_cost_atto: "0".into(),
1033 gas_cost_wei: 0,
1034 data_map_address,
1035 })
1036 }
1037 ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
1038 "Cannot finalize merkle upload with wave-batch tx hashes. \
1039 Use finalize_upload_merkle() instead."
1040 .to_string(),
1041 )),
1042 }
1043 }
1044
1045 pub async fn finalize_upload_merkle(
1057 &self,
1058 prepared: PreparedUpload,
1059 winner_pool_hash: [u8; 32],
1060 ) -> Result<FileUploadResult> {
1061 let data_map_address = prepared.data_map_address;
1062 match prepared.payment_info {
1063 ExternalPaymentInfo::Merkle {
1064 prepared_batch,
1065 chunk_contents,
1066 chunk_addresses,
1067 } => {
1068 let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
1069 let chunks_stored = self
1070 .merkle_upload_chunks(chunk_contents, chunk_addresses, &batch_result)
1071 .await?;
1072
1073 info!("External-signer merkle upload finalized: {chunks_stored} chunks stored");
1074
1075 Ok(FileUploadResult {
1076 data_map: prepared.data_map,
1077 chunks_stored,
1078 chunks_failed: 0,
1079 total_chunks: chunks_stored,
1080 payment_mode_used: PaymentMode::Merkle,
1081 storage_cost_atto: "0".into(),
1082 gas_cost_wei: 0,
1083 data_map_address,
1084 })
1085 }
1086 ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
1087 "Cannot finalize wave-batch upload with merkle winner hash. \
1088 Use finalize_upload() instead."
1089 .to_string(),
1090 )),
1091 }
1092 }
1093
1094 #[allow(clippy::too_many_lines)]
1108 pub async fn file_upload_with_mode(
1109 &self,
1110 path: &Path,
1111 mode: PaymentMode,
1112 ) -> Result<FileUploadResult> {
1113 self.file_upload_with_progress(path, mode, None).await
1114 }
1115
    /// Upload a file with optional progress events.
    ///
    /// Encrypts to a disk spill, then pays and stores the chunks either as
    /// one merkle batch or in individually-paid waves. In
    /// [`PaymentMode::Auto`], a merkle attempt that fails with
    /// [`Error::InsufficientPeers`] falls back to the wave-batch flow.
    ///
    /// # Errors
    /// Fails on I/O, disk-space, encryption, payment, or storage errors.
    #[allow(clippy::too_many_lines)]
    pub async fn file_upload_with_progress(
        &self,
        path: &Path,
        mode: PaymentMode,
        progress: Option<mpsc::Sender<UploadEvent>>,
    ) -> Result<FileUploadResult> {
        debug!(
            "Streaming file upload with mode {mode:?}: {}",
            path.display()
        );

        let file_size = std::fs::metadata(path)?.len();
        check_disk_space_for_spill(file_size)?;

        let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;

        let chunk_count = spill.len();
        info!(
            "Encrypted {} into {chunk_count} chunks (spilled to disk)",
            path.display()
        );
        if let Some(ref tx) = progress {
            let _ = tx
                .send(UploadEvent::Encrypted {
                    total_chunks: chunk_count,
                })
                .await;
        }

        let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei) =
            if self.should_use_merkle(chunk_count, mode) {
                info!("Using merkle batch payment for {chunk_count} file chunks");

                let batch_result = match self
                    .pay_for_merkle_batch(&spill.addresses, DATA_TYPE_CHUNK, spill.avg_chunk_size())
                    .await
                {
                    Ok(result) => result,
                    // Auto mode only: fall back to wave-batch when the
                    // network cannot support a merkle payment right now.
                    Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
                        info!("Merkle needs more peers ({msg}), falling back to wave-batch");
                        let (stored, sc, gc) =
                            self.upload_waves_single(&spill, progress.as_ref()).await?;
                        return Ok(FileUploadResult {
                            data_map,
                            chunks_stored: stored,
                            chunks_failed: 0,
                            total_chunks: chunk_count,
                            payment_mode_used: PaymentMode::Single,
                            storage_cost_atto: sc,
                            gas_cost_wei: gc,
                            data_map_address: None,
                        });
                    }
                    Err(e) => return Err(e),
                };

                let (stored, sc, gc) = self
                    .upload_waves_merkle(&spill, &batch_result, progress.as_ref())
                    .await?;
                (stored, PaymentMode::Merkle, sc, gc)
            } else {
                let (stored, sc, gc) = self.upload_waves_single(&spill, progress.as_ref()).await?;
                (stored, PaymentMode::Single, sc, gc)
            };

        info!(
            "File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
            path.display()
        );

        Ok(FileUploadResult {
            data_map,
            chunks_stored,
            chunks_failed: 0,
            total_chunks: chunk_count,
            payment_mode_used: actual_mode,
            storage_cost_atto,
            gas_cost_wei,
            data_map_address: None,
        })
    }
1206
1207 async fn encrypt_file_to_spill(
1214 &self,
1215 path: &Path,
1216 progress: Option<&mpsc::Sender<UploadEvent>>,
1217 ) -> Result<(ChunkSpill, DataMap)> {
1218 let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
1219
1220 let mut spill = ChunkSpill::new()?;
1221 while let Some(content) = chunk_rx.recv().await {
1222 spill.push(&content)?;
1223 let chunks_done = spill.len();
1224 if let Some(tx) = progress {
1225 if chunks_done.is_multiple_of(10) {
1226 let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
1227 }
1228 }
1229 if chunks_done % 100 == 0 {
1230 let mb = spill.total_bytes() / (1024 * 1024);
1231 info!(
1232 "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
1233 path.display()
1234 );
1235 }
1236 }
1237
1238 handle
1240 .await
1241 .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
1242 .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
1243
1244 let data_map = datamap_rx
1245 .await
1246 .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
1247
1248 Ok((spill, data_map))
1249 }
1250
    /// Upload all spilled chunks in waves of [`UPLOAD_WAVE_SIZE`], quoting
    /// and paying each wave individually.
    ///
    /// Only one wave's worth of chunk data is held in memory at a time.
    /// Returns `(chunks_stored, total_storage_cost_atto, total_gas_wei)`.
    async fn upload_waves_single(
        &self,
        spill: &ChunkSpill,
        progress: Option<&mpsc::Sender<UploadEvent>>,
    ) -> Result<(usize, String, u128)> {
        let mut total_stored = 0usize;
        let mut total_storage = Amount::ZERO;
        let mut total_gas: u128 = 0;
        let total_chunks = spill.len();
        let waves: Vec<&[[u8; 32]]> = spill.waves().collect();
        let wave_count = waves.len();

        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
            let wave_num = wave_idx + 1;
            // Pull this wave's contents back off disk.
            let wave_data: Vec<Bytes> = wave_addrs
                .iter()
                .map(|addr| spill.read_chunk(addr))
                .collect::<Result<Vec<_>>>()?;

            info!(
                "Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
                wave_data.len()
            );
            if let Some(tx) = progress {
                let _ = tx
                    .send(UploadEvent::QuotingChunks {
                        wave: wave_num,
                        total_waves: wave_count,
                        chunks_in_wave: wave_data.len(),
                    })
                    .await;
            }
            let (addresses, wave_storage, wave_gas) = self
                .batch_upload_chunks_with_events(wave_data, progress, total_stored, total_chunks)
                .await?;
            total_stored += addresses.len();
            // Per-wave cost arrives as a decimal string; an unparseable
            // value is skipped rather than failing the whole upload.
            if let Ok(cost) = wave_storage.parse::<Amount>() {
                total_storage += cost;
            }
            total_gas = total_gas.saturating_add(wave_gas);
            if let Some(tx) = progress {
                let _ = tx
                    .send(UploadEvent::WaveComplete {
                        wave: wave_num,
                        total_waves: wave_count,
                        stored_so_far: total_stored,
                        total: total_chunks,
                    })
                    .await;
            }
        }

        Ok((total_stored, total_storage.to_string(), total_gas))
    }
1311
    /// Store chunks already paid for via a merkle batch, in waves of
    /// [`UPLOAD_WAVE_SIZE`], attaching each chunk's merkle proof.
    ///
    /// Fails fast: the first chunk that cannot be stored aborts with
    /// [`Error::PartialUpload`] listing what did get stored. Returns
    /// `(chunks_stored, storage_cost_atto, gas_cost_wei)`, echoing the
    /// costs already paid in `batch_result`.
    async fn upload_waves_merkle(
        &self,
        spill: &ChunkSpill,
        batch_result: &MerkleBatchPaymentResult,
        progress: Option<&mpsc::Sender<UploadEvent>>,
    ) -> Result<(usize, String, u128)> {
        let mut total_stored = 0usize;
        let total_chunks = spill.len();
        let waves: Vec<&[[u8; 32]]> = spill.waves().collect();
        let wave_count = waves.len();
        let mut stored_addresses: Vec<[u8; 32]> = Vec::new();

        for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
            let wave_num = wave_idx + 1;
            let wave = spill.read_wave(wave_addrs)?;

            info!(
                "Wave {wave_num}/{wave_count}: storing {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
                wave.len()
            );

            // Store concurrently; each future resolves to the chunk address
            // on success or `(address, error)` on failure so the error path
            // still knows which chunk it belongs to.
            let mut upload_stream = stream::iter(wave.into_iter().map(|(content, addr)| {
                let proof_bytes = batch_result.proofs.get(&addr).cloned();
                async move {
                    let proof = proof_bytes.ok_or_else(|| {
                        (
                            addr,
                            Error::Payment(format!(
                                "Missing merkle proof for chunk {}",
                                hex::encode(addr)
                            )),
                        )
                    })?;
                    let peers = self.close_group_peers(&addr).await.map_err(|e| (addr, e))?;
                    self.chunk_put_to_close_group(content, proof, &peers)
                        .await
                        .map(|_| addr)
                        .map_err(|e| (addr, e))
                }
            }))
            .buffer_unordered(self.config().store_concurrency);

            while let Some(result) = upload_stream.next().await {
                match result {
                    Ok(addr) => {
                        stored_addresses.push(addr);
                        total_stored += 1;
                        info!("Stored {total_stored}/{total_chunks}");
                        if let Some(tx) = progress {
                            let _ = tx
                                .send(UploadEvent::ChunkStored {
                                    stored: total_stored,
                                    total: total_chunks,
                                })
                                .await;
                        }
                    }
                    // First failure aborts the whole upload with a
                    // partial-upload report.
                    Err((addr, e)) => {
                        warn!("merkle upload failed for chunk {}: {e}", hex::encode(addr));
                        return Err(Error::PartialUpload {
                            stored: stored_addresses,
                            stored_count: total_stored,
                            failed: vec![(addr, e.to_string())],
                            failed_count: 1,
                            total_chunks,
                            reason: format!("merkle chunk upload failed: {e}"),
                        });
                    }
                }
            }

            if let Some(tx) = progress {
                let _ = tx
                    .send(UploadEvent::WaveComplete {
                        wave: wave_num,
                        total_waves: wave_count,
                        stored_so_far: total_stored,
                        total: total_chunks,
                    })
                    .await;
            }
        }

        Ok((
            total_stored,
            batch_result.storage_cost_atto.clone(),
            batch_result.gas_cost_wei,
        ))
    }
1408
    /// Download and decrypt the file described by `data_map` into `output`,
    /// without progress reporting.
    ///
    /// Convenience wrapper around [`Self::file_download_with_progress`].
    #[allow(clippy::unused_async)]
    pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
        self.file_download_with_progress(data_map, output, None)
            .await
    }
1434
1435 #[allow(clippy::unused_async)]
1445 pub async fn file_download_with_progress(
1446 &self,
1447 data_map: &DataMap,
1448 output: &Path,
1449 progress: Option<mpsc::Sender<DownloadEvent>>,
1450 ) -> Result<u64> {
1451 debug!("Downloading file to {}", output.display());
1452
1453 let handle = Handle::current();
1454
1455 let root_map = if data_map.is_child() {
1458 let dm_chunks = data_map.len();
1459 if let Some(ref tx) = progress {
1460 let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
1461 total_map_chunks: dm_chunks,
1462 });
1463 }
1464
1465 let resolve_progress = progress.clone();
1466 let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
1467
1468 let resolved = tokio::task::block_in_place(|| {
1469 let counter_ref = resolve_counter.clone();
1470 let progress_ref = resolve_progress.clone();
1471 let fetch = |batch: &[(usize, XorName)]| {
1472 let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
1473 let counter = counter_ref.clone();
1474 let prog = progress_ref.clone();
1475 handle.block_on(async {
1476 let mut futs = futures::stream::FuturesUnordered::new();
1477 for (idx, hash) in batch_owned {
1478 let addr = hash.0;
1479 futs.push(async move {
1480 let result = self.chunk_get(&addr).await;
1481 (idx, hash, result)
1482 });
1483 }
1484 let mut results = Vec::with_capacity(futs.len());
1485 while let Some((idx, hash, result)) =
1486 futures::StreamExt::next(&mut futs).await
1487 {
1488 let chunk = result
1489 .map_err(|e| {
1490 self_encryption::Error::Generic(format!(
1491 "DataMap resolution failed: {e}"
1492 ))
1493 })?
1494 .ok_or_else(|| {
1495 self_encryption::Error::Generic(format!(
1496 "DataMap chunk not found: {}",
1497 hex::encode(hash.0)
1498 ))
1499 })?;
1500 results.push((idx, chunk.content));
1501 let fetched =
1502 counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
1503 if let Some(ref tx) = prog {
1504 let _ = tx.try_send(DownloadEvent::MapChunkFetched { fetched });
1505 }
1506 }
1507 Ok(results)
1508 })
1509 };
1510 get_root_data_map_parallel(data_map.clone(), &fetch)
1511 })
1512 .map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;
1513
1514 info!(
1515 "Resolved hierarchical DataMap: {} data chunks",
1516 resolved.len()
1517 );
1518 resolved
1519 } else {
1520 data_map.clone()
1521 };
1522
1523 let total_chunks = root_map.len();
1525 if let Some(ref tx) = progress {
1526 let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
1527 }
1528
1529 let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
1531 let fetched_for_closure = fetched_counter.clone();
1532 let progress_for_closure = progress.clone();
1533
1534 let stream = streaming_decrypt(&root_map, |batch: &[(usize, XorName)]| {
1535 let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
1536 let fetched_ref = fetched_for_closure.clone();
1537 let progress_ref = progress_for_closure.clone();
1538
1539 tokio::task::block_in_place(|| {
1540 handle.block_on(async {
1541 let mut futs = futures::stream::FuturesUnordered::new();
1542 for (idx, hash) in batch_owned {
1543 let addr = hash.0;
1544 futs.push(async move {
1545 let result = self.chunk_get(&addr).await;
1546 (idx, hash, result)
1547 });
1548 }
1549
1550 let mut results = Vec::with_capacity(futs.len());
1551 while let Some((idx, hash, result)) = futures::StreamExt::next(&mut futs).await
1552 {
1553 let addr_hex = hex::encode(hash.0);
1554 let chunk = result
1555 .map_err(|e| {
1556 self_encryption::Error::Generic(format!(
1557 "Network fetch failed for {addr_hex}: {e}"
1558 ))
1559 })?
1560 .ok_or_else(|| {
1561 self_encryption::Error::Generic(format!(
1562 "Chunk not found: {addr_hex}"
1563 ))
1564 })?;
1565 results.push((idx, chunk.content));
1566 let fetched =
1567 fetched_ref.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
1568 info!("Downloaded {fetched}/{total_chunks}");
1569 if let Some(ref tx) = progress_ref {
1570 let _ = tx.try_send(DownloadEvent::ChunksFetched {
1571 fetched,
1572 total: total_chunks,
1573 });
1574 }
1575 }
1576 Ok(results)
1577 })
1578 })
1579 })
1580 .map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;
1581
1582 let parent = output.parent().unwrap_or_else(|| Path::new("."));
1584 let unique: u64 = rand::random();
1585 let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));
1586
1587 let write_result = (|| -> Result<u64> {
1588 let mut file = std::fs::File::create(&tmp_path)?;
1589 let mut bytes_written = 0u64;
1590 for chunk_result in stream {
1591 let chunk_bytes = chunk_result
1592 .map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
1593 file.write_all(&chunk_bytes)?;
1594 bytes_written += chunk_bytes.len() as u64;
1595 }
1596 file.flush()?;
1597 Ok(bytes_written)
1598 })();
1599
1600 match write_result {
1601 Ok(bytes_written) => match std::fs::rename(&tmp_path, output) {
1602 Ok(()) => {
1603 info!(
1604 "File downloaded: {bytes_written} bytes written to {}",
1605 output.display()
1606 );
1607 Ok(bytes_written)
1608 }
1609 Err(rename_err) => {
1610 if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
1611 warn!(
1612 "Failed to remove temp download file {}: {cleanup_err}",
1613 tmp_path.display()
1614 );
1615 }
1616 Err(rename_err.into())
1617 }
1618 },
1619 Err(e) => {
1620 if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
1621 warn!(
1622 "Failed to remove temp download file {}: {cleanup_err}",
1623 tmp_path.display()
1624 );
1625 }
1626 Err(e)
1627 }
1628 }
1629 }
1630}
1631
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// A tiny spill request must always fit within available disk headroom.
    #[test]
    fn disk_space_check_passes_for_small_file() {
        check_disk_space_for_spill(1024).unwrap();
    }

    /// Asking for half the u64 range must trip the insufficient-space error.
    #[test]
    fn disk_space_check_fails_for_absurd_size() {
        let outcome = check_disk_space_for_spill(u64::MAX / 2);
        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            matches!(err, Error::InsufficientDiskSpace(_)),
            "expected InsufficientDiskSpace, got: {err}"
        );
    }

    /// Chunks pushed into a spill read back verbatim, and the bookkeeping
    /// (count, byte total, average size) stays consistent with the pushes.
    #[test]
    fn chunk_spill_round_trip() {
        let mut spill = ChunkSpill::new().unwrap();
        let first = vec![0xAA; 1024];
        let second = vec![0xBB; 2048];

        spill.push(&first).unwrap();
        spill.push(&second).unwrap();

        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 1024 + 2048);
        assert_eq!(spill.avg_chunk_size(), (1024 + 2048) / 2);

        let restored_first = spill.read_chunk(spill.addresses.first().unwrap()).unwrap();
        assert_eq!(&restored_first[..], &first[..]);

        let restored_second = spill.read_chunk(spill.addresses.get(1).unwrap()).unwrap();
        assert_eq!(&restored_second[..], &second[..]);

        // Splitting the address list into waves of one yields one wave per chunk.
        let waves: Vec<_> = spill.addresses.chunks(1).collect();
        assert_eq!(waves.len(), 2);
    }

    /// Dropping a spill must delete its on-disk scratch directory.
    #[test]
    fn chunk_spill_cleanup_on_drop() {
        let spill_dir;
        {
            let spill = ChunkSpill::new().unwrap();
            spill_dir = spill.dir.clone();
            assert!(spill_dir.exists());
        }
        assert!(!spill_dir.exists(), "spill dir should be removed on drop");
    }

    /// Byte-identical content pushed repeatedly is stored exactly once;
    /// distinct content still gets its own entry.
    #[test]
    fn chunk_spill_deduplicates_identical_content() {
        let mut spill = ChunkSpill::new().unwrap();
        let repeated = vec![0xCC; 512];

        spill.push(&repeated).unwrap();
        spill.push(&repeated).unwrap();
        spill.push(&repeated).unwrap();
        assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
        assert_eq!(
            spill.total_bytes(),
            512,
            "total_bytes should count unique only"
        );

        let distinct = vec![0xDD; 256];
        spill.push(&distinct).unwrap();
        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 512 + 256);
    }
}
1715
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Compile-time probe: the call only type-checks when `T: Send`.
    /// Never executed — these functions exist purely so the build fails
    /// if a public future stops being `Send`.
    fn _require_send<T: Send>(_: &T) {}

    /// `file_upload`'s future must be `Send` so it can be driven on a
    /// multi-threaded executor.
    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_is_send(client: &Client) {
        _require_send(&client.file_upload(Path::new("/dev/null")));
    }

    /// Same guarantee for the payment-mode-aware upload entry point.
    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_with_mode_is_send(client: &Client) {
        _require_send(&client.file_upload_with_mode(Path::new("/dev/null"), PaymentMode::Auto));
    }

    /// `file_download`'s future must also be `Send`; the `todo!()` stands in
    /// for a `DataMap` we cannot construct here (type-check only, never run).
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _file_download_is_send(client: &Client) {
        let dm: DataMap = todo!();
        _require_send(&client.file_download(&dm, Path::new("/dev/null")));
    }
}