use crate::data::client::adaptive::{observe_op, rebucketed_unordered};
use crate::data::client::batch::{
finalize_batch_payment, PaymentIntent, PreparedChunk, WaveAggregateStats,
};
use crate::data::client::classify_error;
use crate::data::client::merkle::{
chunk_contents_for_upload_addresses, finalize_merkle_batch, merkle_store_with_retry,
should_use_merkle, MerkleBatchPaymentResult, PaymentMode, PreparedMerkleBatch,
MERKLE_RETRY_BACKOFF, MERKLE_STORE_MAX_ATTEMPTS,
};
use crate::data::client::Client;
use crate::data::error::{Error, Result};
use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
use ant_protocol::transport::{MultiAddr, PeerId};
use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
use bytes::Bytes;
use fs2::FileExt;
use futures::stream::{self, StreamExt};
use self_encryption::{
get_root_data_map_parallel, stream_decrypt_batch_size, stream_encrypt,
streaming_decrypt_with_batch_size, DataMap,
};
use std::collections::{HashMap, HashSet};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
use xor_name::XorName;
/// Progress notifications produced while a file is being uploaded.
///
/// The upload entry points in this file accept an optional
/// `mpsc::Sender<UploadEvent>`; when it is `None` no events are sent.
#[derive(Debug, Clone)]
pub enum UploadEvent {
/// Encryption is still running; `chunks_done` counts chunks produced so far.
/// NOTE(review): emitted outside the code visible here — confirm exact timing.
Encrypting { chunks_done: usize },
/// Encryption finished. `total_chunks` is the number of chunks the upload
/// will deal with (for public prepared uploads this includes the bundled
/// DataMap chunk).
Encrypted { total_chunks: usize },
/// A wave of chunks is about to be quoted.
/// NOTE(review): not emitted in the visible code — presumably `wave` is the
/// current wave index out of `total_waves`; confirm against the emitter.
QuotingChunks {
wave: usize,
total_waves: usize,
chunks_in_wave: usize,
},
/// One more chunk has finished quoting; `quoted` results have been received
/// so far out of `total`.
ChunkQuoted { quoted: usize, total: usize },
/// One more chunk has been stored.
/// NOTE(review): emitted outside the visible code — confirm whether `stored`
/// includes chunks that were already on the network.
ChunkStored { stored: usize, total: usize },
/// A whole wave finished.
/// NOTE(review): emitted outside the visible code — field meanings inferred
/// from their names only.
WaveComplete {
wave: usize,
total_waves: usize,
stored_so_far: usize,
total: usize,
},
}
/// Progress notifications for downloads.
///
/// NOTE(review): none of these variants is emitted in the part of the file
/// visible here; the per-variant descriptions are inferred from the names and
/// should be confirmed against the download code.
#[derive(Debug, Clone)]
pub enum DownloadEvent {
/// Presumably: DataMap resolution started, with `total_map_chunks` to fetch.
ResolvingDataMap { total_map_chunks: usize },
/// Presumably: `fetched` DataMap chunks have been retrieved so far.
MapChunkFetched { fetched: usize },
/// Presumably: the root DataMap is known and references `total_chunks` chunks.
DataMapResolved { total_chunks: usize },
/// Presumably: `fetched` of `total` content chunks have been retrieved.
ChunksFetched { fetched: usize, total: usize },
}
/// One store quote as returned by `get_store_quotes`: the quoting peer, its
/// addresses, the quote itself, and the quoted price (the last element is what
/// `estimate_upload_cost` reads as the price).
type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);
/// Number of chunks per upload wave; the cost estimate assumes one payment
/// transaction per wave of this size.
const UPLOAD_WAVE_SIZE: usize = 64;
/// The fetch concurrency cap is multiplied by this to get the target decrypt
/// batch size for streaming downloads.
const DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER: usize = 4;
/// Usable memory is divided by this to get the memory budget for one decrypt
/// batch (4 => a quarter of usable memory).
const DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR: u64 = 4;
/// Estimated in-memory footprint of one chunk during streaming decrypt,
/// expressed as a multiple of `self_encryption::MAX_CHUNK_SIZE`.
const DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER: u64 = 3;
/// Maximum number of chunk addresses sampled for a quote when estimating cost.
const ESTIMATE_SAMPLE_CAP: usize = 5;
/// Gas assumed per wave-batch payment transaction in the cost estimate.
const GAS_PER_WAVE_TX: u128 = 1_500_000;
/// Gas assumed per merkle payment transaction in the cost estimate.
const GAS_PER_MERKLE_TX: u128 = 500_000;
/// Gas price (in wei) assumed by the cost estimate; 100_000_000 wei = 0.1 gwei.
const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;
/// Extra free disk space required on top of the file size before spilling,
/// as a percentage of the file size.
const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;
/// Spill directories younger than this many seconds are never treated as stale.
const SPILL_STALE_GRACE_SECS: u64 = 30;
/// Name prefix of every spill directory; the rest of the name is
/// `<unix-seconds>_<random>`.
const SPILL_DIR_PREFIX: &str = "spill_";
/// File inside each spill directory that is exclusively locked while the
/// owning `ChunkSpill` is alive.
const SPILL_LOCK_NAME: &str = ".lock";
/// On-disk staging area for encrypted chunks.
///
/// Each chunk is written to its own file (named by the hex of its address)
/// inside `dir`; only addresses and sizes are kept in memory. The directory is
/// removed when the value is dropped.
struct ChunkSpill {
/// Directory holding the spilled chunk files and the lock file.
dir: PathBuf,
/// Open handle on the lock file; an exclusive lock is taken on it in `new`
/// and the handle is kept for the lifetime of the spill so stale-cleanup in
/// other instances can tell the directory is in use.
_lock: std::fs::File,
/// Addresses of the spilled chunks in first-seen order (no duplicates).
addresses: Vec<[u8; 32]>,
/// Set of addresses already spilled, used to skip duplicate content.
seen: HashSet<[u8; 32]>,
/// Size in bytes of each spilled chunk, keyed by address.
sizes: HashMap<[u8; 32], u64>,
/// Sum of the sizes of all spilled chunks.
total_bytes: u64,
}
impl ChunkSpill {
    /// Returns the directory under which all spill directories are created:
    /// `<data_dir>/spill`.
    ///
    /// # Errors
    /// Returns [`Error::Config`] when `config::data_dir()` fails.
    fn spill_root() -> Result<PathBuf> {
        use crate::config;
        let root = config::data_dir()
            .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
            .join("spill");
        Ok(root)
    }

    /// Creates a new, empty spill directory and takes an exclusive lock on its
    /// lock file.
    ///
    /// Stale directories left by earlier runs are cleaned up first. The new
    /// directory is named `spill_<unix-seconds>_<random u64>`.
    ///
    /// # Errors
    /// Returns an error when the spill root cannot be determined or created,
    /// or when the directory or its lock file cannot be created or locked.
    fn new() -> Result<Self> {
        let root = Self::spill_root()?;
        std::fs::create_dir_all(&root)?;
        Self::cleanup_stale(&root);
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let unique: u64 = rand::random();
        let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
        std::fs::create_dir(&dir)?;
        let lock_path = dir.join(SPILL_LOCK_NAME);
        let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
            Error::Io(std::io::Error::new(
                e.kind(),
                format!("failed to create spill lockfile: {e}"),
            ))
        })?;
        lock_file.try_lock_exclusive().map_err(|e| {
            Error::Io(std::io::Error::new(
                e.kind(),
                format!("failed to lock spill lockfile: {e}"),
            ))
        })?;
        Ok(Self {
            dir,
            _lock: lock_file,
            addresses: Vec::new(),
            seen: HashSet::new(),
            sizes: HashMap::new(),
            total_bytes: 0,
        })
    }

    /// Best-effort removal of spill directories under `root` that are no
    /// longer in use.
    ///
    /// A directory is removed when its name parses as
    /// `spill_<unix-seconds>_...`, it is at least `SPILL_STALE_GRACE_SECS`
    /// old, and its lock file is either missing/unopenable or can be locked
    /// exclusively (meaning no live owner holds it). Failures are logged and
    /// otherwise ignored.
    fn cleanup_stale(root: &Path) {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        // `unwrap_or_default` yields 0 when the clock is before the epoch; age
        // comparisons would be meaningless then.
        if now == 0 {
            warn!("System clock before Unix epoch, skipping spill cleanup");
            return;
        }
        let Ok(entries) = std::fs::read_dir(root) else {
            return;
        };
        for entry in entries.flatten() {
            let name = entry.file_name();
            let name_str = name.to_string_lossy();
            let Some(suffix) = name_str.strip_prefix(SPILL_DIR_PREFIX) else {
                continue;
            };
            // The creation timestamp is the first `_`-separated component.
            let Some(timestamp) = suffix
                .split('_')
                .next()
                .and_then(|s| s.parse::<u64>().ok())
            else {
                continue;
            };
            // Too young: may belong to an instance that has not locked it yet.
            if now.saturating_sub(timestamp) < SPILL_STALE_GRACE_SECS {
                continue;
            }
            let Ok(file_type) = entry.file_type() else {
                continue;
            };
            if !file_type.is_dir() {
                continue;
            }
            let path = entry.path();
            let lock_path = path.join(SPILL_LOCK_NAME);
            if let Ok(lock_file) = std::fs::File::open(&lock_path) {
                use fs2::FileExt;
                // A held lock means another live instance owns the directory.
                if lock_file.try_lock_exclusive().is_err() {
                    debug!("Skipping active spill dir: {}", path.display());
                    continue;
                }
                drop(lock_file);
            }
            info!("Cleaning up stale spill dir: {}", path.display());
            if let Err(e) = std::fs::remove_dir_all(&path) {
                warn!("Failed to clean up stale spill dir {}: {e}", path.display());
            }
        }
    }

    /// Runs stale-directory cleanup on the spill root, ignoring the case where
    /// the root cannot be determined.
    #[allow(dead_code)]
    pub(crate) fn run_cleanup() {
        if let Ok(root) = Self::spill_root() {
            Self::cleanup_stale(&root);
        }
    }

    /// Writes `content` to the spill directory under its content address.
    ///
    /// Content whose address was already spilled is skipped. The address is
    /// recorded as seen only after the file has been written successfully, so
    /// a failed write does not cause a later retry to be silently dropped.
    ///
    /// # Errors
    /// Returns an I/O error when the chunk file cannot be written.
    fn push(&mut self, content: &[u8]) -> Result<()> {
        let address = compute_address(content);
        if self.seen.contains(&address) {
            return Ok(());
        }
        let path = self.dir.join(hex::encode(address));
        std::fs::write(&path, content)?;
        // Bookkeeping happens strictly after the write succeeded.
        self.seen.insert(address);
        let content_len = content.len() as u64;
        self.sizes.insert(address, content_len);
        self.total_bytes += content_len;
        self.addresses.push(address);
        Ok(())
    }

    /// Number of distinct chunks spilled so far.
    fn len(&self) -> usize {
        self.addresses.len()
    }

    /// Total size in bytes of all spilled chunks.
    fn total_bytes(&self) -> u64 {
        self.total_bytes
    }

    /// Returns `(address, size)` for every spilled chunk, in spill order.
    ///
    /// # Errors
    /// Returns [`Error::Storage`] if an address has no recorded size.
    fn chunk_entries(&self) -> Result<Vec<([u8; 32], u64)>> {
        self.addresses
            .iter()
            .map(|address| {
                self.sizes
                    .get(address)
                    .copied()
                    .map(|size| (*address, size))
                    .ok_or_else(|| {
                        Error::Storage(format!(
                            "missing size for spilled chunk {}",
                            hex::encode(address)
                        ))
                    })
            })
            .collect()
    }

    /// Reads the spilled chunk stored under `address` back into memory.
    ///
    /// # Errors
    /// Returns an I/O error (annotated with the chunk address) when the file
    /// cannot be read.
    fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
        let path = self.dir.join(hex::encode(address));
        let data = std::fs::read(&path).map_err(|e| {
            Error::Io(std::io::Error::new(
                e.kind(),
                format!("reading spilled chunk {}: {e}", hex::encode(address)),
            ))
        })?;
        Ok(Bytes::from(data))
    }

    /// Reads every chunk in `wave_addrs`, returning `(content, address)` pairs
    /// in the same order. Stops at the first read failure.
    fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
        let mut out = Vec::with_capacity(wave_addrs.len());
        for addr in wave_addrs {
            let content = self.read_chunk(addr)?;
            out.push((content, *addr));
        }
        Ok(out)
    }

    /// Removes the spill directory and everything in it; failures are logged
    /// but not returned.
    fn cleanup(&self) {
        if let Err(e) = std::fs::remove_dir_all(&self.dir) {
            warn!(
                "Failed to clean up chunk spill dir {}: {e}",
                self.dir.display()
            );
        }
    }
}
impl Drop for ChunkSpill {
fn drop(&mut self) {
self.cleanup();
}
}
/// Returns `true` when the cached merkle payment result holds a proof for
/// every address in `addresses` (vacuously `true` for an empty slice).
fn cached_merkle_covers_addresses(
    cached: &MerkleBatchPaymentResult,
    addresses: &[[u8; 32]],
) -> bool {
    for address in addresses {
        if !cached.proofs.contains_key(address) {
            return false;
        }
    }
    true
}
/// Splits `addresses` into those that have an entry in `proofs` and those that
/// do not, preserving the input order within each group.
///
/// Returns `(with_proof, without_proof)`.
fn partition_addresses_by_proof(
    addresses: &[[u8; 32]],
    proofs: &HashMap<[u8; 32], Vec<u8>>,
) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
    let mut with_proof = Vec::new();
    let mut without_proof = Vec::new();
    for &address in addresses {
        if proofs.contains_key(&address) {
            with_proof.push(address);
        } else {
            without_proof.push(address);
        }
    }
    (with_proof, without_proof)
}
/// Verifies that the spill directory's filesystem has room for a file of
/// `file_size` bytes plus `DISK_SPACE_HEADROOM_PERCENT` percent headroom.
///
/// The spill root is created if it does not exist so that the free-space
/// query has a path to inspect.
///
/// # Errors
/// * [`Error::InsufficientDiskSpace`] when less than the required space is
///   available.
/// * An I/O or config error when the spill root cannot be determined,
///   created, or queried.
fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
    let spill_root = ChunkSpill::spill_root()?;
    std::fs::create_dir_all(&spill_root)?;
    let available = fs2::available_space(&spill_root).map_err(|e| {
        Error::Io(std::io::Error::new(
            e.kind(),
            format!(
                "failed to query disk space on {}: {e}",
                spill_root.display()
            ),
        ))
    })?;
    // headroom = floor(file_size * percent / 100). The constant is a
    // percentage, not a divisor, so it must be applied as one. Splitting into
    // quotient and remainder keeps the multiplication from overflowing.
    let headroom = (file_size / 100)
        .saturating_mul(DISK_SPACE_HEADROOM_PERCENT)
        .saturating_add((file_size % 100).saturating_mul(DISK_SPACE_HEADROOM_PERCENT) / 100);
    let required = file_size.saturating_add(headroom);
    if available < required {
        let avail_mb = available / (1024 * 1024);
        let req_mb = required / (1024 * 1024);
        return Err(Error::InsufficientDiskSpace(format!(
            "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
            spill_root.display()
        )));
    }
    debug!(
        "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
        spill_root.display()
    );
    Ok(())
}
/// Estimates how much memory (in bytes) is currently usable by this process.
///
/// Takes the largest non-zero value among the system's "available", "free"
/// and "total minus used" figures, then caps it by the cgroup's free memory
/// when a cgroup limit with a non-zero total is reported. Returns `None` when
/// no figure is available.
fn usable_memory_bytes() -> Option<u64> {
    let mut sys = sysinfo::System::new();
    sys.refresh_memory();

    let available_memory = sys.available_memory();
    let free_memory = sys.free_memory();
    let used_memory = sys.used_memory();
    let total_memory = sys.total_memory();

    // Largest strictly positive candidate, if any.
    let mut best: Option<u64> = None;
    for candidate in [
        available_memory,
        free_memory,
        total_memory.saturating_sub(used_memory),
    ] {
        if candidate == 0 {
            continue;
        }
        best = match best {
            Some(current) if current >= candidate => Some(current),
            _ => Some(candidate),
        };
    }

    let cgroup_free_memory = match sys.cgroup_limits() {
        Some(limits) if limits.total_memory > 0 => Some(limits.free_memory),
        _ => None,
    };
    // A cgroup limit always wins when it is tighter (or the only figure known).
    if let Some(limit) = cgroup_free_memory {
        best = Some(match best {
            Some(current) => current.min(limit),
            None => limit,
        });
    }

    debug!(
        available_memory,
        free_memory,
        used_memory,
        total_memory,
        cgroup_free_memory,
        usable_memory = ?best,
        "Detected usable memory for stream decrypt batch sizing"
    );
    best
}
/// Converts a usable-memory figure into the maximum number of chunks a single
/// streaming-decrypt batch may hold.
///
/// The budget is `usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR`
/// and each chunk is assumed to cost
/// `MAX_CHUNK_SIZE * DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER` bytes.
/// The result is never below 1.
fn stream_decrypt_batch_memory_cap(usable_memory_bytes: u64) -> usize {
    let per_chunk_bytes = std::cmp::max(
        (self_encryption::MAX_CHUNK_SIZE as u64)
            .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER),
        1,
    );
    let memory_budget = usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR;
    let chunks_that_fit = std::cmp::max(memory_budget / per_chunk_bytes, 1);
    match usize::try_from(chunks_that_fit) {
        Ok(count) => count,
        Err(_) => usize::MAX,
    }
}
/// Chooses the batch size for streaming decrypt.
///
/// * With a known memory figure: the larger of `configured_batch_floor` and
///   `fetch_cap * DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER`, capped by what fits
///   in the memory budget.
/// * Without one: `configured_batch_floor`.
///
/// The result is always at least 1 and never exceeds `total_chunks` (treated
/// as 1 when zero).
fn adaptive_stream_decrypt_batch_size(
    total_chunks: usize,
    fetch_cap: usize,
    configured_batch_floor: usize,
    usable_memory_bytes: Option<u64>,
) -> usize {
    let floor = configured_batch_floor.max(1);
    let requested = if let Some(bytes) = usable_memory_bytes {
        let fetch_target = fetch_cap
            .max(1)
            .saturating_mul(DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
        let memory_cap = stream_decrypt_batch_memory_cap(bytes);
        floor.max(fetch_target).min(memory_cap)
    } else {
        floor
    };
    let upper_bound = total_chunks.max(1);
    requested.min(upper_bound).max(1)
}
/// Whether the DataMap of a prepared upload is published alongside the data.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Visibility {
/// The DataMap is only returned to the caller; no DataMap address is
/// produced. This is the default.
#[default]
Private,
/// The DataMap is serialized and bundled as an additional chunk, and its
/// address is reported in `data_map_address`.
Public,
}
/// Result of `Client::estimate_upload_cost`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct UploadCostEstimate {
/// Size of the source file in bytes.
pub file_size: u64,
/// Number of distinct chunks the file encrypts into.
pub chunk_count: usize,
/// Estimated total storage cost, as a decimal string (`"0"` when every
/// chunk is already stored).
pub storage_cost_atto: String,
/// Estimated gas cost in wei, as a decimal string.
pub estimated_gas_cost_wei: String,
/// Payment mode the estimate assumed: `Merkle` or `Single`.
pub payment_mode: PaymentMode,
}
/// Outcome of a completed file upload.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FileUploadResult {
/// DataMap needed to retrieve and decrypt the file.
pub data_map: DataMap,
/// Number of chunks counted as stored.
pub chunks_stored: usize,
/// Number of chunks that could not be stored.
pub chunks_failed: usize,
/// Total number of chunks the upload covered.
pub total_chunks: usize,
/// Payment mode that was actually used.
pub payment_mode_used: PaymentMode,
/// Storage cost as a decimal string; the external-signer finalize paths
/// report `"0"` here.
pub storage_cost_atto: String,
/// Gas cost in wei; the external-signer finalize paths report `0` here.
pub gas_cost_wei: u128,
/// Address of the published DataMap chunk, if one was bundled.
pub data_map_address: Option<[u8; 32]>,
/// Total store attempts, copied from the aggregated wave statistics.
pub chunk_attempts_total: usize,
/// Per-store durations in milliseconds, copied from the wave statistics.
pub store_durations_ms: Vec<u64>,
/// Retry histogram copied from the wave statistics.
/// NOTE(review): bucket meaning is defined by `WaveAggregateStats`, which is
/// not visible here.
pub retries_histogram: [usize; 4],
}
/// Payment material produced by the prepare step for an external signer.
///
/// The variant determines which finalize method must be used:
/// `finalize_upload*` for `WaveBatch`, `finalize_upload_merkle*` for `Merkle`.
#[derive(Debug)]
pub enum ExternalPaymentInfo {
/// Per-chunk (wave-batch) payment.
WaveBatch {
/// Chunks that were quoted and still need payment and storage.
prepared_chunks: Vec<PreparedChunk>,
/// Payment intent built from `prepared_chunks`.
payment_intent: PaymentIntent,
},
/// Merkle batch payment.
Merkle {
/// Prepared merkle batch awaiting the winner pool hash.
prepared_batch: PreparedMerkleBatch,
/// Contents of the chunks that need uploading.
chunk_contents: Vec<Bytes>,
/// Addresses of the chunks that need uploading.
chunk_addresses: Vec<[u8; 32]>,
},
}
/// A file that has been encrypted and quoted and is waiting for an external
/// payment before it can be finalized.
#[derive(Debug)]
#[non_exhaustive]
pub struct PreparedUpload {
/// DataMap of the encrypted file.
pub data_map: DataMap,
/// Payment material; selects the finalize method to call.
pub payment_info: ExternalPaymentInfo,
/// Address of the bundled DataMap chunk for public uploads, else `None`.
pub data_map_address: Option<[u8; 32]>,
/// Addresses found to be already stored during preparation.
pub already_stored_addresses: Vec<[u8; 32]>,
/// Total chunk count, including a bundled DataMap chunk for public uploads.
pub total_chunks: usize,
}
/// Handles returned by `spawn_file_encryption`, in order: the stream of
/// encrypted chunk contents, the one-shot delivering the final DataMap, and
/// the join handle of the blocking encryption task.
type EncryptionChannels = (
tokio::sync::mpsc::Receiver<Bytes>,
tokio::sync::oneshot::Receiver<DataMap>,
tokio::task::JoinHandle<Result<()>>,
);
fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
let metadata = std::fs::metadata(&path)?;
let data_size = usize::try_from(metadata.len())
.map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;
let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();
let handle = tokio::task::spawn_blocking(move || {
let file = std::fs::File::open(&path)?;
let mut reader = std::io::BufReader::new(file);
let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
let read_error_clone = Arc::clone(&read_error);
let data_iter = std::iter::from_fn(move || {
let mut buffer = vec![0u8; 8192];
match std::io::Read::read(&mut reader, &mut buffer) {
Ok(0) => None,
Ok(n) => {
buffer.truncate(n);
Some(Bytes::from(buffer))
}
Err(e) => {
let mut guard = read_error_clone
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
*guard = Some(e);
None
}
}
});
let mut stream = stream_encrypt(data_size, data_iter)
.map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;
for chunk_result in stream.chunks() {
{
let guard = read_error
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
if let Some(ref e) = *guard {
return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
}
}
let (_hash, content) = chunk_result
.map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
if chunk_tx.blocking_send(content).is_err() {
return Err(Error::Encryption("upload receiver dropped".to_string()));
}
}
{
let guard = read_error
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
if let Some(ref e) = *guard {
return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
}
}
let datamap = stream
.into_datamap()
.ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
if datamap_tx.send(datamap).is_err() {
warn!("DataMap receiver dropped — upload may have been cancelled");
}
Ok(())
});
Ok((chunk_rx, datamap_rx, handle))
}
impl Client {
/// Uploads the file at `path`, letting the client pick the payment mode
/// (`PaymentMode::Auto`).
pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
    let mode = PaymentMode::Auto;
    self.file_upload_with_mode(path, mode).await
}
pub async fn estimate_upload_cost(
&self,
path: &Path,
mode: PaymentMode,
progress: Option<mpsc::Sender<UploadEvent>>,
) -> Result<UploadCostEstimate> {
let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
if file_size < 3 {
return Err(Error::InvalidData(
"File too small: self-encryption requires at least 3 bytes".into(),
));
}
check_disk_space_for_spill(file_size)?;
info!(
"Estimating upload cost for {} ({file_size} bytes)",
path.display()
);
let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
let chunk_count = spill.len();
if let Some(ref tx) = progress {
let _ = tx
.send(UploadEvent::Encrypted {
total_chunks: chunk_count,
})
.await;
}
info!("Encrypted into {chunk_count} chunks, requesting quote");
let sample_limit = spill.addresses.len().min(ESTIMATE_SAMPLE_CAP);
let mut sampled = 0usize;
let mut all_already_stored = true;
let mut quotes_opt: Option<Vec<QuoteEntry>> = None;
for addr in spill.addresses.iter().take(sample_limit) {
sampled += 1;
let chunk_bytes = spill.read_chunk(addr)?;
let data_size = u64::try_from(chunk_bytes.len())
.map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
match self
.get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
.await
{
Ok(q) => {
quotes_opt = Some(q);
all_already_stored = false;
break;
}
Err(Error::AlreadyStored) => {
debug!(
"Sample chunk {} already stored; trying next address ({sampled}/{sample_limit})",
hex::encode(addr)
);
continue;
}
Err(e) => return Err(e),
}
}
let uses_merkle = should_use_merkle(chunk_count, mode);
let quotes = match quotes_opt {
Some(q) => q,
None if all_already_stored && sampled == chunk_count => {
info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
return Ok(UploadCostEstimate {
file_size,
chunk_count,
storage_cost_atto: "0".into(),
estimated_gas_cost_wei: "0".into(),
payment_mode: if uses_merkle {
PaymentMode::Merkle
} else {
PaymentMode::Single
},
});
}
None => {
return Err(Error::CostEstimationInconclusive(format!(
"sampled {sampled} chunk addresses out of {chunk_count} and every \
one reported AlreadyStored; cannot infer a representative price \
for the remaining chunks"
)));
}
};
let mut prices: Vec<Amount> = quotes.iter().map(|(_, _, _, price)| *price).collect();
prices.sort();
let median_price = prices
.get(prices.len() / 2)
.copied()
.unwrap_or(Amount::ZERO);
let per_chunk_cost = median_price * Amount::from(3u64);
let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);
let waves = u128::try_from(chunk_count.div_ceil(UPLOAD_WAVE_SIZE)).unwrap_or(u128::MAX);
let merkle_batches = u128::try_from(chunk_count.div_ceil(MAX_LEAVES)).unwrap_or(u128::MAX);
let estimated_gas: u128 = if uses_merkle {
merkle_batches
.saturating_mul(GAS_PER_MERKLE_TX)
.saturating_mul(ARBITRUM_GAS_PRICE_WEI)
} else {
waves
.saturating_mul(GAS_PER_WAVE_TX)
.saturating_mul(ARBITRUM_GAS_PRICE_WEI)
};
info!(
"Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
);
Ok(UploadCostEstimate {
file_size,
chunk_count,
storage_cost_atto: total_storage.to_string(),
estimated_gas_cost_wei: estimated_gas.to_string(),
payment_mode: if uses_merkle {
PaymentMode::Merkle
} else {
PaymentMode::Single
},
})
}
/// Prepares a private upload of `path` for external signing, without progress
/// reporting.
pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
    let visibility = Visibility::Private;
    self.file_prepare_upload_with_progress(path, visibility, None)
        .await
}
/// Prepares an upload of `path` for external signing with the given
/// `visibility`, without progress reporting.
pub async fn file_prepare_upload_with_visibility(
    &self,
    path: &Path,
    visibility: Visibility,
) -> Result<PreparedUpload> {
    let no_progress = None;
    self.file_prepare_upload_with_progress(path, visibility, no_progress)
        .await
}
/// Encrypts the file at `path` and prepares everything an external signer
/// needs to pay for it, without paying or storing anything.
///
/// For `Visibility::Public` the serialized DataMap is bundled as one extra
/// chunk and its address is returned in `data_map_address`.
///
/// The returned `payment_info` is a merkle batch when the merkle threshold is
/// met (and enough peers are available), otherwise a wave-batch. Addresses
/// found to be already stored during preparation are returned separately.
///
/// # Errors
/// Propagates failures from the disk-space check, encryption, spill reads,
/// DataMap serialization, merkle planning/preparation and quoting.
pub async fn file_prepare_upload_with_progress(
&self,
path: &Path,
visibility: Visibility,
progress: Option<mpsc::Sender<UploadEvent>>,
) -> Result<PreparedUpload> {
debug!(
"Preparing file upload for external signing (visibility={visibility:?}): {}",
path.display()
);
let file_size = std::fs::metadata(path)?.len();
check_disk_space_for_spill(file_size)?;
let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
info!(
"Encrypted {} into {} chunks for external signing (spilled to disk)",
path.display(),
spill.len()
);
// All spilled chunks are read back into memory here because the prepared
// upload carries chunk contents with it.
let mut chunk_data: Vec<Bytes> = spill
.addresses
.iter()
.map(|addr| spill.read_chunk(addr))
.collect::<std::result::Result<Vec<_>, _>>()?;
let data_map_address = match visibility {
Visibility::Private => None,
Visibility::Public => {
// Public uploads store the MessagePack-serialized DataMap as one more
// chunk so it can be fetched by address.
let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
Error::Serialization(format!("Failed to serialize DataMap: {e}"))
})?;
let bytes = Bytes::from(serialized);
let address = compute_address(&bytes);
info!(
"Public upload: bundling DataMap chunk ({} bytes) at address {}",
bytes.len(),
hex::encode(address)
);
chunk_data.push(bytes);
Some(address)
}
};
// From here on the count includes the DataMap chunk, if one was bundled.
let chunk_count = chunk_data.len();
if let Some(ref tx) = progress {
let _ = tx
.send(UploadEvent::Encrypted {
total_chunks: chunk_count,
})
.await;
}
let (payment_info, already_stored_addresses) = if should_use_merkle(
chunk_count,
PaymentMode::Auto,
) {
info!("Using merkle batch preparation for {chunk_count} file chunks");
let chunk_entries: Vec<([u8; 32], u64)> = chunk_data
.iter()
.map(|chunk| {
let size = u64::try_from(chunk.len())
.map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
Ok((compute_address(chunk), size))
})
.collect::<Result<Vec<_>>>()?;
// The plan splits the chunks into `to_upload` and `already_stored`.
let merkle_plan = self
.plan_merkle_upload(chunk_entries, DATA_TYPE_CHUNK, progress.as_ref())
.await?;
if merkle_plan.to_upload.is_empty() {
info!("All {chunk_count} file chunks already stored; no external payment needed");
// Nothing to pay for: return an empty wave-batch so the caller can
// still run the regular finalize path.
(
ExternalPaymentInfo::WaveBatch {
prepared_chunks: Vec::new(),
payment_intent: PaymentIntent::from_prepared_chunks(&[]),
},
merkle_plan.already_stored,
)
} else {
// Keep only the contents of the chunks that still need uploading.
let chunk_data =
chunk_contents_for_upload_addresses(chunk_data, &merkle_plan.to_upload)?;
if !should_use_merkle(merkle_plan.to_upload.len(), PaymentMode::Auto) {
// The remaining set dropped below the merkle threshold.
info!(
"{} file chunks need upload after merkle preflight; preparing wave-batch payment",
merkle_plan.to_upload.len()
);
let (payment_info, mut wave_already_stored) = self
.prepare_wave_batch_external_chunks(
chunk_data,
progress.as_ref(),
chunk_count,
)
.await?;
// Merge the already-stored addresses found by both passes.
let mut already_stored = merkle_plan.already_stored;
already_stored.append(&mut wave_already_stored);
(payment_info, already_stored)
} else {
match self
.prepare_merkle_batch_external(
&merkle_plan.to_upload,
DATA_TYPE_CHUNK,
merkle_plan.to_upload_avg_size(),
)
.await
{
Ok(prepared_batch) => {
info!(
"File prepared for external merkle signing: {} chunks, depth={} ({})",
merkle_plan.to_upload.len(),
prepared_batch.depth,
path.display()
);
(
ExternalPaymentInfo::Merkle {
prepared_batch,
chunk_contents: chunk_data,
chunk_addresses: merkle_plan.to_upload,
},
merkle_plan.already_stored,
)
}
// Too few peers for merkle: fall back to a wave-batch payment.
Err(Error::InsufficientPeers(ref msg)) => {
info!(
"External merkle preparation needs more peers ({msg}); preparing wave-batch payment"
);
let (payment_info, mut wave_already_stored) = self
.prepare_wave_batch_external_chunks(
chunk_data,
progress.as_ref(),
chunk_count,
)
.await?;
let mut already_stored = merkle_plan.already_stored;
already_stored.append(&mut wave_already_stored);
(payment_info, already_stored)
}
Err(e) => return Err(e),
}
}
}
} else {
// Below the merkle threshold from the start: quote every chunk.
self.prepare_wave_batch_external_chunks(chunk_data, progress.as_ref(), chunk_count)
.await?
};
// For public uploads, log when the DataMap chunk is not part of the payment
// set (informational only; the result is unaffected).
if let Some(addr) = data_map_address {
let data_map_needs_payment = match &payment_info {
ExternalPaymentInfo::WaveBatch {
prepared_chunks, ..
} => prepared_chunks.iter().any(|c| c.address == addr),
ExternalPaymentInfo::Merkle {
chunk_addresses, ..
} => chunk_addresses.contains(&addr),
};
if !data_map_needs_payment {
info!(
"Public upload: DataMap chunk {} was already stored \
on the network — address is retrievable without a \
new payment",
hex::encode(addr)
);
}
}
Ok(PreparedUpload {
data_map,
payment_info,
data_map_address,
already_stored_addresses,
total_chunks: chunk_count,
})
}
/// Quotes every chunk in `chunk_data` concurrently and assembles a wave-batch
/// payment for an external signer.
///
/// Concurrency is taken from the controller's quote limiter, capped at the
/// number of chunks. A `ChunkQuoted` event is sent (non-blocking) after each
/// result, with `progress_total` as the total.
///
/// Returns the payment info together with the addresses of chunks that the
/// quoting step reported as already stored. The first quoting error aborts
/// the whole preparation.
async fn prepare_wave_batch_external_chunks(
    &self,
    chunk_data: Vec<Bytes>,
    progress: Option<&mpsc::Sender<UploadEvent>>,
    progress_total: usize,
) -> Result<(ExternalPaymentInfo, Vec<[u8; 32]>)> {
    let chunk_count = chunk_data.len();

    // Pair every chunk with its content address up front.
    let mut addressed: Vec<(Bytes, [u8; 32])> = Vec::with_capacity(chunk_count);
    for content in chunk_data {
        let address = compute_address(&content);
        addressed.push((content, address));
    }

    let quote_limiter = self.controller().quote.clone();
    let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
    let mut quote_stream = stream::iter(addressed)
        .map(|(content, address)| {
            let limiter = quote_limiter.clone();
            async move {
                let result = observe_op(
                    &limiter,
                    || async move { self.prepare_chunk_payment(content).await },
                    classify_error,
                )
                .await;
                (address, result)
            }
        })
        .buffer_unordered(quote_concurrency);

    let mut prepared_chunks = Vec::with_capacity(chunk_count);
    let mut already_stored = Vec::new();
    let mut quoted = 0usize;
    loop {
        let Some((address, result)) = quote_stream.next().await else {
            break;
        };
        // `None` from the quoting step means the chunk is already stored.
        if let Some(prepared) = result? {
            prepared_chunks.push(prepared);
        } else {
            already_stored.push(address);
        }
        quoted += 1;
        if let Some(sender) = progress {
            let _ = sender.try_send(UploadEvent::ChunkQuoted {
                quoted,
                total: progress_total,
            });
        }
    }

    let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
    info!(
        "Prepared external wave-batch payment: {} chunks, {} already stored, total {} atto",
        prepared_chunks.len(),
        already_stored.len(),
        payment_intent.total_amount,
    );
    let payment_info = ExternalPaymentInfo::WaveBatch {
        prepared_chunks,
        payment_intent,
    };
    Ok((payment_info, already_stored))
}
/// Finalizes a wave-batch prepared upload using the externally obtained
/// transaction hashes, without progress reporting.
pub async fn finalize_upload(
    &self,
    prepared: PreparedUpload,
    tx_hash_map: &HashMap<QuoteHash, TxHash>,
) -> Result<FileUploadResult> {
    let no_progress = None;
    self.finalize_upload_with_progress(prepared, tx_hash_map, no_progress)
        .await
}
pub async fn finalize_upload_with_progress(
&self,
prepared: PreparedUpload,
tx_hash_map: &HashMap<QuoteHash, TxHash>,
progress: Option<mpsc::Sender<UploadEvent>>,
) -> Result<FileUploadResult> {
let data_map_address = prepared.data_map_address;
let already_stored_addresses = prepared.already_stored_addresses;
let already_stored_count = already_stored_addresses.len();
let total_chunks = prepared.total_chunks;
match prepared.payment_info {
ExternalPaymentInfo::WaveBatch {
prepared_chunks,
payment_intent: _,
} => {
let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
let wave_result = self
.store_paid_chunks_with_events(
paid_chunks,
progress.as_ref(),
already_stored_count,
total_chunks,
)
.await;
if !wave_result.failed.is_empty() {
let failed_count = wave_result.failed.len();
let stored_count = already_stored_count + wave_result.stored.len();
let mut stored = already_stored_addresses;
stored.extend(wave_result.stored);
return Err(Error::PartialUpload {
stored,
stored_count,
failed: wave_result.failed,
failed_count,
total_chunks,
reason: "finalize_upload: chunk storage failed after retries".into(),
});
}
let chunks_stored = already_stored_count + wave_result.stored.len();
info!("External-signer upload finalized: {chunks_stored} chunks stored");
let mut stats = WaveAggregateStats::default();
stats.absorb(&wave_result);
Ok(FileUploadResult {
data_map: prepared.data_map,
chunks_stored,
chunks_failed: 0,
total_chunks,
payment_mode_used: PaymentMode::Single,
storage_cost_atto: "0".into(),
gas_cost_wei: 0,
data_map_address,
chunk_attempts_total: stats.chunk_attempts_total,
store_durations_ms: stats.store_durations_ms,
retries_histogram: stats.retries_histogram,
})
}
ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
"Cannot finalize merkle upload with wave-batch tx hashes. \
Use finalize_upload_merkle() instead."
.to_string(),
)),
}
}
/// Finalizes a merkle prepared upload with the winning pool hash, without
/// progress reporting.
pub async fn finalize_upload_merkle(
    &self,
    prepared: PreparedUpload,
    winner_pool_hash: [u8; 32],
) -> Result<FileUploadResult> {
    let no_progress = None;
    self.finalize_upload_merkle_with_progress(prepared, winner_pool_hash, no_progress)
        .await
}
pub async fn finalize_upload_merkle_with_progress(
&self,
prepared: PreparedUpload,
winner_pool_hash: [u8; 32],
progress: Option<mpsc::Sender<UploadEvent>>,
) -> Result<FileUploadResult> {
let data_map_address = prepared.data_map_address;
let already_stored_count = prepared.already_stored_addresses.len();
let total_chunks = prepared.total_chunks;
match prepared.payment_info {
ExternalPaymentInfo::Merkle {
prepared_batch,
chunk_contents,
chunk_addresses,
} => {
let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
let outcome = self
.merkle_upload_chunks(
chunk_contents,
chunk_addresses,
&batch_result,
progress.as_ref(),
already_stored_count,
total_chunks,
)
.await?;
info!(
"External-signer merkle upload finalized: {} chunks stored, {} failed",
outcome.stored, outcome.failed
);
Ok(FileUploadResult {
data_map: prepared.data_map,
chunks_stored: outcome.stored,
chunks_failed: outcome.failed,
total_chunks,
payment_mode_used: PaymentMode::Merkle,
storage_cost_atto: "0".into(),
gas_cost_wei: 0,
data_map_address,
chunk_attempts_total: outcome.stats.chunk_attempts_total,
store_durations_ms: outcome.stats.store_durations_ms,
retries_histogram: outcome.stats.retries_histogram,
})
}
ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
"Cannot finalize wave-batch upload with merkle winner hash. \
Use finalize_upload() instead."
.to_string(),
)),
}
}
/// Uploads the file at `path` using the requested payment `mode`, without
/// progress reporting.
///
/// Thin wrapper around `file_upload_with_progress`. It is a three-line
/// function, so it needs no `too_many_lines` lint suppression.
pub async fn file_upload_with_mode(
    &self,
    path: &Path,
    mode: PaymentMode,
) -> Result<FileUploadResult> {
    self.file_upload_with_progress(path, mode, None).await
}
/// Encrypts the file at `path` to a disk spill, pays for it and stores its
/// chunks, reporting progress on `progress` when provided.
///
/// Payment strategy:
/// * When `self.should_use_merkle(chunk_count, mode)` is true, a merkle
///   preflight plan determines which chunks still need uploading. Cached
///   merkle proofs for this file are reused when they cover the needed
///   addresses. If the remaining set is below the merkle threshold, or merkle
///   payment fails with `InsufficientPeers` in `Auto` mode, the remaining
///   chunks are paid per chunk instead.
/// * Otherwise all chunks are uploaded with wave-batch (single) payment.
///
/// On success the per-file payment caches that were used are deleted. The
/// result always has `chunks_failed: 0` and `data_map_address: None`.
///
/// # Errors
/// Propagates failures from the disk-space check, encryption, merkle
/// planning/payment and chunk upload.
#[allow(clippy::too_many_lines)]
pub async fn file_upload_with_progress(
&self,
path: &Path,
mode: PaymentMode,
progress: Option<mpsc::Sender<UploadEvent>>,
) -> Result<FileUploadResult> {
debug!(
"Streaming file upload with mode {mode:?}: {}",
path.display()
);
let file_size = std::fs::metadata(path)?.len();
check_disk_space_for_spill(file_size)?;
let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
let chunk_count = spill.len();
info!(
"Encrypted {} into {chunk_count} chunks (spilled to disk)",
path.display()
);
if let Some(ref tx) = progress {
let _ = tx
.send(UploadEvent::Encrypted {
total_chunks: chunk_count,
})
.await;
}
// Key for the per-file payment caches: the canonical path when it can be
// resolved, otherwise the path as given.
let file_path_key = std::fs::canonicalize(path)
.map(|p| p.display().to_string())
.unwrap_or_else(|_| path.display().to_string());
let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self
.should_use_merkle(chunk_count, mode)
{
info!("Using merkle batch payment for {chunk_count} file chunks");
// A merkle payment result cached for this file, if any.
let cached_merkle =
crate::data::client::cached_merkle::try_load_for_file(&file_path_key)
.map(|(_cache_path, cached)| cached);
let merkle_plan = match self
.plan_merkle_upload(spill.chunk_entries()?, DATA_TYPE_CHUNK, progress.as_ref())
.await
{
Ok(plan) => plan,
Err(e) => {
// Preflight failed. If cached proofs cover every chunk, upload all
// chunks with them instead of failing.
if let Some(cached) = cached_merkle
.as_ref()
.filter(|cached| cached_merkle_covers_addresses(cached, &spill.addresses))
{
info!(
"Merkle preflight failed ({e}); \
resuming with cached merkle proofs"
);
let (stored, sc, gc, stats) = self
.upload_waves_merkle(
&spill,
&spill.addresses,
cached,
&[],
progress.as_ref(),
)
.await?;
crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
return Ok(FileUploadResult {
data_map,
chunks_stored: stored,
chunks_failed: 0,
total_chunks: chunk_count,
payment_mode_used: PaymentMode::Merkle,
storage_cost_atto: sc,
gas_cost_wei: gc,
data_map_address: None,
chunk_attempts_total: stats.chunk_attempts_total,
store_durations_ms: stats.store_durations_ms,
retries_histogram: stats.retries_histogram,
});
}
match &e {
// Only `Auto` mode may fall back to per-chunk payment.
Error::InsufficientPeers(msg) if mode == PaymentMode::Auto => {
info!(
"Merkle preflight needs more peers ({msg}), \
falling back to wave-batch"
);
let (stored, sc, gc, fb_stats) = self
.upload_waves_single(
&spill,
progress.as_ref(),
Some(&file_path_key),
)
.await?;
crate::data::client::cached_single::try_delete_for_file(&file_path_key);
return Ok(FileUploadResult {
data_map,
chunks_stored: stored,
chunks_failed: 0,
total_chunks: chunk_count,
payment_mode_used: PaymentMode::Single,
storage_cost_atto: sc,
gas_cost_wei: gc,
data_map_address: None,
chunk_attempts_total: fb_stats.chunk_attempts_total,
store_durations_ms: fb_stats.store_durations_ms,
retries_histogram: fb_stats.retries_histogram,
});
}
_ => return Err(e),
}
}
};
if merkle_plan.to_upload.is_empty() {
// Nothing left to upload: no payment, and both caches are obsolete.
info!("All {chunk_count} merkle chunks already stored; skipping payment");
crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
crate::data::client::cached_single::try_delete_for_file(&file_path_key);
(
chunk_count,
PaymentMode::Merkle,
"0".to_string(),
0,
WaveAggregateStats::default(),
)
} else if !self.should_use_merkle(merkle_plan.to_upload.len(), mode) {
// The remaining set is below the merkle threshold.
let remaining_chunks = merkle_plan.to_upload.len();
if let Some(cached) = cached_merkle
.as_ref()
.filter(|cached| cached_merkle_covers_addresses(cached, &merkle_plan.to_upload))
{
// Cached proofs cover the remainder, so no new payment is needed.
info!(
"{remaining_chunks} chunks remain below merkle threshold; \
reusing cached merkle proofs"
);
let (stored, sc, gc, stats) = self
.upload_waves_merkle(
&spill,
&merkle_plan.to_upload,
cached,
&merkle_plan.already_stored,
progress.as_ref(),
)
.await?;
crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
(stored, PaymentMode::Merkle, sc, gc, stats)
} else {
if cached_merkle.is_some() {
info!(
"{remaining_chunks} chunks remain below merkle threshold, \
and the cached merkle receipt does not cover them. \
Discarding cache and using single-node payment."
);
crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
} else {
info!(
"{remaining_chunks} chunks need upload after merkle preflight; \
using single-node payment"
);
}
// Pay per chunk for just the remaining addresses.
let (stored, sc, gc, stats) = self
.upload_spill_addresses_single(
&spill,
&merkle_plan.to_upload,
progress.as_ref(),
merkle_plan.already_stored.len(),
chunk_count,
Some(&file_path_key),
)
.await?;
crate::data::client::cached_single::try_delete_for_file(&file_path_key);
(stored, PaymentMode::Single, sc, gc, stats)
}
} else {
// Merkle payment for the remaining chunks: reuse a covering cached
// result, otherwise pay and cache the new result immediately.
let batch_result = if let Some(cached) = cached_merkle.as_ref() {
if cached_merkle_covers_addresses(cached, &merkle_plan.to_upload) {
info!(
"Skipping merkle payment phase; resuming with \
cached proofs for {} remaining chunks",
merkle_plan.to_upload.len()
);
Ok(cached.clone())
} else {
info!(
"Cached merkle receipt does not cover the current \
remaining chunks (cached={}, remaining={}). \
Discarding cache and paying fresh.",
cached.proofs.len(),
merkle_plan.to_upload.len()
);
crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
self.pay_for_merkle_batch(
&merkle_plan.to_upload,
DATA_TYPE_CHUNK,
merkle_plan.to_upload_avg_size(),
)
.await
.inspect(|result| {
crate::data::client::cached_merkle::try_save(&file_path_key, result);
})
}
} else {
self.pay_for_merkle_batch(
&merkle_plan.to_upload,
DATA_TYPE_CHUNK,
merkle_plan.to_upload_avg_size(),
)
.await
.inspect(|result| {
crate::data::client::cached_merkle::try_save(&file_path_key, result);
})
};
let batch_result = match batch_result {
Ok(result) => result,
// Too few peers for merkle payment: in `Auto` mode pay per chunk.
Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
info!("Merkle needs more peers ({msg}), falling back to wave-batch");
let (stored, sc, gc, fb_stats) = self
.upload_spill_addresses_single(
&spill,
&merkle_plan.to_upload,
progress.as_ref(),
merkle_plan.already_stored.len(),
chunk_count,
Some(&file_path_key),
)
.await?;
crate::data::client::cached_single::try_delete_for_file(&file_path_key);
return Ok(FileUploadResult {
data_map,
chunks_stored: stored,
chunks_failed: 0,
total_chunks: chunk_count,
payment_mode_used: PaymentMode::Single,
storage_cost_atto: sc,
gas_cost_wei: gc,
data_map_address: None,
chunk_attempts_total: fb_stats.chunk_attempts_total,
store_durations_ms: fb_stats.store_durations_ms,
retries_histogram: fb_stats.retries_histogram,
});
}
Err(e) => return Err(e),
};
let (stored, sc, gc, stats) = self
.upload_waves_merkle(
&spill,
&merkle_plan.to_upload,
&batch_result,
&merkle_plan.already_stored,
progress.as_ref(),
)
.await?;
// The cache is deleted only after the upload call above returned Ok.
crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
(stored, PaymentMode::Merkle, sc, gc, stats)
}
} else {
// Merkle not selected for this chunk count/mode: wave-batch payment for
// the whole spill.
let (stored, sc, gc, stats) = self
.upload_waves_single(&spill, progress.as_ref(), Some(&file_path_key))
.await?;
crate::data::client::cached_single::try_delete_for_file(&file_path_key);
(stored, PaymentMode::Single, sc, gc, stats)
};
info!(
"File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
path.display()
);
Ok(FileUploadResult {
data_map,
chunks_stored,
chunks_failed: 0,
total_chunks: chunk_count,
payment_mode_used: actual_mode,
storage_cost_atto,
gas_cost_wei,
data_map_address: None,
chunk_attempts_total: stats.chunk_attempts_total,
store_durations_ms: stats.store_durations_ms,
retries_histogram: stats.retries_histogram,
})
}
async fn encrypt_file_to_spill(
&self,
path: &Path,
progress: Option<&mpsc::Sender<UploadEvent>>,
) -> Result<(ChunkSpill, DataMap)> {
let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
let mut spill = ChunkSpill::new()?;
while let Some(content) = chunk_rx.recv().await {
spill.push(&content)?;
let chunks_done = spill.len();
if let Some(tx) = progress {
if chunks_done.is_multiple_of(10) {
let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
}
}
if chunks_done % 100 == 0 {
let mb = spill.total_bytes() / (1024 * 1024);
info!(
"Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
path.display()
);
}
}
handle
.await
.map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
.map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
let data_map = datamap_rx
.await
.map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
Ok((spill, data_map))
}
/// Uploads every chunk held in `spill` using single-node payments.
///
/// This forwards to `upload_spill_addresses_single` with the spill's complete
/// address list, a stored offset of `0`, and the spill length as the total.
///
/// # Returns
/// Whatever `upload_spill_addresses_single` returns: stored count, storage cost
/// string, gas cost and aggregated wave statistics.
async fn upload_waves_single(
    &self,
    spill: &ChunkSpill,
    progress: Option<&mpsc::Sender<UploadEvent>>,
    resume_key: Option<&str>,
) -> Result<(usize, String, u128, WaveAggregateStats)> {
    // Nothing is counted as stored up front, and the whole spill is in scope.
    let nothing_stored_yet = 0;
    let whole_spill = spill.len();
    let upload = self.upload_spill_addresses_single(
        spill,
        &spill.addresses,
        progress,
        nothing_stored_yet,
        whole_spill,
        resume_key,
    );
    upload.await
}
async fn upload_spill_addresses_single(
&self,
spill: &ChunkSpill,
addresses: &[[u8; 32]],
progress: Option<&mpsc::Sender<UploadEvent>>,
stored_offset: usize,
total_chunks: usize,
resume_key: Option<&str>,
) -> Result<(usize, String, u128, WaveAggregateStats)> {
let mut total_stored = stored_offset;
let mut total_storage = Amount::ZERO;
let mut total_gas: u128 = 0;
let mut agg_stats = WaveAggregateStats::default();
let waves: Vec<&[[u8; 32]]> = addresses.chunks(UPLOAD_WAVE_SIZE).collect();
let wave_count = waves.len();
for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
let wave_num = wave_idx + 1;
let wave_data: Vec<Bytes> = wave_addrs
.iter()
.map(|addr| spill.read_chunk(addr))
.collect::<Result<Vec<_>>>()?;
info!(
"Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
wave_data.len()
);
if let Some(tx) = progress {
let _ = tx
.send(UploadEvent::QuotingChunks {
wave: wave_num,
total_waves: wave_count,
chunks_in_wave: wave_data.len(),
})
.await;
}
let (addresses, wave_storage, wave_gas, wave_stats) = self
.batch_upload_chunks_with_events(
wave_data,
progress,
total_stored,
total_chunks,
resume_key,
)
.await?;
total_stored += addresses.len();
if let Ok(cost) = wave_storage.parse::<Amount>() {
total_storage += cost;
}
total_gas = total_gas.saturating_add(wave_gas);
agg_stats.chunk_attempts_total = agg_stats
.chunk_attempts_total
.saturating_add(wave_stats.chunk_attempts_total);
agg_stats
.store_durations_ms
.extend(wave_stats.store_durations_ms);
for (slot, count) in agg_stats
.retries_histogram
.iter_mut()
.zip(wave_stats.retries_histogram.iter())
{
*slot = slot.saturating_add(*count);
}
if let Some(tx) = progress {
let _ = tx
.send(UploadEvent::WaveComplete {
wave: wave_num,
total_waves: wave_count,
stored_so_far: total_stored,
total: total_chunks,
})
.await;
}
}
Ok((
total_stored,
total_storage.to_string(),
total_gas,
agg_stats,
))
}
async fn upload_waves_merkle(
&self,
spill: &ChunkSpill,
addresses: &[[u8; 32]],
batch_result: &MerkleBatchPaymentResult,
already_stored_addresses: &[[u8; 32]],
progress: Option<&mpsc::Sender<UploadEvent>>,
) -> Result<(usize, String, u128, WaveAggregateStats)> {
let mut total_stored = already_stored_addresses.len();
let total_chunks = total_stored + addresses.len();
let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
let mut failed: Vec<([u8; 32], String)> = Vec::new();
let mut agg_stats = WaveAggregateStats::default();
let (to_store, missing_proof) =
partition_addresses_by_proof(addresses, &batch_result.proofs);
if !missing_proof.is_empty() {
warn!(
"{} chunk(s) lack a merkle proof (partial payment); reporting them as failed",
missing_proof.len()
);
for addr in &missing_proof {
failed.push((
*addr,
format!("Missing merkle proof for chunk {}", hex::encode(addr)),
));
}
}
let waves: Vec<&[[u8; 32]]> = to_store.chunks(UPLOAD_WAVE_SIZE).collect();
let wave_count = waves.len();
let store_limiter = self.controller().store.clone();
let store_one = |addr: [u8; 32], content: Bytes| {
let limiter = store_limiter.clone();
let proof_bytes = batch_result.proofs.get(&addr).cloned();
async move {
let started = std::time::Instant::now();
let proof = proof_bytes.ok_or_else(|| {
Error::Payment(format!(
"Missing merkle proof for chunk {}",
hex::encode(addr)
))
})?;
let peers = self.close_group_peers(&addr).await?;
observe_op(
&limiter,
|| async move { self.chunk_put_to_close_group(content, proof, &peers).await },
classify_error,
)
.await
.map(|_| started)
}
};
for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
let wave_num = wave_idx + 1;
let wave = spill.read_wave(wave_addrs)?;
info!(
"Wave {wave_num}/{wave_count}: storing {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
wave.len()
);
let store_concurrency = store_limiter.current().min(wave.len().max(1));
let chunks: Vec<([u8; 32], Bytes)> = wave
.into_iter()
.map(|(content, addr)| (addr, content))
.collect();
let outcome = match merkle_store_with_retry(
chunks,
store_concurrency,
MERKLE_STORE_MAX_ATTEMPTS,
MERKLE_RETRY_BACKOFF,
progress,
total_stored,
total_chunks,
&store_one,
)
.await
{
Ok(outcome) => outcome,
Err(e) => {
warn!("merkle wave {wave_num}/{wave_count} aborted: {e}");
let failed_count = failed.len();
return Err(Error::PartialUpload {
stored: stored_addresses,
stored_count: total_stored,
failed,
failed_count,
total_chunks,
reason: format!("merkle chunk store aborted: {e}"),
});
}
};
let wave_failed: HashSet<[u8; 32]> = outcome
.failed_addresses
.iter()
.map(|(addr, _)| *addr)
.collect();
for addr in wave_addrs {
if !wave_failed.contains(addr) {
stored_addresses.push(*addr);
}
}
failed.extend(outcome.failed_addresses);
total_stored = outcome.stored;
agg_stats.chunk_attempts_total = agg_stats
.chunk_attempts_total
.saturating_add(outcome.stats.chunk_attempts_total);
agg_stats
.store_durations_ms
.extend(outcome.stats.store_durations_ms);
for (slot, count) in agg_stats
.retries_histogram
.iter_mut()
.zip(outcome.stats.retries_histogram.iter())
{
*slot = slot.saturating_add(*count);
}
if let Some(tx) = progress {
let _ = tx
.send(UploadEvent::WaveComplete {
wave: wave_num,
total_waves: wave_count,
stored_so_far: total_stored,
total: total_chunks,
})
.await;
}
}
if !failed.is_empty() {
let failed_count = failed.len();
warn!(
"merkle upload incomplete: {failed_count}/{total_chunks} chunks short of quorum after retries"
);
return Err(Error::PartialUpload {
stored: stored_addresses,
stored_count: total_stored,
failed,
failed_count,
total_chunks,
reason: format!(
"{failed_count} chunk(s) short of quorum after {MERKLE_STORE_MAX_ATTEMPTS} attempts"
),
});
}
Ok((
total_stored,
batch_result.storage_cost_atto.clone(),
batch_result.gas_cost_wei,
agg_stats,
))
}
/// Downloads the file described by `data_map` to `output` without progress
/// reporting.
///
/// Equivalent to calling `file_download_with_progress` with `None` as the
/// progress channel; its result is returned unchanged.
#[allow(clippy::unused_async)]
pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
    let download = self.file_download_with_progress(data_map, output, None);
    download.await
}
/// Downloads the file described by `data_map`, decrypts it as a stream and
/// writes the plaintext to `output`.
///
/// Steps, in order:
/// 1. If `data_map.is_child()`, the root data map is resolved with
///    `get_root_data_map_parallel`, fetching map chunks from the network.
/// 2. A decrypt batch size is chosen with `adaptive_stream_decrypt_batch_size`.
/// 3. Chunks are fetched batch by batch through `streaming_decrypt_with_batch_size`;
///    chunks that are missing or error on the first pass are deferred and retried
///    in up to three rounds before the batch fails.
/// 4. Decrypted bytes are written to a temporary file next to `output`, which is
///    renamed onto `output` once everything has been written.
///
/// # Arguments
/// * `data_map` - data map of the file to download (root or child).
/// * `output` - destination path of the downloaded file.
/// * `progress` - optional channel for `DownloadEvent`s. Events are sent with
///   `try_send` and the result is discarded, so delivery is best-effort.
///
/// # Returns
/// The number of bytes written to `output`.
///
/// # Errors
/// * `Error::Encryption` when data map resolution, stream setup or decryption of
///   a batch fails (this includes chunks still missing after the retry rounds).
/// * Errors from creating, writing, flushing or renaming the temporary file; the
///   temporary file is removed (best-effort) on any of these failures.
///
/// NOTE(review): this relies on `Handle::current()` and
/// `tokio::task::block_in_place`; tokio documents that these panic outside a
/// runtime / on a current-thread runtime — confirm callers run on a
/// multi-threaded runtime.
#[allow(clippy::unused_async)]
pub async fn file_download_with_progress(
&self,
data_map: &DataMap,
output: &Path,
progress: Option<mpsc::Sender<DownloadEvent>>,
) -> Result<u64> {
debug!("Downloading file to {}", output.display());
// Runtime handle used by the synchronous fetch callbacks below to drive async
// network requests via `block_on`.
let handle = Handle::current();
// Phase 1: obtain the root data map, resolving it first when `data_map` is a child map.
let root_map = if data_map.is_child() {
let dm_chunks = data_map.len();
if let Some(ref tx) = progress {
let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
total_map_chunks: dm_chunks,
});
}
let resolve_progress = progress.clone();
// Shared counter of map chunks fetched so far, reported in `MapChunkFetched`.
let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
// The resolver takes a synchronous fetch callback, so the blocking section is
// wrapped in `block_in_place`.
let resolved = tokio::task::block_in_place(|| {
let counter_ref = resolve_counter.clone();
let progress_ref = resolve_progress.clone();
let fetch_limiter = self.controller().fetch.clone();
// Synchronous callback: fetches one batch of (index, hash) pairs and returns
// the contents sorted by index.
let fetch = |batch: &[(usize, XorName)]| {
let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
let counter = counter_ref.clone();
let prog = progress_ref.clone();
let limiter = fetch_limiter.clone();
handle.block_on(async {
let mut results = rebucketed_unordered(
&limiter,
batch_owned,
|(idx, hash): (usize, XorName)| {
let counter = counter.clone();
let prog = prog.clone();
async move {
let addr = hash.0;
// Both a fetch error and a missing chunk abort data map resolution.
let chunk = self
.chunk_get_observed(&addr)
.await
.map_err(|e| {
self_encryption::Error::Generic(format!(
"DataMap resolution failed: {e}"
))
})?
.ok_or_else(|| {
self_encryption::Error::Generic(format!(
"DataMap chunk not found: {}",
hex::encode(addr)
))
})?;
let fetched = counter
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
+ 1;
if let Some(ref tx) = prog {
let _ =
tx.try_send(DownloadEvent::MapChunkFetched { fetched });
}
Ok::<_, self_encryption::Error>((idx, chunk.content))
}
},
)
.await?;
// Results are gathered unordered; sort them back into index order.
results.sort_by_key(|(idx, _)| *idx);
Ok(results)
})
};
get_root_data_map_parallel(data_map.clone(), &fetch)
})
.map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;
info!(
"Resolved hierarchical DataMap: {} data chunks",
resolved.len()
);
resolved
} else {
data_map.clone()
};
let total_chunks = root_map.len();
if let Some(ref tx) = progress {
let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
}
// Phase 2: pick the decrypt batch size from the chunk count, the fetch limiter's
// current value, the configured batch size and the usable-memory reading.
let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let fetched_for_closure = fetched_counter.clone();
let progress_for_closure = progress.clone();
let fetch_limiter_outer = self.controller().fetch.clone();
let usable_memory = usable_memory_bytes();
let configured_batch_floor = stream_decrypt_batch_size();
let fetch_cap = fetch_limiter_outer.current();
let decrypt_batch_size = adaptive_stream_decrypt_batch_size(
total_chunks,
fetch_cap,
configured_batch_floor,
usable_memory,
);
info!(
total_chunks,
fetch_cap,
configured_batch_floor,
?usable_memory,
decrypt_batch_size,
"Selected adaptive stream decrypt batch size"
);
// Phase 3: build the decrypting stream. The closure below is the per-batch fetch
// callback; it blocks the calling thread while the batch is fetched.
let stream = streaming_decrypt_with_batch_size(
&root_map,
|batch: &[(usize, XorName)]| {
let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
let fetched_ref = fetched_for_closure.clone();
let progress_ref = progress_for_closure.clone();
let fetch_limiter = fetch_limiter_outer.clone();
tokio::task::block_in_place(|| {
handle.block_on(async {
// Per-chunk outcome: `Ok(content)` when fetched, `Err(hash)` when the chunk
// must be retried later.
type BatchEntry =
(usize, std::result::Result<bytes::Bytes, XorName>);
// First pass: neither a missing chunk nor a fetch error fails the batch; both
// are recorded as `Err(hash)` and deferred.
let raw: Vec<BatchEntry> = rebucketed_unordered(
&fetch_limiter,
batch_owned,
|(idx, hash): (usize, XorName)| {
let fetched_ref = fetched_ref.clone();
let progress_ref = progress_ref.clone();
async move {
let addr = hash.0;
let addr_hex = hex::encode(addr);
match self.chunk_get_observed(&addr).await {
Ok(Some(chunk)) => {
let fetched = fetched_ref.fetch_add(
1,
std::sync::atomic::Ordering::Relaxed,
) + 1;
info!("Downloaded {fetched}/{total_chunks}");
if let Some(ref tx) = progress_ref {
let _ = tx.try_send(
DownloadEvent::ChunksFetched {
fetched,
total: total_chunks,
},
);
}
Ok::<BatchEntry, self_encryption::Error>((
idx,
Ok(chunk.content),
))
}
Ok(None) => Ok((idx, Err(hash))),
Err(e) => {
info!(
"First-pass fetch error for {addr_hex}: {e}; deferring"
);
Ok((idx, Err(hash)))
}
}
}
},
)
.await?;
// Split first-pass outcomes into fetched contents and deferred hashes.
let mut results: Vec<(usize, bytes::Bytes)> = Vec::new();
let mut deferred: Vec<(usize, XorName)> = Vec::new();
for (idx, inner) in raw {
match inner {
Ok(bytes) => results.push((idx, bytes)),
Err(hash) => deferred.push((idx, hash)),
}
}
if !deferred.is_empty() {
// Delay in seconds before each retry round; the first round starts immediately.
const DEFERRED_ROUND_DELAYS_SECS: [u64; 3] = [0, 15, 45];
info!(
"Deferring {} chunk(s) for concurrent retry after batch settles",
deferred.len()
);
let mut remaining = deferred;
for (round, &delay_secs) in DEFERRED_ROUND_DELAYS_SECS
.iter()
.enumerate()
{
// Stop early once every deferred chunk has been fetched.
if remaining.is_empty() {
break;
}
if delay_secs > 0 {
tokio::time::sleep(std::time::Duration::from_secs(
delay_secs,
))
.await;
}
info!(
"Deferred retry round {}/{}: {} chunk(s)",
round + 1,
DEFERRED_ROUND_DELAYS_SECS.len(),
remaining.len(),
);
// Take the pending list, leaving `remaining` empty so this round can refill it
// with whatever is still missing.
let round_input = std::mem::take(&mut remaining);
let round_results: Vec<BatchEntry> = rebucketed_unordered(
&fetch_limiter,
round_input,
|(idx, hash): (usize, XorName)| {
let fetched_ref = fetched_ref.clone();
let progress_ref = progress_ref.clone();
async move {
let addr = hash.0;
match self.chunk_get_observed(&addr).await {
Ok(Some(chunk)) => {
let fetched = fetched_ref.fetch_add(
1,
std::sync::atomic::Ordering::Relaxed,
) + 1;
info!(
"Downloaded {fetched}/{total_chunks} (deferred retry)"
);
if let Some(ref tx) = progress_ref {
let _ = tx.try_send(
DownloadEvent::ChunksFetched {
fetched,
total: total_chunks,
},
);
}
Ok::<BatchEntry, self_encryption::Error>((
idx,
Ok(chunk.content),
))
}
Ok(None) => Ok((idx, Err(hash))),
Err(e) => {
info!(
"Deferred retry for {} hit transient error: {e}; re-deferring",
hex::encode(addr)
);
Ok((idx, Err(hash)))
}
}
}
},
)
.await?;
for (idx, inner) in round_results {
match inner {
Ok(bytes) => results.push((idx, bytes)),
Err(hash) => remaining.push((idx, hash)),
}
}
}
// Anything still missing after the last round fails the whole batch; the error
// names the first remaining chunk.
if let Some((_, hash)) = remaining.first() {
return Err(self_encryption::Error::Generic(format!(
"Chunk not found after {} deferred retry rounds: {}",
DEFERRED_ROUND_DELAYS_SECS.len(),
hex::encode(hash.0),
)));
}
}
// First-pass and retry results are interleaved; restore index order.
results.sort_by_key(|(idx, _)| *idx);
Ok(results)
})
})
},
decrypt_batch_size,
)
.map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;
// Phase 4: write to a temporary file in the destination's directory (named from
// the process id and a random number), then rename it onto `output`.
let parent = output.parent().unwrap_or_else(|| Path::new("."));
let unique: u64 = rand::random();
let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));
// Immediately-invoked closure so `?` can be used while still letting the code
// below clean up the temporary file on failure.
let write_result = (|| -> Result<u64> {
let mut file = std::fs::File::create(&tmp_path)?;
let mut bytes_written = 0u64;
// Each item of the stream is one decrypted piece of the file, or a decryption error.
for chunk_result in stream {
let chunk_bytes = chunk_result
.map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
file.write_all(&chunk_bytes)?;
bytes_written += chunk_bytes.len() as u64;
}
file.flush()?;
Ok(bytes_written)
})();
match write_result {
Ok(bytes_written) => match std::fs::rename(&tmp_path, output) {
Ok(()) => {
info!(
"File downloaded: {bytes_written} bytes written to {}",
output.display()
);
Ok(bytes_written)
}
Err(rename_err) => {
// Rename failed: remove the temporary file (best-effort) and report the rename error.
if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
warn!(
"Failed to remove temp download file {}: {cleanup_err}",
tmp_path.display()
);
}
Err(rename_err.into())
}
},
Err(e) => {
// Writing failed: remove the temporary file (best-effort) and report the original error.
if let Err(cleanup_err) = std::fs::remove_file(&tmp_path) {
warn!(
"Failed to remove temp download file {}: {cleanup_err}",
tmp_path.display()
);
}
Err(e)
}
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// A 1024-byte spill request must pass the disk space check.
    #[test]
    fn disk_space_check_passes_for_small_file() {
        let tiny_request = 1024;
        check_disk_space_for_spill(tiny_request).unwrap();
    }

    /// Half of `u64::MAX` bytes must be rejected with `InsufficientDiskSpace`.
    #[test]
    fn disk_space_check_fails_for_absurd_size() {
        let absurd_request = u64::MAX / 2;
        let outcome = check_disk_space_for_spill(absurd_request);
        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            matches!(err, Error::InsufficientDiskSpace(_)),
            "expected InsufficientDiskSpace, got: {err}"
        );
    }

    /// With unlimited memory the batch size is the fetch cap times the multiplier.
    #[test]
    fn adaptive_stream_decrypt_batch_size_tracks_fetch_headroom() {
        let fetch_cap = 64;
        let chosen = adaptive_stream_decrypt_batch_size(1_000, fetch_cap, 10, Some(u64::MAX));
        assert_eq!(chosen, fetch_cap * DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
    }

    /// The batch size never exceeds the number of chunks in the file.
    #[test]
    fn adaptive_stream_decrypt_batch_size_caps_to_total_chunks() {
        let total_chunks = 12;
        let chosen = adaptive_stream_decrypt_batch_size(total_chunks, 64, 10, Some(u64::MAX));
        assert_eq!(chosen, 12);
    }

    /// Without a memory reading the configured floor is what comes back.
    #[test]
    fn adaptive_stream_decrypt_batch_size_honours_configured_floor() {
        let chosen = adaptive_stream_decrypt_batch_size(1_000, 1, 32, None);
        assert_eq!(chosen, 32);
    }

    /// A large fetch cap must not grow the batch when memory is unknown.
    #[test]
    fn adaptive_stream_decrypt_batch_size_does_not_expand_without_memory_reading() {
        let chosen = adaptive_stream_decrypt_batch_size(1_000, 64, 10, None);
        assert_eq!(chosen, 10);
    }

    /// Usable memory sized for exactly 16 chunks yields a batch of 16.
    #[test]
    fn adaptive_stream_decrypt_batch_size_caps_to_memory_budget() {
        let max_chunk_bytes = self_encryption::MAX_CHUNK_SIZE as u64;
        let estimated_bytes_per_chunk = max_chunk_bytes
            .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
            .max(1);
        let usable_memory = estimated_bytes_per_chunk
            .saturating_mul(16)
            .saturating_mul(DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR);
        let chosen = adaptive_stream_decrypt_batch_size(1_000, 256, 10, Some(usable_memory));
        assert_eq!(chosen, 16);
    }

    /// One byte of usable memory still yields a batch of one chunk.
    #[test]
    fn adaptive_stream_decrypt_batch_size_keeps_one_chunk_when_memory_is_tight() {
        let chosen = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(1));
        assert_eq!(chosen, 1);
    }

    /// A cached receipt covers a request only if every requested address has a proof.
    #[test]
    fn cached_merkle_covers_only_when_all_addresses_have_proofs() {
        let missing = compute_address(&Bytes::from_static(b"missing"));
        let extra = compute_address(&Bytes::from_static(b"extra"));
        let covered = compute_address(&Bytes::from_static(b"covered"));
        let proofs = HashMap::from([(covered, vec![1]), (extra, vec![2])]);
        let cached = MerkleBatchPaymentResult {
            proofs,
            chunk_count: 2,
            storage_cost_atto: "0".to_string(),
            gas_cost_wei: 0,
            merkle_payment_timestamp: 0,
        };
        // Subset and exact match are covered.
        assert!(cached_merkle_covers_addresses(&cached, &[covered]));
        assert!(cached_merkle_covers_addresses(&cached, &[covered, extra]));
        // One unknown address is enough to lose coverage.
        assert!(!cached_merkle_covers_addresses(
            &cached,
            &[covered, missing]
        ));
    }

    /// Mixed input is split into paid and unpaid lists, each keeping input order.
    #[test]
    fn partition_addresses_by_proof_splits_paid_and_unpaid() {
        let paid_a = [1u8; 32];
        let unpaid_b = [2u8; 32];
        let paid_c = [3u8; 32];
        let unpaid_d = [4u8; 32];
        let requested = [paid_a, unpaid_b, paid_c, unpaid_d];
        let proofs: HashMap<[u8; 32], Vec<u8>> =
            HashMap::from([(paid_a, vec![0xaa]), (paid_c, vec![0xcc])]);
        let (with_proof, without_proof) = partition_addresses_by_proof(&requested, &proofs);
        assert_eq!(with_proof, vec![paid_a, paid_c]);
        assert_eq!(without_proof, vec![unpaid_b, unpaid_d]);
    }

    /// No proofs means everything is missing; full proofs mean nothing is.
    #[test]
    fn partition_addresses_by_proof_handles_all_or_nothing() {
        let a = [5u8; 32];
        let b = [6u8; 32];

        let no_proofs: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
        let (with_proof, without_proof) = partition_addresses_by_proof(&[a, b], &no_proofs);
        assert!(with_proof.is_empty());
        assert_eq!(without_proof, vec![a, b]);

        let all_proofs: HashMap<[u8; 32], Vec<u8>> =
            HashMap::from([(a, vec![1]), (b, vec![2])]);
        let (with_proof, without_proof) = partition_addresses_by_proof(&[a, b], &all_proofs);
        assert_eq!(with_proof, vec![a, b]);
        assert!(without_proof.is_empty());
    }

    /// Pushed chunks can be read back unchanged, and the size bookkeeping agrees.
    #[test]
    fn chunk_spill_round_trip() {
        let first_payload = vec![0xAA; 1024];
        let second_payload = vec![0xBB; 2048];
        let mut spill = ChunkSpill::new().unwrap();
        spill.push(&first_payload).unwrap();
        spill.push(&second_payload).unwrap();

        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 1024 + 2048);

        let chunk_entries = spill.chunk_entries().unwrap();
        let entry_total: u64 = chunk_entries.iter().map(|(_, size)| *size).sum();
        assert_eq!(entry_total, 1024 + 2048);

        let first_addr = spill.addresses.first().unwrap();
        let first_read = spill.read_chunk(first_addr).unwrap();
        assert_eq!(&first_read[..], &first_payload[..]);

        let second_addr = spill.addresses.get(1).unwrap();
        let second_read = spill.read_chunk(second_addr).unwrap();
        assert_eq!(&second_read[..], &second_payload[..]);

        let waves: Vec<_> = spill.addresses.chunks(1).collect();
        assert_eq!(waves.len(), 2);
    }

    /// Dropping a spill removes its directory.
    #[test]
    fn chunk_spill_cleanup_on_drop() {
        let spill = ChunkSpill::new().unwrap();
        let spill_dir = spill.dir.clone();
        assert!(spill_dir.exists());
        drop(spill);
        assert!(!spill_dir.exists(), "spill dir should be removed on drop");
    }

    /// Identical content is stored once; distinct content adds a new entry.
    #[test]
    fn chunk_spill_deduplicates_identical_content() {
        let mut spill = ChunkSpill::new().unwrap();
        let repeated = vec![0xCC; 512];
        // Push the same payload three times.
        spill.push(&repeated).unwrap();
        spill.push(&repeated).unwrap();
        spill.push(&repeated).unwrap();
        assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
        assert_eq!(
            spill.total_bytes(),
            512,
            "total_bytes should count unique only"
        );

        let distinct = vec![0xDD; 256];
        spill.push(&distinct).unwrap();
        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 512 + 256);
    }
}
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Compiles only when `T` is `Send`; never does anything at runtime.
    fn _assert_send<T>(_: &T)
    where
        T: Send,
    {
    }

    /// Type-checks that the future returned by `file_upload` is `Send`.
    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_is_send(client: &Client) {
        let source = Path::new("/dev/null");
        let fut = client.file_upload(source);
        _assert_send(&fut);
    }

    /// Type-checks that the future returned by `file_upload_with_mode` is `Send`.
    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_with_mode_is_send(client: &Client) {
        let source = Path::new("/dev/null");
        let fut = client.file_upload_with_mode(source, PaymentMode::Auto);
        _assert_send(&fut);
    }

    /// Type-checks that the future returned by `file_download` is `Send`.
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _file_download_is_send(client: &Client) {
        // Only the type matters here; the value is never produced.
        let dm: DataMap = todo!();
        let destination = Path::new("/dev/null");
        let fut = client.file_download(&dm, destination);
        _assert_send(&fut);
    }
}