use crate::data::client::adaptive::{observe_op, rebucketed_unordered};
use crate::data::client::batch::{
finalize_batch_payment, PaymentIntent, PreparedChunk, WaveAggregateStats,
};
use crate::data::client::classify_error;
use crate::data::client::merkle::{
chunk_contents_for_upload_addresses, finalize_merkle_batch, merkle_deferred_retry,
merkle_store_with_retry, should_use_merkle, MerkleBatchPaymentResult, PaymentMode,
PreparedMerkleBatch, DEFERRED_ROUND_DELAYS_SECS,
};
use crate::data::client::Client;
use crate::data::error::{Error, PartialUploadSpend, Result};
use ant_protocol::evm::{Amount, PaymentQuote, QuoteHash, TxHash, MAX_LEAVES};
use ant_protocol::transport::{MultiAddr, PeerId};
use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
use bytes::Bytes;
use fs2::FileExt;
use futures::stream::{self, StreamExt};
use self_encryption::{
get_root_data_map_parallel, stream_decrypt_batch_size, stream_encrypt,
streaming_decrypt_with_batch_size, DataMap,
};
use std::collections::{HashMap, HashSet};
use std::io::Write;
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
use xor_name::XorName;
/// Progress events sent on an optional `mpsc::Sender<UploadEvent>` by the
/// upload, preparation and cost-estimation entry points.
#[derive(Debug, Clone)]
pub enum UploadEvent {
/// Encryption progress: number of chunks produced so far (presumably emitted
/// by `encrypt_file_to_spill`, which is not visible here — TODO confirm).
Encrypting { chunks_done: usize },
/// Encryption finished; `total_chunks` is the number of chunks to handle
/// (including the public DataMap chunk when one is bundled).
Encrypted { total_chunks: usize },
/// A quoting wave started for `chunks_in_wave` chunks.
QuotingChunks {
wave: usize,
total_waves: usize,
chunks_in_wave: usize,
},
/// One more chunk has been quoted (`quoted` of `total`).
ChunkQuoted { quoted: usize, total: usize },
/// One more chunk has been stored (`stored` of `total`).
ChunkStored { stored: usize, total: usize },
/// A storage wave completed.
WaveComplete {
wave: usize,
total_waves: usize,
stored_so_far: usize,
total: usize,
},
}
/// Progress events for downloads. The emitting code is outside this chunk;
/// the per-variant descriptions below are inferred from field names — TODO confirm.
#[derive(Debug, Clone)]
pub enum DownloadEvent {
/// Resolution of a (possibly nested) DataMap started; `total_map_chunks`
/// chunks must be fetched.
ResolvingDataMap { total_map_chunks: usize },
/// `fetched` DataMap chunks have been retrieved so far.
MapChunkFetched { fetched: usize },
/// The root DataMap is resolved and references `total_chunks` data chunks.
DataMapResolved { total_chunks: usize },
/// `fetched` of `total` data chunks have been retrieved.
ChunksFetched { fetched: usize, total: usize },
}
/// One storage quote: the responding peer, its addresses, the quote itself,
/// and the quoted price (the fourth element is read as the price in
/// `estimate_upload_cost`).
type QuoteEntry = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);
/// Chunks per payment wave; `estimate_upload_cost` assumes one payment
/// transaction per wave of this size.
const UPLOAD_WAVE_SIZE: usize = 64;
/// Target stream-decrypt batch size as a multiple of the fetch concurrency cap.
const DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER: usize = 4;
/// Only `1 / DIVISOR` of the usable memory is budgeted for one decrypt batch.
const DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR: u64 = 4;
/// Estimated memory per in-flight chunk, as a multiple of `MAX_CHUNK_SIZE`.
const DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER: u64 = 3;
/// Maximum number of chunks sampled for quotes by `estimate_upload_cost`.
const ESTIMATE_SAMPLE_CAP: usize = 5;
/// Pick up to `cap` indices spread evenly over `0..total`.
///
/// The first and last index are always included when more than one sample
/// is taken. Returns an empty vector for `total == 0`, and `[0]` whenever at
/// most one sample is possible (including `cap == 0`).
fn distributed_sample_indices(total: usize, cap: usize) -> Vec<usize> {
    if total == 0 {
        return Vec::new();
    }
    match total.min(cap) {
        0 | 1 => vec![0],
        picks => {
            let last = total - 1;
            let steps = picks - 1;
            let mut picked = Vec::with_capacity(picks);
            for step in 0..picks {
                let idx = step * last / steps;
                // Skip consecutive repeats (same effect as `Vec::dedup`).
                if picked.last() != Some(&idx) {
                    picked.push(idx);
                }
            }
            picked
        }
    }
}
/// Assumed gas units per wave-batch payment transaction (cost estimates only).
const GAS_PER_WAVE_TX: u128 = 1_500_000;
/// Assumed gas units per merkle-batch payment transaction (cost estimates only).
const GAS_PER_MERKLE_TX: u128 = 500_000;
/// Assumed gas price in wei used for estimates (100_000_000 wei = 0.1 gwei).
const ARBITRUM_GAS_PRICE_WEI: u128 = 100_000_000;
/// Extra free space required on top of the file size before spilling; with
/// the value 10 this is 10% of the file size (see `check_disk_space_for_spill`).
const DISK_SPACE_HEADROOM_PERCENT: u64 = 10;
/// Spill dirs whose name timestamp is younger than this are never swept as stale.
const SPILL_STALE_GRACE_SECS: u64 = 30;
/// Name prefix of per-upload spill directories (`spill_<secs>_<random>`).
const SPILL_DIR_PREFIX: &str = "spill_";
/// Lockfile inside each spill dir, held exclusively while the spill is alive.
const SPILL_LOCK_NAME: &str = ".lock";
/// On-disk staging area for encrypted chunks, so large files do not have to
/// be held in memory. Each chunk is written to `<dir>/<hex(address)>`.
struct ChunkSpill {
/// This spill's private directory under the spill root.
dir: PathBuf,
/// Open handle on `<dir>/.lock`, exclusively locked while `self` is alive so
/// `cleanup_stale` in other processes skips this directory.
_lock: std::fs::File,
/// Unique chunk addresses in insertion order.
addresses: Vec<[u8; 32]>,
/// Addresses already spilled; used to drop duplicate chunks.
seen: HashSet<[u8; 32]>,
/// Byte length of each spilled chunk.
sizes: HashMap<[u8; 32], u64>,
/// Sum of the sizes of all unique spilled chunks.
total_bytes: u64,
}
impl ChunkSpill {
    /// Directory (`<data_dir>/spill`) under which every spill directory lives.
    ///
    /// # Errors
    /// Returns `Error::Config` when the client data directory cannot be determined.
    fn spill_root() -> Result<PathBuf> {
        use crate::config;
        let root = config::data_dir()
            .map_err(|e| Error::Config(format!("cannot determine data dir for spill: {e}")))?
            .join("spill");
        Ok(root)
    }

    /// Create a fresh, exclusively locked spill directory.
    ///
    /// Stale spill directories from earlier runs are swept first. The new
    /// directory is named `spill_<unix_secs>_<random u64>` and contains a
    /// `.lock` file that stays exclusively locked for the lifetime of `self`.
    ///
    /// # Errors
    /// Fails if the spill root cannot be determined or created, or if the
    /// directory or its lockfile cannot be created or locked. If locking
    /// fails, the just-created directory is removed again.
    fn new() -> Result<Self> {
        let root = Self::spill_root()?;
        std::fs::create_dir_all(&root)?;
        Self::cleanup_stale(&root);
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let unique: u64 = rand::random();
        let dir = root.join(format!("{SPILL_DIR_PREFIX}{now}_{unique}"));
        std::fs::create_dir(&dir)?;
        let lock_file = match Self::lock_dir(&dir) {
            Ok(file) => file,
            Err(e) => {
                // We never took ownership of this directory; don't leave an
                // orphan behind for the stale sweeper to find later.
                if let Err(rm_err) = std::fs::remove_dir_all(&dir) {
                    warn!(
                        "Failed to clean up chunk spill dir {}: {rm_err}",
                        dir.display()
                    );
                }
                return Err(e);
            }
        };
        Ok(Self {
            dir,
            _lock: lock_file,
            addresses: Vec::new(),
            seen: HashSet::new(),
            sizes: HashMap::new(),
            total_bytes: 0,
        })
    }

    /// Create `<dir>/.lock` and take an exclusive, non-blocking lock on it.
    ///
    /// # Errors
    /// Returns `Error::Io` if the lockfile cannot be created or locked.
    fn lock_dir(dir: &Path) -> Result<std::fs::File> {
        let lock_path = dir.join(SPILL_LOCK_NAME);
        let lock_file = std::fs::File::create(&lock_path).map_err(|e| {
            Error::Io(std::io::Error::new(
                e.kind(),
                format!("failed to create spill lockfile: {e}"),
            ))
        })?;
        lock_file.try_lock_exclusive().map_err(|e| {
            Error::Io(std::io::Error::new(
                e.kind(),
                format!("failed to lock spill lockfile: {e}"),
            ))
        })?;
        Ok(lock_file)
    }

    /// Best-effort removal of stale spill directories under `root`.
    ///
    /// A directory is removed when its name starts with `spill_`, its embedded
    /// timestamp is at least `SPILL_STALE_GRACE_SECS` old, and its `.lock`
    /// file is either missing/unopenable or not currently locked. Errors are
    /// skipped or logged, never returned.
    fn cleanup_stale(root: &Path) {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        if now == 0 {
            warn!("System clock before Unix epoch, skipping spill cleanup");
            return;
        }
        let entries = match std::fs::read_dir(root) {
            Ok(entries) => entries,
            Err(_) => return,
        };
        for entry in entries.flatten() {
            let name = entry.file_name();
            let name_str = name.to_string_lossy();
            let suffix = match name_str.strip_prefix(SPILL_DIR_PREFIX) {
                Some(s) => s,
                None => continue,
            };
            let timestamp: u64 = match suffix.split('_').next().and_then(|s| s.parse().ok()) {
                Some(ts) => ts,
                None => continue,
            };
            if now.saturating_sub(timestamp) < SPILL_STALE_GRACE_SECS {
                continue;
            }
            let file_type = match entry.file_type() {
                Ok(ft) => ft,
                Err(_) => continue,
            };
            if !file_type.is_dir() {
                continue;
            }
            let path = entry.path();
            let lock_path = path.join(SPILL_LOCK_NAME);
            if let Ok(lock_file) = std::fs::File::open(&lock_path) {
                // A held lock means another live ChunkSpill still owns the dir.
                if lock_file.try_lock_exclusive().is_err() {
                    debug!("Skipping active spill dir: {}", path.display());
                    continue;
                }
                drop(lock_file);
            }
            info!("Cleaning up stale spill dir: {}", path.display());
            if let Err(e) = std::fs::remove_dir_all(&path) {
                warn!("Failed to clean up stale spill dir {}: {e}", path.display());
            }
        }
    }

    /// Sweep stale spill directories without creating a new spill.
    // Kept for callers outside this chunk; may be unused in some builds.
    #[allow(dead_code)]
    pub(crate) fn run_cleanup() {
        if let Ok(root) = Self::spill_root() {
            Self::cleanup_stale(&root);
        }
    }

    /// Write `content` to disk under its content address, ignoring duplicates.
    ///
    /// # Errors
    /// Returns an I/O error if the chunk file cannot be written; in that case
    /// the chunk is *not* recorded, so pushing the same content again retries
    /// the write instead of being skipped as a duplicate.
    fn push(&mut self, content: &[u8]) -> Result<()> {
        let address = compute_address(content);
        if self.seen.contains(&address) {
            return Ok(());
        }
        let path = self.dir.join(hex::encode(address));
        std::fs::write(&path, content)?;
        // Record the chunk only after the write has succeeded.
        self.seen.insert(address);
        let content_len = content.len() as u64;
        self.sizes.insert(address, content_len);
        self.total_bytes += content_len;
        self.addresses.push(address);
        Ok(())
    }

    /// Number of unique chunks spilled.
    fn len(&self) -> usize {
        self.addresses.len()
    }

    /// Total bytes of all unique spilled chunks.
    fn total_bytes(&self) -> u64 {
        self.total_bytes
    }

    /// `(address, size)` for every spilled chunk, in insertion order.
    ///
    /// # Errors
    /// Returns `Error::Storage` if a recorded address has no size entry.
    fn chunk_entries(&self) -> Result<Vec<([u8; 32], u64)>> {
        self.addresses
            .iter()
            .map(|address| {
                self.sizes
                    .get(address)
                    .copied()
                    .map(|size| (*address, size))
                    .ok_or_else(|| {
                        Error::Storage(format!(
                            "missing size for spilled chunk {}",
                            hex::encode(address)
                        ))
                    })
            })
            .collect()
    }

    /// Read a spilled chunk back from disk.
    ///
    /// # Errors
    /// Returns `Error::Io` (annotated with the chunk address) if the read fails.
    fn read_chunk(&self, address: &[u8; 32]) -> Result<Bytes> {
        let path = self.dir.join(hex::encode(address));
        let data = std::fs::read(&path).map_err(|e| {
            Error::Io(std::io::Error::new(
                e.kind(),
                format!("reading spilled chunk {}: {e}", hex::encode(address)),
            ))
        })?;
        Ok(Bytes::from(data))
    }

    /// Read several spilled chunks, returning `(content, address)` pairs in
    /// the order of `wave_addrs`. Stops at the first read error.
    fn read_wave(&self, wave_addrs: &[[u8; 32]]) -> Result<Vec<(Bytes, [u8; 32])>> {
        let mut out = Vec::with_capacity(wave_addrs.len());
        for addr in wave_addrs {
            let content = self.read_chunk(addr)?;
            out.push((content, *addr));
        }
        Ok(out)
    }

    /// Remove this spill's directory; failures are logged, not returned.
    fn cleanup(&self) {
        if let Err(e) = std::fs::remove_dir_all(&self.dir) {
            warn!(
                "Failed to clean up chunk spill dir {}: {e}",
                self.dir.display()
            );
        }
    }
}
impl Drop for ChunkSpill {
fn drop(&mut self) {
self.cleanup();
}
}
/// True when the cached merkle payment holds a proof for every address in
/// `addresses` (vacuously true for an empty slice).
fn cached_merkle_covers_addresses(
    cached: &MerkleBatchPaymentResult,
    addresses: &[[u8; 32]],
) -> bool {
    let proofs = &cached.proofs;
    !addresses.iter().any(|addr| !proofs.contains_key(addr))
}
/// Split `addresses` into `(with_proof, without_proof)` according to whether
/// `proofs` contains an entry for each address. Input order (and duplicates)
/// are preserved within each side.
fn partition_addresses_by_proof(
    addresses: &[[u8; 32]],
    proofs: &HashMap<[u8; 32], Vec<u8>>,
) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
    let mut proven = Vec::new();
    let mut unproven = Vec::new();
    for &addr in addresses {
        if proofs.contains_key(&addr) {
            proven.push(addr);
        } else {
            unproven.push(addr);
        }
    }
    (proven, unproven)
}
/// Build an `Error::PartialUpload` after a fatal error interrupted an upload.
///
/// Every address in `addresses` that is not in `stored_addresses` is reported
/// as failed. Entries in `known_failed` keep their specific message; all
/// other unstored addresses get `reason`. `known_failed` entries for
/// addresses that were stored are dropped, and each address appears at most
/// once (first message wins).
///
/// The failed list is ordered deterministically: `known_failed` order first,
/// then `addresses` order. This replaces HashMap iteration order, which was
/// random between runs.
fn partial_upload_after_fatal(
    addresses: &[[u8; 32]],
    stored_addresses: Vec<[u8; 32]>,
    stored_count: usize,
    total_chunks: usize,
    known_failed: Vec<([u8; 32], String)>,
    spend: PartialUploadSpend,
    reason: String,
) -> Error {
    let stored_set: HashSet<[u8; 32]> = stored_addresses.iter().copied().collect();
    let mut reported: HashSet<[u8; 32]> = HashSet::new();
    let mut failed: Vec<([u8; 32], String)> = Vec::new();
    for (addr, msg) in known_failed {
        if !stored_set.contains(&addr) && reported.insert(addr) {
            failed.push((addr, msg));
        }
    }
    for addr in addresses {
        if !stored_set.contains(addr) && reported.insert(*addr) {
            failed.push((*addr, reason.clone()));
        }
    }
    let failed_count = failed.len();
    Error::PartialUpload {
        stored: stored_addresses,
        stored_count,
        failed,
        failed_count,
        total_chunks,
        spend: Box::new(spend),
        reason,
    }
}
/// Normalized result of one wave-batch upload attempt, produced by
/// `fold_single_wave` from either a full success or a `PartialUpload` error.
#[derive(Debug)]
struct SingleWaveOutcome {
/// Addresses stored in this wave.
stored: Vec<[u8; 32]>,
/// Addresses that failed, with their error messages.
failed: Vec<([u8; 32], String)>,
/// Storage cost paid, parsed from its decimal string (zero if unparsable).
storage_atto: Amount,
/// Gas cost paid, in wei.
gas_wei: u128,
/// Aggregate store stats (default/empty for partial results).
stats: WaveAggregateStats,
}
fn fold_single_wave(
result: Result<(Vec<[u8; 32]>, String, u128, WaveAggregateStats)>,
) -> Result<SingleWaveOutcome> {
match result {
Ok((stored, storage, gas, stats)) => Ok(SingleWaveOutcome {
stored,
failed: Vec::new(),
storage_atto: storage.parse().unwrap_or(Amount::ZERO),
gas_wei: gas,
stats,
}),
Err(Error::PartialUpload {
stored,
failed,
spend,
..
}) => Ok(SingleWaveOutcome {
stored,
failed,
storage_atto: spend.storage_cost_atto.parse().unwrap_or(Amount::ZERO),
gas_wei: spend.gas_cost_wei,
stats: WaveAggregateStats::default(),
}),
Err(e) => Err(e),
}
}
/// Ensure the spill directory's filesystem has room for an encrypted copy of
/// a `file_size`-byte file, plus `DISK_SPACE_HEADROOM_PERCENT` percent headroom.
///
/// Creates the spill root if it does not exist yet.
///
/// # Errors
/// - `Error::Config` if the spill root cannot be determined.
/// - `Error::Io` if the directory cannot be created or free space cannot be queried.
/// - `Error::InsufficientDiskSpace` if available space is below the requirement.
fn check_disk_space_for_spill(file_size: u64) -> Result<()> {
    let spill_root = ChunkSpill::spill_root()?;
    std::fs::create_dir_all(&spill_root)?;
    let available = fs2::available_space(&spill_root).map_err(|e| {
        Error::Io(std::io::Error::new(
            e.kind(),
            format!(
                "failed to query disk space on {}: {e}",
                spill_root.display()
            ),
        ))
    })?;
    // Treat the constant as a true percentage. The old `file_size / PERCENT`
    // only matched that meaning when the constant was exactly 10. Widen to
    // u128 so the multiplication cannot overflow.
    let headroom = u64::try_from(
        u128::from(file_size) * u128::from(DISK_SPACE_HEADROOM_PERCENT) / 100,
    )
    .unwrap_or(u64::MAX);
    let required = file_size.saturating_add(headroom);
    if available < required {
        let avail_mb = available / (1024 * 1024);
        let req_mb = required / (1024 * 1024);
        return Err(Error::InsufficientDiskSpace(format!(
            "need ~{req_mb} MB in spill dir ({}) but only {avail_mb} MB available",
            spill_root.display()
        )));
    }
    debug!(
        "Disk space check passed: {available} bytes available, {required} bytes required (spill: {})",
        spill_root.display()
    );
    Ok(())
}
/// Estimate how much memory a stream-decrypt batch may use.
///
/// Takes the largest non-zero of sysinfo's available, free, and
/// `total - used` figures. When a cgroup memory limit is reported (with a
/// non-zero total), the result is capped by the cgroup's free memory.
/// Returns `None` only when no figure is available.
fn usable_memory_bytes() -> Option<u64> {
    let mut system = sysinfo::System::new();
    system.refresh_memory();
    let total_memory = system.total_memory();
    let used_memory = system.used_memory();
    let available_memory = system.available_memory();
    let free_memory = system.free_memory();
    let unused_memory = total_memory.saturating_sub(used_memory);
    let host_usable = [available_memory, free_memory, unused_memory]
        .into_iter()
        .filter(|bytes| *bytes > 0)
        .max();
    let cgroup_free_memory = system
        .cgroup_limits()
        .and_then(|limits| (limits.total_memory > 0).then_some(limits.free_memory));
    let usable = match (host_usable, cgroup_free_memory) {
        (_, None) => host_usable,
        (Some(host), Some(cgroup)) => Some(host.min(cgroup)),
        (None, Some(cgroup)) => Some(cgroup),
    };
    // Field names below are the log keys; keep the local names unchanged.
    debug!(
        available_memory,
        free_memory,
        used_memory,
        total_memory,
        cgroup_free_memory,
        usable_memory = ?usable,
        "Detected usable memory for stream decrypt batch sizing"
    );
    usable
}
/// Maximum number of chunks a decrypt batch may hold within the memory budget.
///
/// The budget is `usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR`.
/// Each chunk is assumed to cost `MAX_CHUNK_SIZE * DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER`
/// bytes. The result is at least 1 and saturates at `usize::MAX`.
fn stream_decrypt_batch_memory_cap(usable_memory_bytes: u64) -> usize {
    let per_chunk_bytes = std::cmp::max(
        (self_encryption::MAX_CHUNK_SIZE as u64)
            .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER),
        1,
    );
    let batch_budget = usable_memory_bytes / DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR;
    let chunks_that_fit = std::cmp::max(batch_budget / per_chunk_bytes, 1);
    usize::try_from(chunks_that_fit).unwrap_or(usize::MAX)
}
/// Choose the stream-decrypt batch size.
///
/// With a memory reading, the batch is the larger of the configured floor
/// and `fetch_cap * DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER`, capped by the
/// memory budget. Without one, it is the configured floor. Either way the
/// result is clamped to `1..=max(total_chunks, 1)`.
fn adaptive_stream_decrypt_batch_size(
    total_chunks: usize,
    fetch_cap: usize,
    configured_batch_floor: usize,
    usable_memory_bytes: Option<u64>,
) -> usize {
    let floor = configured_batch_floor.max(1);
    let requested = if let Some(bytes) = usable_memory_bytes {
        let fetch_target = fetch_cap
            .max(1)
            .saturating_mul(DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
        floor
            .max(fetch_target)
            .min(stream_decrypt_batch_memory_cap(bytes))
    } else {
        floor
    };
    let upper = total_chunks.max(1);
    requested.min(upper).max(1)
}
/// Whether an upload publishes its DataMap.
///
/// `Private` (the default) returns the DataMap only to the caller and sets
/// no `data_map_address`. `Public` serializes the DataMap (msgpack via
/// `rmp_serde`), stores it as an extra chunk, and reports that chunk's
/// address as `data_map_address`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Visibility {
#[default]
Private,
Public,
}
/// How much to trust an `UploadCostEstimate`. Serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CostEstimateConfidence {
/// Derived from a real network quote for one sampled chunk.
#[default]
PricedSample,
/// Every chunk was sampled and all were already stored: cost is truly zero.
VerifiedAllAlreadyStored,
/// All sampled chunks were already stored, but not every chunk was sampled;
/// the zero estimate may undercount.
AllSamplesAlreadyStoredIncomplete,
}
/// Result of `Client::estimate_upload_cost`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct UploadCostEstimate {
/// Size of the source file in bytes.
pub file_size: u64,
/// Number of encrypted chunks the file produces.
pub chunk_count: usize,
/// Estimated storage cost in atto, as a decimal string.
pub storage_cost_atto: String,
/// Estimated gas cost in wei, as a decimal string (uses assumed gas constants).
pub estimated_gas_cost_wei: String,
/// Payment mode the estimate assumed (`Merkle` or `Single`).
pub payment_mode: PaymentMode,
/// Reliability of the estimate. `#[serde(default)]` lets payloads without
/// this field deserialize as `PricedSample`.
#[serde(default)]
pub confidence: CostEstimateConfidence,
}
/// Outcome of a completed (or finalized) file upload.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FileUploadResult {
/// DataMap needed to retrieve and decrypt the file.
pub data_map: DataMap,
/// Chunks confirmed stored (newly stored plus, in the wave path, already stored).
pub chunks_stored: usize,
/// Chunks that could not be stored.
pub chunks_failed: usize,
/// Total chunks for this upload, including a public DataMap chunk if any.
pub total_chunks: usize,
/// Payment mode actually used.
pub payment_mode_used: PaymentMode,
/// Storage cost in atto, as a decimal string ("0" when not tracked here).
pub storage_cost_atto: String,
/// Gas spent in wei (0 when gas was paid by an external signer).
pub gas_cost_wei: u128,
/// Address of the public DataMap chunk; `None` for private uploads.
pub data_map_address: Option<[u8; 32]>,
/// Total store attempts across all chunks (from `WaveAggregateStats`).
pub chunk_attempts_total: usize,
/// Per-store durations in milliseconds (from `WaveAggregateStats`).
pub store_durations_ms: Vec<u64>,
/// Retry-count histogram from `WaveAggregateStats`; bucket meaning is
/// defined there — TODO confirm.
pub retries_histogram: [usize; 4],
}
/// Payment material an external signer must act on before an upload can be
/// finalized.
#[derive(Debug)]
pub enum ExternalPaymentInfo {
/// Per-chunk quotes plus the aggregate payment intent; finalize with
/// `Client::finalize_upload` using the resulting tx hashes.
WaveBatch {
prepared_chunks: Vec<PreparedChunk>,
payment_intent: PaymentIntent,
},
/// A prepared merkle batch plus the chunks it covers; finalize with
/// `Client::finalize_upload_merkle` using the winner pool hash.
Merkle {
prepared_batch: PreparedMerkleBatch,
chunk_contents: Vec<Bytes>,
chunk_addresses: Vec<[u8; 32]>,
},
}
/// A file encrypted and quoted for external signing, awaiting finalization.
#[derive(Debug)]
#[non_exhaustive]
pub struct PreparedUpload {
/// DataMap of the encrypted file.
pub data_map: DataMap,
/// What must be paid externally before finalizing.
pub payment_info: ExternalPaymentInfo,
/// Public DataMap chunk address, if the upload is public.
pub data_map_address: Option<[u8; 32]>,
/// Chunks found already stored during preparation (need no payment).
pub already_stored_addresses: Vec<[u8; 32]>,
/// Total chunk count, including a public DataMap chunk if any.
pub total_chunks: usize,
}
/// Handles returned by `spawn_file_encryption`: encrypted chunk contents, the
/// final DataMap (sent once all chunks are produced), and the blocking
/// encryption task.
type EncryptionChannels = (
tokio::sync::mpsc::Receiver<Bytes>,
tokio::sync::oneshot::Receiver<DataMap>,
tokio::task::JoinHandle<Result<()>>,
);
fn spawn_file_encryption(path: PathBuf) -> Result<EncryptionChannels> {
let metadata = std::fs::metadata(&path)?;
let data_size = usize::try_from(metadata.len())
.map_err(|e| Error::Encryption(format!("file size exceeds platform usize: {e}")))?;
let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(2);
let (datamap_tx, datamap_rx) = tokio::sync::oneshot::channel();
let handle = tokio::task::spawn_blocking(move || {
let file = std::fs::File::open(&path)?;
let mut reader = std::io::BufReader::new(file);
let read_error: Arc<Mutex<Option<std::io::Error>>> = Arc::new(Mutex::new(None));
let read_error_clone = Arc::clone(&read_error);
let data_iter = std::iter::from_fn(move || {
let mut buffer = vec![0u8; 8192];
match std::io::Read::read(&mut reader, &mut buffer) {
Ok(0) => None,
Ok(n) => {
buffer.truncate(n);
Some(Bytes::from(buffer))
}
Err(e) => {
let mut guard = read_error_clone
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
*guard = Some(e);
None
}
}
});
let mut stream = stream_encrypt(data_size, data_iter)
.map_err(|e| Error::Encryption(format!("stream_encrypt failed: {e}")))?;
for chunk_result in stream.chunks() {
{
let guard = read_error
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
if let Some(ref e) = *guard {
return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
}
}
let (_hash, content) = chunk_result
.map_err(|e| Error::Encryption(format!("chunk encryption failed: {e}")))?;
if chunk_tx.blocking_send(content).is_err() {
return Err(Error::Encryption("upload receiver dropped".to_string()));
}
}
{
let guard = read_error
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
if let Some(ref e) = *guard {
return Err(Error::Io(std::io::Error::new(e.kind(), e.to_string())));
}
}
let datamap = stream
.into_datamap()
.ok_or_else(|| Error::Encryption("no DataMap after encryption".to_string()))?;
if datamap_tx.send(datamap).is_err() {
warn!("DataMap receiver dropped — upload may have been cancelled");
}
Ok(())
});
Ok((chunk_rx, datamap_rx, handle))
}
/// Temporary download file that is deleted on drop unless committed.
struct TempDownload {
/// `Some(temp path)` until `commit` succeeds; `None` disarms the Drop cleanup.
path: Option<PathBuf>,
}
impl TempDownload {
    /// Track `path` as an uncommitted temporary file.
    fn new(path: PathBuf) -> Self {
        let path = Some(path);
        Self { path }
    }

    /// The current temporary path.
    ///
    /// # Panics
    /// Panics if called after a successful [`TempDownload::commit`].
    fn path(&self) -> &Path {
        let current = self.path.as_deref();
        current.expect("TempDownload::path called after commit")
    }

    /// Move the temp file to `dest` with `std::fs::rename`.
    ///
    /// On success the drop cleanup is disarmed. On failure the error is
    /// returned, and dropping `self` removes the temp file.
    fn commit(mut self, dest: &Path) -> std::io::Result<()> {
        let outcome = std::fs::rename(self.path(), dest);
        if outcome.is_ok() {
            self.path = None;
        }
        outcome
    }
}
impl Drop for TempDownload {
    /// Remove an uncommitted temp file.
    ///
    /// A missing file is ignored; any other removal error is logged.
    fn drop(&mut self) {
        let Some(path) = self.path.take() else {
            return;
        };
        match std::fs::remove_file(&path) {
            Err(e) if e.kind() != std::io::ErrorKind::NotFound => {
                warn!(
                    "Failed to remove temp download file {}: {e}",
                    path.display()
                );
            }
            _ => {}
        }
    }
}
impl Client {
/// Upload the file at `path` privately, letting the client choose the
/// payment mode (`PaymentMode::Auto`). No progress events are emitted.
///
/// # Errors
/// Propagates any error from `file_upload_with_mode`.
pub async fn file_upload(&self, path: &Path) -> Result<FileUploadResult> {
    let mode = PaymentMode::Auto;
    self.file_upload_with_mode(path, mode).await
}
/// Estimate the cost of uploading the file at `path` with `mode`.
///
/// The file is encrypted to a disk spill to learn the chunk count. Up to
/// `ESTIMATE_SAMPLE_CAP` evenly spread chunks are then quoted until one
/// returns a price. Storage is estimated as `median_price * 3 * chunk_count`.
/// Gas uses fixed per-transaction assumptions: one tx per `UPLOAD_WAVE_SIZE`
/// chunks (single mode) or per `MAX_LEAVES` chunks (merkle mode).
///
/// If every sampled chunk is already stored, a zero estimate is returned.
/// It is marked `VerifiedAllAlreadyStored` only when the samples covered
/// every chunk.
///
/// # Errors
/// - `Error::InvalidData` for files smaller than 3 bytes.
/// - Disk-space, encryption, spill-read and non-`AlreadyStored` quote errors.
pub async fn estimate_upload_cost(
    &self,
    path: &Path,
    mode: PaymentMode,
    progress: Option<mpsc::Sender<UploadEvent>>,
) -> Result<UploadCostEstimate> {
    let file_size = std::fs::metadata(path).map_err(Error::Io)?.len();
    if file_size < 3 {
        return Err(Error::InvalidData(
            "File too small: self-encryption requires at least 3 bytes".into(),
        ));
    }
    check_disk_space_for_spill(file_size)?;
    info!(
        "Estimating upload cost for {} ({file_size} bytes)",
        path.display()
    );

    let (spill, _data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
    let chunk_count = spill.len();
    if let Some(tx) = &progress {
        let _ = tx
            .send(UploadEvent::Encrypted {
                total_chunks: chunk_count,
            })
            .await;
    }
    info!("Encrypted into {chunk_count} chunks, requesting quote");

    let uses_merkle = should_use_merkle(chunk_count, mode);
    let payment_mode = if uses_merkle {
        PaymentMode::Merkle
    } else {
        PaymentMode::Single
    };

    // Quote sampled chunks until one is not already stored.
    let sample_indices = distributed_sample_indices(spill.addresses.len(), ESTIMATE_SAMPLE_CAP);
    let sample_total = sample_indices.len();
    let mut sampled = 0usize;
    let mut first_quotes: Option<Vec<QuoteEntry>> = None;
    for addr in sample_indices.iter().map(|&i| &spill.addresses[i]) {
        sampled += 1;
        let chunk_bytes = spill.read_chunk(addr)?;
        let data_size = u64::try_from(chunk_bytes.len())
            .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
        let attempt = if uses_merkle {
            self.get_store_quotes_with_fault_tolerance(addr, data_size, DATA_TYPE_CHUNK)
                .await
        } else {
            self.get_store_quotes(addr, data_size, DATA_TYPE_CHUNK)
                .await
        };
        match attempt {
            Err(Error::AlreadyStored) => {
                debug!(
                    "Sample chunk {} already stored; trying next address ({sampled}/{})",
                    hex::encode(addr),
                    sample_total
                );
            }
            Err(e) => return Err(e),
            Ok(q) => {
                first_quotes = Some(q);
                break;
            }
        }
    }

    let Some(quotes) = first_quotes else {
        // Every sampled chunk is already on the network.
        let confidence = if sampled == chunk_count {
            info!("All {chunk_count} chunks already stored; returning zero-cost estimate");
            CostEstimateConfidence::VerifiedAllAlreadyStored
        } else {
            info!(
                "All {sampled}/{chunk_count} sampled chunks already stored; \
                 returning incomplete zero-cost estimate"
            );
            CostEstimateConfidence::AllSamplesAlreadyStoredIncomplete
        };
        return Ok(UploadCostEstimate {
            file_size,
            chunk_count,
            storage_cost_atto: "0".into(),
            estimated_gas_cost_wei: "0".into(),
            payment_mode,
            confidence,
        });
    };

    let mut prices: Vec<Amount> = quotes.into_iter().map(|(_, _, _, price)| price).collect();
    prices.sort();
    let median_price = prices
        .get(prices.len() / 2)
        .copied()
        .unwrap_or(Amount::ZERO);
    let per_chunk_cost = median_price * Amount::from(3u64);
    let chunk_count_u64 = u64::try_from(chunk_count).unwrap_or(u64::MAX);
    let total_storage = per_chunk_cost * Amount::from(chunk_count_u64);

    let (tx_count, gas_per_tx) = if uses_merkle {
        (chunk_count.div_ceil(MAX_LEAVES), GAS_PER_MERKLE_TX)
    } else {
        (chunk_count.div_ceil(UPLOAD_WAVE_SIZE), GAS_PER_WAVE_TX)
    };
    let estimated_gas: u128 = u128::try_from(tx_count)
        .unwrap_or(u128::MAX)
        .saturating_mul(gas_per_tx)
        .saturating_mul(ARBITRUM_GAS_PRICE_WEI);

    info!(
        "Estimate: {chunk_count} chunks, storage={total_storage} atto, gas~={estimated_gas} wei"
    );
    Ok(UploadCostEstimate {
        file_size,
        chunk_count,
        storage_cost_atto: total_storage.to_string(),
        estimated_gas_cost_wei: estimated_gas.to_string(),
        payment_mode,
        confidence: CostEstimateConfidence::PricedSample,
    })
}
/// Prepare a private upload of `path` for external signing, without progress events.
///
/// # Errors
/// Propagates any error from `file_prepare_upload_with_progress`.
pub async fn file_prepare_upload(&self, path: &Path) -> Result<PreparedUpload> {
    let visibility = Visibility::Private;
    self.file_prepare_upload_with_progress(path, visibility, None)
        .await
}
pub async fn file_prepare_upload_with_visibility(
&self,
path: &Path,
visibility: Visibility,
) -> Result<PreparedUpload> {
self.file_prepare_upload_with_progress(path, visibility, None)
.await
}
/// Encrypt `path` and prepare the payment material for an external signer.
///
/// Steps:
/// 1. Check disk space and encrypt the file to a disk spill.
/// 2. Read all spilled chunks back into memory.
/// 3. For `Visibility::Public`, serialize the DataMap with `rmp_serde` and
///    append it as an extra chunk whose address becomes `data_map_address`.
/// 4. Choose a payment path:
///    - If `should_use_merkle(chunk_count, Auto)`, run a merkle preflight.
///      If nothing needs uploading, return an empty `WaveBatch`. If too few
///      chunks remain for merkle, or merkle preparation reports
///      `InsufficientPeers`, fall back to a wave-batch payment.
///    - Otherwise prepare a wave-batch payment directly.
///
/// # Errors
/// Propagates I/O, disk-space, encryption, serialization, planning and
/// quoting errors. For merkle preparation, only `InsufficientPeers` is
/// recovered (via the wave-batch fallback).
pub async fn file_prepare_upload_with_progress(
&self,
path: &Path,
visibility: Visibility,
progress: Option<mpsc::Sender<UploadEvent>>,
) -> Result<PreparedUpload> {
debug!(
"Preparing file upload for external signing (visibility={visibility:?}): {}",
path.display()
);
let file_size = std::fs::metadata(path)?.len();
check_disk_space_for_spill(file_size)?;
let (spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
info!(
"Encrypted {} into {} chunks for external signing (spilled to disk)",
path.display(),
spill.len()
);
// The prepared payment owns chunk contents, so every spilled chunk is loaded
// into memory here.
let mut chunk_data: Vec<Bytes> = spill
.addresses
.iter()
.map(|addr| spill.read_chunk(addr))
.collect::<std::result::Result<Vec<_>, _>>()?;
let data_map_address = match visibility {
Visibility::Private => None,
Visibility::Public => {
let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
Error::Serialization(format!("Failed to serialize DataMap: {e}"))
})?;
let bytes = Bytes::from(serialized);
let address = compute_address(&bytes);
info!(
"Public upload: bundling DataMap chunk ({} bytes) at address {}",
bytes.len(),
hex::encode(address)
);
chunk_data.push(bytes);
Some(address)
}
};
let chunk_count = chunk_data.len();
if let Some(ref tx) = progress {
let _ = tx
.send(UploadEvent::Encrypted {
total_chunks: chunk_count,
})
.await;
}
let (payment_info, already_stored_addresses) = if should_use_merkle(
chunk_count,
PaymentMode::Auto,
) {
info!("Using merkle batch preparation for {chunk_count} file chunks");
let chunk_entries: Vec<([u8; 32], u64)> = chunk_data
.iter()
.map(|chunk| {
let size = u64::try_from(chunk.len())
.map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
Ok((compute_address(chunk), size))
})
.collect::<Result<Vec<_>>>()?;
// Preflight splits chunks into already-stored vs. still-to-upload.
let merkle_plan = self
.plan_merkle_upload(chunk_entries, DATA_TYPE_CHUNK, progress.as_ref())
.await?;
if merkle_plan.to_upload.is_empty() {
info!("All {chunk_count} file chunks already stored; no external payment needed");
(
ExternalPaymentInfo::WaveBatch {
prepared_chunks: Vec::new(),
payment_intent: PaymentIntent::from_prepared_chunks(&[]),
},
merkle_plan.already_stored,
)
} else {
// Keep only the contents of chunks that still need uploading.
let chunk_data =
chunk_contents_for_upload_addresses(chunk_data, &merkle_plan.to_upload)?;
// After preflight, the remaining set may be too small for merkle.
if !should_use_merkle(merkle_plan.to_upload.len(), PaymentMode::Auto) {
info!(
"{} file chunks need upload after merkle preflight; preparing wave-batch payment",
merkle_plan.to_upload.len()
);
let (payment_info, mut wave_already_stored) = self
.prepare_wave_batch_external_chunks(
chunk_data,
progress.as_ref(),
chunk_count,
)
.await?;
let mut already_stored = merkle_plan.already_stored;
already_stored.append(&mut wave_already_stored);
(payment_info, already_stored)
} else {
match self
.prepare_merkle_batch_external(
&merkle_plan.to_upload,
DATA_TYPE_CHUNK,
merkle_plan.to_upload_avg_size(),
)
.await
{
Ok(prepared_batch) => {
info!(
"File prepared for external merkle signing: {} chunks, depth={} ({})",
merkle_plan.to_upload.len(),
prepared_batch.depth,
path.display()
);
(
ExternalPaymentInfo::Merkle {
prepared_batch,
chunk_contents: chunk_data,
chunk_addresses: merkle_plan.to_upload,
},
merkle_plan.already_stored,
)
}
// Not enough peers for a merkle batch: fall back to wave-batch.
Err(Error::InsufficientPeers(ref msg)) => {
info!(
"External merkle preparation needs more peers ({msg}); preparing wave-batch payment"
);
let (payment_info, mut wave_already_stored) = self
.prepare_wave_batch_external_chunks(
chunk_data,
progress.as_ref(),
chunk_count,
)
.await?;
let mut already_stored = merkle_plan.already_stored;
already_stored.append(&mut wave_already_stored);
(payment_info, already_stored)
}
Err(e) => return Err(e),
}
}
}
} else {
self.prepare_wave_batch_external_chunks(chunk_data, progress.as_ref(), chunk_count)
.await?
};
// Informational only: log when the public DataMap chunk needs no payment.
if let Some(addr) = data_map_address {
let data_map_needs_payment = match &payment_info {
ExternalPaymentInfo::WaveBatch {
prepared_chunks, ..
} => prepared_chunks.iter().any(|c| c.address == addr),
ExternalPaymentInfo::Merkle {
chunk_addresses, ..
} => chunk_addresses.contains(&addr),
};
if !data_map_needs_payment {
info!(
"Public upload: DataMap chunk {} was already stored \
on the network — address is retrievable without a \
new payment",
hex::encode(addr)
);
}
}
Ok(PreparedUpload {
data_map,
payment_info,
data_map_address,
already_stored_addresses,
total_chunks: chunk_count,
})
}
/// Quote every chunk in `chunk_data` concurrently and bundle the results
/// into an `ExternalPaymentInfo::WaveBatch`.
///
/// Concurrency is the quote limiter's current value, capped at the chunk
/// count. Chunks whose preparation returns `None` are reported as already
/// stored. A `ChunkQuoted` event is sent after each result via `try_send`
/// (events are dropped if the channel is full); `progress_total` is the
/// reported total.
///
/// # Errors
/// Returns the first quoting error; in-flight quotes are dropped.
async fn prepare_wave_batch_external_chunks(
    &self,
    chunk_data: Vec<Bytes>,
    progress: Option<&mpsc::Sender<UploadEvent>>,
    progress_total: usize,
) -> Result<(ExternalPaymentInfo, Vec<[u8; 32]>)> {
    let chunk_count = chunk_data.len();
    let mut addressed: Vec<(Bytes, [u8; 32])> = Vec::with_capacity(chunk_count);
    for content in chunk_data {
        let address = compute_address(&content);
        addressed.push((content, address));
    }

    let quote_gate = self.controller().quote.clone();
    let in_flight = quote_gate.current().min(chunk_count.max(1));
    let mut quotes = stream::iter(addressed)
        .map(|(content, address)| {
            let gate = quote_gate.clone();
            async move {
                let result = observe_op(
                    &gate,
                    || async move { self.prepare_chunk_payment(content).await },
                    classify_error,
                )
                .await;
                (address, result)
            }
        })
        .buffer_unordered(in_flight);

    let mut prepared_chunks = Vec::with_capacity(chunk_count);
    let mut already_stored = Vec::new();
    let mut quoted = 0usize;
    while let Some((address, result)) = quotes.next().await {
        if let Some(prepared) = result? {
            prepared_chunks.push(prepared);
        } else {
            already_stored.push(address);
        }
        quoted += 1;
        if let Some(tx) = progress {
            let _ = tx.try_send(UploadEvent::ChunkQuoted {
                quoted,
                total: progress_total,
            });
        }
    }

    let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
    info!(
        "Prepared external wave-batch payment: {} chunks, {} already stored, total {} atto",
        prepared_chunks.len(),
        already_stored.len(),
        payment_intent.total_amount,
    );
    let payment_info = ExternalPaymentInfo::WaveBatch {
        prepared_chunks,
        payment_intent,
    };
    Ok((payment_info, already_stored))
}
/// Finalize a wave-batch `PreparedUpload` using the signer's tx hashes,
/// without progress events.
///
/// # Errors
/// Propagates any error from `finalize_upload_with_progress`.
pub async fn finalize_upload(
    &self,
    prepared: PreparedUpload,
    tx_hash_map: &HashMap<QuoteHash, TxHash>,
) -> Result<FileUploadResult> {
    let progress = None;
    self.finalize_upload_with_progress(prepared, tx_hash_map, progress)
        .await
}
pub async fn finalize_upload_with_progress(
&self,
prepared: PreparedUpload,
tx_hash_map: &HashMap<QuoteHash, TxHash>,
progress: Option<mpsc::Sender<UploadEvent>>,
) -> Result<FileUploadResult> {
let data_map_address = prepared.data_map_address;
let already_stored_addresses = prepared.already_stored_addresses;
let already_stored_count = already_stored_addresses.len();
let total_chunks = prepared.total_chunks;
match prepared.payment_info {
ExternalPaymentInfo::WaveBatch {
prepared_chunks,
payment_intent,
} => {
let paid_chunks = finalize_batch_payment(prepared_chunks, tx_hash_map)?;
let wave_result = self
.store_paid_chunks_with_events(
paid_chunks,
progress.as_ref(),
already_stored_count,
total_chunks,
)
.await;
if !wave_result.failed.is_empty() {
let failed_count = wave_result.failed.len();
let stored_count = already_stored_count + wave_result.stored.len();
let mut stored = already_stored_addresses;
stored.extend(wave_result.stored);
return Err(Error::PartialUpload {
stored,
stored_count,
failed: wave_result.failed,
failed_count,
total_chunks,
spend: Box::new(PartialUploadSpend {
storage_cost_atto: payment_intent.total_amount.to_string(),
gas_cost_wei: 0,
}),
reason: "finalize_upload: chunk storage failed after retries".into(),
});
}
let chunks_stored = already_stored_count + wave_result.stored.len();
info!("External-signer upload finalized: {chunks_stored} chunks stored");
let mut stats = WaveAggregateStats::default();
stats.absorb(&wave_result);
Ok(FileUploadResult {
data_map: prepared.data_map,
chunks_stored,
chunks_failed: 0,
total_chunks,
payment_mode_used: PaymentMode::Single,
storage_cost_atto: payment_intent.total_amount.to_string(),
gas_cost_wei: 0,
data_map_address,
chunk_attempts_total: stats.chunk_attempts_total,
store_durations_ms: stats.store_durations_ms,
retries_histogram: stats.retries_histogram,
})
}
ExternalPaymentInfo::Merkle { .. } => Err(Error::Payment(
"Cannot finalize merkle upload with wave-batch tx hashes. \
Use finalize_upload_merkle() instead."
.to_string(),
)),
}
}
/// Finalize a merkle `PreparedUpload` using the winning pool hash, without
/// progress events.
///
/// # Errors
/// Propagates any error from `finalize_upload_merkle_with_progress`.
pub async fn finalize_upload_merkle(
    &self,
    prepared: PreparedUpload,
    winner_pool_hash: [u8; 32],
) -> Result<FileUploadResult> {
    let progress = None;
    self.finalize_upload_merkle_with_progress(prepared, winner_pool_hash, progress)
        .await
}
pub async fn finalize_upload_merkle_with_progress(
&self,
prepared: PreparedUpload,
winner_pool_hash: [u8; 32],
progress: Option<mpsc::Sender<UploadEvent>>,
) -> Result<FileUploadResult> {
let data_map_address = prepared.data_map_address;
let already_stored_count = prepared.already_stored_addresses.len();
let total_chunks = prepared.total_chunks;
match prepared.payment_info {
ExternalPaymentInfo::Merkle {
prepared_batch,
chunk_contents,
chunk_addresses,
} => {
let batch_result = finalize_merkle_batch(prepared_batch, winner_pool_hash)?;
let outcome = self
.merkle_upload_chunks(
chunk_contents,
chunk_addresses,
&batch_result,
progress.as_ref(),
already_stored_count,
total_chunks,
)
.await?;
info!(
"External-signer merkle upload finalized: {} chunks stored, {} failed",
outcome.stored, outcome.failed
);
Ok(FileUploadResult {
data_map: prepared.data_map,
chunks_stored: outcome.stored,
chunks_failed: outcome.failed,
total_chunks,
payment_mode_used: PaymentMode::Merkle,
storage_cost_atto: "0".into(),
gas_cost_wei: 0,
data_map_address,
chunk_attempts_total: outcome.stats.chunk_attempts_total,
store_durations_ms: outcome.stats.store_durations_ms,
retries_histogram: outcome.stats.retries_histogram,
})
}
ExternalPaymentInfo::WaveBatch { .. } => Err(Error::Payment(
"Cannot finalize wave-batch upload with merkle winner hash. \
Use finalize_upload() instead."
.to_string(),
)),
}
}
/// Upload the file at `path` privately (no public DataMap chunk) with the
/// given payment `mode`, without progress events.
///
/// # Errors
/// Propagates any error from `file_upload_with_progress`.
// The former `#[allow(clippy::too_many_lines)]` was removed: this is a
// three-line wrapper and the allow suppressed nothing.
pub async fn file_upload_with_mode(
    &self,
    path: &Path,
    mode: PaymentMode,
) -> Result<FileUploadResult> {
    self.file_upload_with_progress(path, mode, None).await
}
/// Upload the file at `path` publicly with the given payment `mode`, without
/// progress events.
///
/// The serialized DataMap is stored as an extra chunk and its address is
/// returned in `FileUploadResult::data_map_address`.
///
/// # Errors
/// Propagates any error from the shared visibility-aware upload path.
pub async fn file_upload_public_with_mode(
    &self,
    path: &Path,
    mode: PaymentMode,
) -> Result<FileUploadResult> {
    self.file_upload_with_visibility_and_progress(path, mode, Visibility::Public, None)
        .await
}
/// Upload the file at `path` privately with the given payment `mode`,
/// sending `UploadEvent`s to `progress` when provided.
///
/// # Errors
/// Propagates any error from the shared visibility-aware upload path.
pub async fn file_upload_with_progress(
    &self,
    path: &Path,
    mode: PaymentMode,
    progress: Option<mpsc::Sender<UploadEvent>>,
) -> Result<FileUploadResult> {
    self.file_upload_with_visibility_and_progress(path, mode, Visibility::Private, progress)
        .await
}
/// Upload the file at `path` publicly with the given payment `mode`, sending
/// `UploadEvent`s to `progress` when provided.
///
/// The serialized DataMap is stored as an extra chunk and its address is
/// returned in `FileUploadResult::data_map_address`.
///
/// # Errors
/// Propagates any error from the shared visibility-aware upload path.
pub async fn file_upload_public_with_progress(
    &self,
    path: &Path,
    mode: PaymentMode,
    progress: Option<mpsc::Sender<UploadEvent>>,
) -> Result<FileUploadResult> {
    self.file_upload_with_visibility_and_progress(path, mode, Visibility::Public, progress)
        .await
}
/// Shared implementation behind the `file_upload*` entry points.
///
/// Flow:
/// 1. Check disk space, then self-encrypt `path` into an on-disk `ChunkSpill`.
/// 2. For `Visibility::Public`, serialize the `DataMap` (`rmp_serde`), add it to
///    the spill as one more chunk, and remember its address.
/// 3. If `should_use_merkle` approves, run a merkle preflight
///    (`plan_merkle_upload`) that splits chunks into `to_upload` and
///    `already_stored`, reusing cached merkle proofs for this file when they
///    cover the addresses still needed; otherwise pay per chunk ("single").
/// 4. In `PaymentMode::Auto`, `Error::InsufficientPeers` from the merkle path
///    falls back to single-node payment.
///
/// Resume caches (`cached_merkle` / `cached_single`) are keyed by the
/// canonicalized file path, saved after a fresh merkle payment, and deleted
/// once an upload path completes successfully.
///
/// # Errors
/// Propagates I/O, serialization, encryption, payment and storage errors,
/// including `Error::PartialUpload` from the wave uploaders.
#[allow(clippy::too_many_lines)]
async fn file_upload_with_visibility_and_progress(
&self,
path: &Path,
mode: PaymentMode,
visibility: Visibility,
progress: Option<mpsc::Sender<UploadEvent>>,
) -> Result<FileUploadResult> {
debug!(
"Streaming file upload with mode {mode:?}, visibility {visibility:?}: {}",
path.display()
);
// Fail early if there is not enough disk space to spill the encrypted chunks
// (checked against the input file size).
let file_size = std::fs::metadata(path)?.len();
check_disk_space_for_spill(file_size)?;
let (mut spill, data_map) = self.encrypt_file_to_spill(path, progress.as_ref()).await?;
// Public uploads add the serialized DataMap to the spill as one more chunk, so it
// is paid for and stored together with the data chunks; its address is returned.
let data_map_address = match visibility {
Visibility::Private => None,
Visibility::Public => {
let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
Error::Serialization(format!("Failed to serialize DataMap: {e}"))
})?;
let address = compute_address(&serialized);
info!(
"Public upload: adding DataMap chunk ({} bytes) at address {} to payment batch",
serialized.len(),
hex::encode(address)
);
spill.push(&serialized)?;
Some(address)
}
};
let chunk_count = spill.len();
info!(
"Encrypted {} into {chunk_count} chunks (spilled to disk)",
path.display()
);
if let Some(ref tx) = progress {
let _ = tx
.send(UploadEvent::Encrypted {
total_chunks: chunk_count,
})
.await;
}
// Key for the per-file resume caches; falls back to the raw path when
// canonicalization fails.
let file_path_key = std::fs::canonicalize(path)
.map(|p| p.display().to_string())
.unwrap_or_else(|_| path.display().to_string());
// Each branch below yields (stored, mode actually used, storage cost, gas cost, stats).
let (chunks_stored, actual_mode, storage_cost_atto, gas_cost_wei, stats) = if self
.should_use_merkle(chunk_count, mode)
{
info!("Using merkle batch payment for {chunk_count} file chunks");
// Proofs saved by an earlier merkle payment for this same file, if any.
let cached_merkle =
crate::data::client::cached_merkle::try_load_for_file(&file_path_key)
.map(|(_cache_path, cached)| cached);
let merkle_plan = match self
.plan_merkle_upload(spill.chunk_entries()?, DATA_TYPE_CHUNK, progress.as_ref())
.await
{
Ok(plan) => plan,
Err(e) => {
// Preflight failed. If cached proofs cover every chunk, resume with them
// directly (all addresses are re-stored; none are treated as already stored).
if let Some(cached) = cached_merkle
.as_ref()
.filter(|cached| cached_merkle_covers_addresses(cached, &spill.addresses))
{
info!(
"Merkle preflight failed ({e}); \
resuming with cached merkle proofs"
);
let (stored, sc, gc, stats) = self
.upload_waves_merkle(
&spill,
&spill.addresses,
cached,
&[],
progress.as_ref(),
)
.await?;
crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
return Ok(FileUploadResult {
data_map,
chunks_stored: stored,
chunks_failed: 0,
total_chunks: chunk_count,
payment_mode_used: PaymentMode::Merkle,
storage_cost_atto: sc,
gas_cost_wei: gc,
data_map_address,
chunk_attempts_total: stats.chunk_attempts_total,
store_durations_ms: stats.store_durations_ms,
retries_histogram: stats.retries_histogram,
});
}
// No usable cache: only a peer shortage in Auto mode falls back to
// single-node payment for the whole spill; any other error propagates.
match &e {
Error::InsufficientPeers(msg) if mode == PaymentMode::Auto => {
info!(
"Merkle preflight needs more peers ({msg}), \
falling back to wave-batch"
);
let (stored, sc, gc, fb_stats) = self
.upload_waves_single(
&spill,
progress.as_ref(),
Some(&file_path_key),
)
.await?;
crate::data::client::cached_single::try_delete_for_file(&file_path_key);
return Ok(FileUploadResult {
data_map,
chunks_stored: stored,
chunks_failed: 0,
total_chunks: chunk_count,
payment_mode_used: PaymentMode::Single,
storage_cost_atto: sc,
gas_cost_wei: gc,
data_map_address,
chunk_attempts_total: fb_stats.chunk_attempts_total,
store_durations_ms: fb_stats.store_durations_ms,
retries_histogram: fb_stats.retries_histogram,
});
}
_ => return Err(e),
}
}
};
// Case 1: preflight found every chunk already stored — nothing to pay for.
if merkle_plan.to_upload.is_empty() {
info!("All {chunk_count} merkle chunks already stored; skipping payment");
crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
crate::data::client::cached_single::try_delete_for_file(&file_path_key);
(
chunk_count,
PaymentMode::Merkle,
"0".to_string(),
0,
WaveAggregateStats::default(),
)
// Case 2: the remaining set is too small for merkle. Reuse cached proofs if
// they cover it; otherwise pay per chunk for just the remaining addresses.
} else if !self.should_use_merkle(merkle_plan.to_upload.len(), mode) {
let remaining_chunks = merkle_plan.to_upload.len();
if let Some(cached) = cached_merkle
.as_ref()
.filter(|cached| cached_merkle_covers_addresses(cached, &merkle_plan.to_upload))
{
info!(
"{remaining_chunks} chunks remain below merkle threshold; \
reusing cached merkle proofs"
);
let (stored, sc, gc, stats) = self
.upload_waves_merkle(
&spill,
&merkle_plan.to_upload,
cached,
&merkle_plan.already_stored,
progress.as_ref(),
)
.await?;
crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
(stored, PaymentMode::Merkle, sc, gc, stats)
} else {
if cached_merkle.is_some() {
info!(
"{remaining_chunks} chunks remain below merkle threshold, \
and the cached merkle receipt does not cover them. \
Discarding cache and using single-node payment."
);
crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
} else {
info!(
"{remaining_chunks} chunks need upload after merkle preflight; \
using single-node payment"
);
}
let (stored, sc, gc, stats) = self
.upload_spill_addresses_single(
&spill,
&merkle_plan.to_upload,
progress.as_ref(),
&merkle_plan.already_stored,
chunk_count,
Some(&file_path_key),
)
.await?;
crate::data::client::cached_single::try_delete_for_file(&file_path_key);
(stored, PaymentMode::Single, sc, gc, stats)
}
// Case 3: full merkle path. Get proofs (cache if it covers the remaining
// chunks, otherwise a fresh payment whose result is cached for resumption),
// then store the remaining chunks with them.
} else {
let batch_result = if let Some(cached) = cached_merkle.as_ref() {
if cached_merkle_covers_addresses(cached, &merkle_plan.to_upload) {
info!(
"Skipping merkle payment phase; resuming with \
cached proofs for {} remaining chunks",
merkle_plan.to_upload.len()
);
Ok(cached.clone())
} else {
info!(
"Cached merkle receipt does not cover the current \
remaining chunks (cached={}, remaining={}). \
Discarding cache and paying fresh.",
cached.proofs.len(),
merkle_plan.to_upload.len()
);
crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
self.pay_for_merkle_batch(
&merkle_plan.to_upload,
DATA_TYPE_CHUNK,
merkle_plan.to_upload_avg_size(),
)
.await
.inspect(|result| {
crate::data::client::cached_merkle::try_save(&file_path_key, result);
})
}
} else {
self.pay_for_merkle_batch(
&merkle_plan.to_upload,
DATA_TYPE_CHUNK,
merkle_plan.to_upload_avg_size(),
)
.await
.inspect(|result| {
crate::data::client::cached_merkle::try_save(&file_path_key, result);
})
};
// Auto mode: a peer shortage during payment falls back to single-node
// payment for the remaining chunks only.
let batch_result = match batch_result {
Ok(result) => result,
Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
info!("Merkle needs more peers ({msg}), falling back to wave-batch");
let (stored, sc, gc, fb_stats) = self
.upload_spill_addresses_single(
&spill,
&merkle_plan.to_upload,
progress.as_ref(),
&merkle_plan.already_stored,
chunk_count,
Some(&file_path_key),
)
.await?;
crate::data::client::cached_single::try_delete_for_file(&file_path_key);
return Ok(FileUploadResult {
data_map,
chunks_stored: stored,
chunks_failed: 0,
total_chunks: chunk_count,
payment_mode_used: PaymentMode::Single,
storage_cost_atto: sc,
gas_cost_wei: gc,
data_map_address,
chunk_attempts_total: fb_stats.chunk_attempts_total,
store_durations_ms: fb_stats.store_durations_ms,
retries_histogram: fb_stats.retries_histogram,
});
}
Err(e) => return Err(e),
};
let (stored, sc, gc, stats) = self
.upload_waves_merkle(
&spill,
&merkle_plan.to_upload,
&batch_result,
&merkle_plan.already_stored,
progress.as_ref(),
)
.await?;
crate::data::client::cached_merkle::try_delete_for_file(&file_path_key);
(stored, PaymentMode::Merkle, sc, gc, stats)
}
// Below the merkle threshold from the start: single-node payment for every chunk.
} else {
let (stored, sc, gc, stats) = self
.upload_waves_single(&spill, progress.as_ref(), Some(&file_path_key))
.await?;
crate::data::client::cached_single::try_delete_for_file(&file_path_key);
(stored, PaymentMode::Single, sc, gc, stats)
};
info!(
"File uploaded with {actual_mode:?}: {chunks_stored} chunks stored ({})",
path.display()
);
// Incomplete uploads return early with an error, so reaching here means no failures.
Ok(FileUploadResult {
data_map,
chunks_stored,
chunks_failed: 0,
total_chunks: chunk_count,
payment_mode_used: actual_mode,
storage_cost_atto,
gas_cost_wei,
data_map_address,
chunk_attempts_total: stats.chunk_attempts_total,
store_durations_ms: stats.store_durations_ms,
retries_histogram: stats.retries_histogram,
})
}
/// Self-encrypt the file at `path`, spilling every produced chunk to disk.
///
/// Chunks arrive over a channel from the background encryption task and are
/// pushed into a fresh `ChunkSpill`, which deduplicates identical content.
/// Once the channel closes, the task is joined and its `DataMap` collected.
///
/// An `UploadEvent::Encrypting` is sent every 10 unique chunks. An info log is
/// emitted every 100 unique chunks. Duplicate chunks never re-trigger a
/// milestone that has already been reported.
///
/// # Errors
/// Spill I/O failures, an encryption error, a panicked encryption task, or a
/// missing `DataMap` are returned as errors.
async fn encrypt_file_to_spill(
    &self,
    path: &Path,
    progress: Option<&mpsc::Sender<UploadEvent>>,
) -> Result<(ChunkSpill, DataMap)> {
    let (mut chunk_rx, datamap_rx, handle) = spawn_file_encryption(path.to_path_buf())?;
    let mut spill = ChunkSpill::new()?;
    // `ChunkSpill` deduplicates identical content, so pushing a duplicate
    // leaves `len()` unchanged. Track the last observed length so a duplicate
    // landing on a milestone does not resend the same progress event or
    // repeat the same log line.
    let mut previous_len = spill.len();
    while let Some(content) = chunk_rx.recv().await {
        spill.push(&content)?;
        let chunks_done = spill.len();
        if chunks_done == previous_len {
            continue;
        }
        previous_len = chunks_done;
        if let Some(tx) = progress {
            if chunks_done.is_multiple_of(10) {
                let _ = tx.send(UploadEvent::Encrypting { chunks_done }).await;
            }
        }
        if chunks_done.is_multiple_of(100) {
            let mb = spill.total_bytes() / (1024 * 1024);
            info!(
                "Encryption progress: {chunks_done} chunks spilled ({mb} MB) — {}",
                path.display()
            );
        }
    }
    // The chunk channel closed: the encryption task has finished (or failed).
    handle
        .await
        .map_err(|e| Error::Encryption(format!("encryption task panicked: {e}")))?
        .map_err(|e| Error::Encryption(format!("encryption failed: {e}")))?;
    let data_map = datamap_rx
        .await
        .map_err(|_| Error::Encryption("no DataMap from encryption thread".to_string()))?;
    Ok((spill, data_map))
}
/// Upload every chunk held in `spill` using single-node payment.
///
/// This is a convenience wrapper over `upload_spill_addresses_single`. It treats
/// no chunks as already stored and uses the spill's own length as the total
/// for progress reporting.
async fn upload_waves_single(
    &self,
    spill: &ChunkSpill,
    progress: Option<&mpsc::Sender<UploadEvent>>,
    resume_key: Option<&str>,
) -> Result<(usize, String, u128, WaveAggregateStats)> {
    let nothing_stored_yet: &[[u8; 32]] = &[];
    let every_address = &spill.addresses;
    let total = spill.len();
    self.upload_spill_addresses_single(
        spill,
        every_address,
        progress,
        nothing_stored_yet,
        total,
        resume_key,
    )
    .await
}
async fn upload_spill_addresses_single(
&self,
spill: &ChunkSpill,
addresses: &[[u8; 32]],
progress: Option<&mpsc::Sender<UploadEvent>>,
already_stored_addresses: &[[u8; 32]],
total_chunks: usize,
resume_key: Option<&str>,
) -> Result<(usize, String, u128, WaveAggregateStats)> {
let mut total_stored = already_stored_addresses.len();
let mut total_storage = Amount::ZERO;
let mut total_gas: u128 = 0;
let mut agg_stats = WaveAggregateStats::default();
let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
let mut failed: Vec<([u8; 32], String)> = Vec::new();
let waves: Vec<&[[u8; 32]]> = addresses.chunks(UPLOAD_WAVE_SIZE).collect();
let wave_count = waves.len();
info!(
"single-node upload: {} chunk(s) in {wave_count} wave(s) (continue-on-partial)",
addresses.len()
);
for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
let wave_num = wave_idx + 1;
let wave_data: Vec<Bytes> = wave_addrs
.iter()
.map(|addr| spill.read_chunk(addr))
.collect::<Result<Vec<_>>>()?;
info!(
"Wave {wave_num}/{wave_count}: quoting {} chunks — {total_stored}/{total_chunks} stored so far",
wave_data.len()
);
if let Some(tx) = progress {
let _ = tx
.send(UploadEvent::QuotingChunks {
wave: wave_num,
total_waves: wave_count,
chunks_in_wave: wave_data.len(),
})
.await;
}
let outcome = fold_single_wave(
self.batch_upload_chunks_with_events(
wave_data,
progress,
total_stored,
total_chunks,
resume_key,
)
.await,
)?;
if !outcome.failed.is_empty() {
warn!(
"Wave {wave_num}/{wave_count}: {} chunk(s) failed to store after retries; \
continuing with remaining waves",
outcome.failed.len()
);
}
total_stored += outcome.stored.len();
stored_addresses.extend(outcome.stored);
failed.extend(outcome.failed);
total_storage += outcome.storage_atto;
total_gas = total_gas.saturating_add(outcome.gas_wei);
agg_stats.chunk_attempts_total = agg_stats
.chunk_attempts_total
.saturating_add(outcome.stats.chunk_attempts_total);
agg_stats
.store_durations_ms
.extend(outcome.stats.store_durations_ms);
for (slot, count) in agg_stats
.retries_histogram
.iter_mut()
.zip(outcome.stats.retries_histogram.iter())
{
*slot = slot.saturating_add(*count);
}
if let Some(tx) = progress {
let _ = tx
.send(UploadEvent::WaveComplete {
wave: wave_num,
total_waves: wave_count,
stored_so_far: total_stored,
total: total_chunks,
})
.await;
}
}
if !failed.is_empty() {
let failed_count = failed.len();
warn!(
"single-node upload incomplete: {failed_count}/{total_chunks} chunks failed after retries"
);
return Err(Error::PartialUpload {
stored: stored_addresses,
stored_count: total_stored,
failed,
failed_count,
total_chunks,
spend: Box::new(PartialUploadSpend {
storage_cost_atto: total_storage.to_string(),
gas_cost_wei: total_gas,
}),
reason: format!("{failed_count} chunk(s) failed to store after retries"),
});
}
Ok((
total_stored,
total_storage.to_string(),
total_gas,
agg_stats,
))
}
/// Store `addresses` (read from `spill`) using proofs from an already-obtained
/// merkle batch payment (`batch_result`).
///
/// - Addresses with no entry in `batch_result.proofs` are never attempted and
///   are reported as failed.
/// - The rest are stored in waves of `UPLOAD_WAVE_SIZE` via
///   `merkle_store_with_retry`. It is called with `1` and a zero delay,
///   presumably a single attempt per wave. Concurrency is capped by the
///   adaptive store limiter. Chunks that fall short are deferred.
/// - After the last wave, deferred chunks get further concurrent rounds via
///   `merkle_deferred_retry`, using the delays in `DEFERRED_ROUND_DELAYS_SECS`.
///
/// `already_stored_addresses` count towards the stored total and `total_chunks`.
/// Returns `(stored_count, storage_cost_atto, gas_cost_wei, stats)`. The costs
/// are taken from `batch_result`, since payment has already happened.
///
/// # Errors
/// `Error::PartialUpload` is returned in two cases: through
/// `partial_upload_after_fatal` when a store is aborted by a fatal error, or
/// directly when chunks remain failed. Spill read errors and errors returned
/// by the retry helpers are propagated.
async fn upload_waves_merkle(
&self,
spill: &ChunkSpill,
addresses: &[[u8; 32]],
batch_result: &MerkleBatchPaymentResult,
already_stored_addresses: &[[u8; 32]],
progress: Option<&mpsc::Sender<UploadEvent>>,
) -> Result<(usize, String, u128, WaveAggregateStats)> {
let mut total_stored = already_stored_addresses.len();
let total_chunks = total_stored + addresses.len();
let mut stored_addresses: Vec<[u8; 32]> = already_stored_addresses.to_vec();
// Terminal failures (missing proofs, chunks that exhausted the deferred rounds).
let mut failed: Vec<([u8; 32], String)> = Vec::new();
// Chunks that fell short during a wave; retried after the final wave.
let mut deferred: Vec<([u8; 32], String)> = Vec::new();
let mut agg_stats = WaveAggregateStats::default();
// A partial payment can leave some addresses without a proof; those cannot be stored.
let (to_store, missing_proof) =
partition_addresses_by_proof(addresses, &batch_result.proofs);
if !missing_proof.is_empty() {
warn!(
"{} chunk(s) lack a merkle proof (partial payment); reporting them as failed",
missing_proof.len()
);
for addr in &missing_proof {
failed.push((
*addr,
format!("Missing merkle proof for chunk {}", hex::encode(addr)),
));
}
}
let waves: Vec<&[[u8; 32]]> = to_store.chunks(UPLOAD_WAVE_SIZE).collect();
let wave_count = waves.len();
let store_limiter = self.controller().store.clone();
// Per-chunk store operation shared by the wave loop and the deferred retry.
// It looks up the chunk's proof, resolves its close-group peers, and PUTs
// the chunk through the adaptive store limiter (`observe_op`). On success it
// resolves to the start `Instant`, presumably so the helpers can record store
// durations.
let store_one = |addr: [u8; 32], content: Bytes| {
let limiter = store_limiter.clone();
let proof_bytes = batch_result.proofs.get(&addr).cloned();
async move {
let started = std::time::Instant::now();
let proof = proof_bytes.ok_or_else(|| {
Error::Payment(format!(
"Missing merkle proof for chunk {}",
hex::encode(addr)
))
})?;
let peers = self.close_group_peers(&addr).await?;
observe_op(
&limiter,
|| async move { self.chunk_put_to_close_group(content, proof, &peers).await },
classify_error,
)
.await
.map(|_| started)
}
};
for (wave_idx, wave_addrs) in waves.into_iter().enumerate() {
let wave_num = wave_idx + 1;
let wave = spill.read_wave(wave_addrs)?;
info!(
"Wave {wave_num}/{wave_count}: storing {} chunks (merkle) — {total_stored}/{total_chunks} stored so far",
wave.len()
);
// Never request more concurrency than the wave has chunks (minimum 1).
let store_concurrency = store_limiter.current().min(wave.len().max(1));
let chunks: Vec<([u8; 32], Bytes)> = wave
.into_iter()
.map(|(content, addr)| (addr, content))
.collect();
let outcome = merkle_store_with_retry(
chunks,
store_concurrency,
1,
std::time::Duration::ZERO,
progress,
total_stored,
total_chunks,
&store_one,
)
.await?;
stored_addresses.extend(&outcome.stored_addresses);
// `outcome.stored` is used as the new running total (it was seeded with `total_stored`).
total_stored = outcome.stored;
agg_stats.chunk_attempts_total = agg_stats
.chunk_attempts_total
.saturating_add(outcome.stats.chunk_attempts_total);
agg_stats
.store_durations_ms
.extend(outcome.stats.store_durations_ms);
for (slot, count) in agg_stats
.retries_histogram
.iter_mut()
.zip(outcome.stats.retries_histogram.iter())
{
*slot = slot.saturating_add(*count);
}
// A fatal error aborts the whole upload. Everything known to have failed
// (missing proofs, this wave's failures, chunks deferred so far) is folded
// into the partial-upload error.
if let Some(e) = outcome.fatal {
warn!("merkle wave {wave_num}/{wave_count} aborted: {e}");
let mut known_failed = failed;
known_failed.extend(outcome.failed_addresses);
known_failed.extend(std::mem::take(&mut deferred));
return Err(partial_upload_after_fatal(
addresses,
stored_addresses,
total_stored,
total_chunks,
known_failed,
PartialUploadSpend {
storage_cost_atto: batch_result.storage_cost_atto.clone(),
gas_cost_wei: batch_result.gas_cost_wei,
},
format!("merkle chunk store aborted: {e}"),
));
}
deferred.extend(outcome.failed_addresses);
if let Some(tx) = progress {
let _ = tx
.send(UploadEvent::WaveComplete {
wave: wave_num,
total_waves: wave_count,
stored_so_far: total_stored,
total: total_chunks,
})
.await;
}
}
// Deferred chunks are retried together after all waves, re-reading their
// content from the spill in slices of UPLOAD_WAVE_SIZE.
if !deferred.is_empty() {
info!(
"Deferring {} merkle chunk(s) short of quorum for concurrent retry after final wave",
deferred.len()
);
let dr = merkle_deferred_retry(
deferred,
&DEFERRED_ROUND_DELAYS_SECS,
UPLOAD_WAVE_SIZE,
|addrs: &[[u8; 32]]| {
spill.read_wave(addrs).map(|wave| {
wave.into_iter()
.map(|(content, addr)| (addr, content))
.collect()
})
},
|n: usize| store_limiter.current().min(n.max(1)),
progress,
total_stored,
total_chunks,
&store_one,
)
.await?;
stored_addresses.extend(dr.stored_addresses);
total_stored = dr.stored;
agg_stats.chunk_attempts_total = agg_stats
.chunk_attempts_total
.saturating_add(dr.stats.chunk_attempts_total);
agg_stats
.store_durations_ms
.extend(dr.stats.store_durations_ms);
for (slot, count) in agg_stats
.retries_histogram
.iter_mut()
.zip(dr.stats.retries_histogram.iter())
{
*slot = slot.saturating_add(*count);
}
if let Some(reason) = dr.fatal {
warn!("merkle deferred retry aborted: {reason}");
let mut known_failed = failed;
known_failed.extend(dr.failed_addresses);
return Err(partial_upload_after_fatal(
addresses,
stored_addresses,
total_stored,
total_chunks,
known_failed,
PartialUploadSpend {
storage_cost_atto: batch_result.storage_cost_atto.clone(),
gas_cost_wei: batch_result.gas_cost_wei,
},
format!("merkle chunk store aborted: {reason}"),
));
}
failed.extend(dr.failed_addresses);
}
// NOTE(review): `failed` can include chunks that lacked a merkle proof and were
// never attempted. The warning and reason below still attribute every failure
// to being "short of quorum after {total_attempts} attempts". Consider
// reporting missing-proof chunks separately.
if !failed.is_empty() {
let failed_count = failed.len();
// One in-wave attempt plus one per deferred retry round.
let total_attempts = 1 + DEFERRED_ROUND_DELAYS_SECS.len();
warn!(
"merkle upload incomplete: {failed_count}/{total_chunks} chunks short of quorum after retries"
);
return Err(Error::PartialUpload {
stored: stored_addresses,
stored_count: total_stored,
failed,
failed_count,
total_chunks,
spend: Box::new(PartialUploadSpend {
storage_cost_atto: batch_result.storage_cost_atto.clone(),
gas_cost_wei: batch_result.gas_cost_wei,
}),
reason: format!(
"{failed_count} chunk(s) short of quorum after {total_attempts} attempts"
),
});
}
Ok((
total_stored,
batch_result.storage_cost_atto.clone(),
batch_result.gas_cost_wei,
agg_stats,
))
}
pub async fn file_download(&self, data_map: &DataMap, output: &Path) -> Result<u64> {
self.file_download_with_progress(data_map, output, None)
.await
}
/// Download `data_map` to `output`, fetching each chunk from the `peer_count`
/// closest peers instead of the default close-group size.
///
/// No progress events are emitted. Returns the number of bytes written.
///
/// # Errors
/// Propagates fetch, decryption and filesystem errors.
pub async fn file_download_from_closest_peers(
    &self,
    data_map: &DataMap,
    output: &Path,
    peer_count: NonZeroUsize,
) -> Result<u64> {
    self.file_download_with_progress_from_closest_peers(data_map, output, None, peer_count)
        .await
}
/// Download `data_map` to `output` using the `peer_count` closest peers per
/// chunk, optionally streaming [`DownloadEvent`]s to `progress`.
///
/// `NonZeroUsize` guarantees at least one peer is queried. Returns the number
/// of bytes written.
///
/// # Errors
/// Propagates fetch, decryption and filesystem errors.
pub async fn file_download_with_progress_from_closest_peers(
    &self,
    data_map: &DataMap,
    output: &Path,
    progress: Option<mpsc::Sender<DownloadEvent>>,
    peer_count: NonZeroUsize,
) -> Result<u64> {
    let peers = peer_count.get();
    self.file_download_with_progress_using_peer_count(data_map, output, progress, peers)
        .await
}
/// Fetch and decrypt the file described by `data_map`, passing each decrypted
/// piece yielded by the decrypt stream to `on_chunk`. Returns the total number
/// of decrypted bytes.
///
/// If `data_map` is a child (hierarchical) map, its map chunks are fetched
/// first to resolve the root map. Data chunks are then fetched in batches from
/// the `peer_count` closest peers through the adaptive fetch limiter. Chunks
/// that are missing or error on the first pass get up to three deferred retry
/// rounds (after 0s, 15s and 45s) before the batch fails.
///
/// The `self_encryption` fetch callbacks are synchronous closures. They drive
/// async fetches with `tokio::task::block_in_place` plus `Handle::block_on`.
/// NOTE(review): `block_in_place` panics on a current-thread Tokio runtime,
/// so this requires the multi-threaded runtime.
///
/// Progress events use `try_send`, so they are dropped rather than awaited
/// when the progress channel is full.
///
/// # Errors
/// Returns `Error::Encryption` for DataMap resolution or decryption failures,
/// including chunks still missing after the deferred rounds. Any error from
/// `on_chunk` is also propagated.
async fn download_decrypted_chunks<F, Fut>(
&self,
data_map: &DataMap,
progress: Option<mpsc::Sender<DownloadEvent>>,
peer_count: usize,
mut on_chunk: F,
) -> Result<u64>
where
F: FnMut(Bytes) -> Fut,
Fut: std::future::Future<Output = Result<()>>,
{
// Captured so the synchronous callbacks below can run async fetches via `block_on`.
let handle = Handle::current();
// Phase 1: a child DataMap is resolved to its root map by fetching its map chunks.
let root_map = if data_map.is_child() {
let dm_chunks = data_map.len();
if let Some(ref tx) = progress {
let _ = tx.try_send(DownloadEvent::ResolvingDataMap {
total_map_chunks: dm_chunks,
});
}
let resolve_progress = progress.clone();
let resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let resolved = tokio::task::block_in_place(|| {
let counter_ref = resolve_counter.clone();
let progress_ref = resolve_progress.clone();
let fetch_limiter = self.controller().fetch.clone();
// Fetches one batch of (index, hash) map chunks; a missing chunk is an error.
let fetch = |batch: &[(usize, XorName)]| {
let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
let counter = counter_ref.clone();
let prog = progress_ref.clone();
let limiter = fetch_limiter.clone();
handle.block_on(async {
let mut results = rebucketed_unordered(
&limiter,
batch_owned,
|(idx, hash): (usize, XorName)| {
let counter = counter.clone();
let prog = prog.clone();
async move {
let addr = hash.0;
let chunk = self
.chunk_get_observed_from_closest_peers(&addr, peer_count)
.await
.map_err(|e| {
self_encryption::Error::Generic(format!(
"DataMap resolution failed: {e}"
))
})?
.ok_or_else(|| {
self_encryption::Error::Generic(format!(
"DataMap chunk not found: {}",
hex::encode(addr)
))
})?;
let fetched = counter
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
+ 1;
if let Some(ref tx) = prog {
let _ =
tx.try_send(DownloadEvent::MapChunkFetched { fetched });
}
Ok::<_, self_encryption::Error>((idx, chunk.content))
}
},
)
.await?;
// Fetches complete out of order; restore the requested index order.
results.sort_by_key(|(idx, _)| *idx);
Ok(results)
})
};
get_root_data_map_parallel(data_map.clone(), &fetch)
})
.map_err(|e| Error::Encryption(format!("DataMap resolution failed: {e}")))?;
info!(
"Resolved hierarchical DataMap: {} data chunks",
resolved.len()
);
resolved
} else {
data_map.clone()
};
// Phase 2: fetch and decrypt the data chunks.
let total_chunks = root_map.len();
if let Some(ref tx) = progress {
let _ = tx.try_send(DownloadEvent::DataMapResolved { total_chunks });
}
let fetched_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let fetched_for_closure = fetched_counter.clone();
let progress_for_closure = progress.clone();
let fetch_limiter_outer = self.controller().fetch.clone();
// Batch size is derived from total chunks, current fetch concurrency, the
// configured floor and available memory (see `adaptive_stream_decrypt_batch_size`).
let usable_memory = usable_memory_bytes();
let configured_batch_floor = stream_decrypt_batch_size();
let fetch_cap = fetch_limiter_outer.current();
let decrypt_batch_size = adaptive_stream_decrypt_batch_size(
total_chunks,
fetch_cap,
configured_batch_floor,
usable_memory,
);
info!(
total_chunks,
fetch_cap,
configured_batch_floor,
?usable_memory,
decrypt_batch_size,
"Selected adaptive stream decrypt batch size"
);
let stream = streaming_decrypt_with_batch_size(
&root_map,
|batch: &[(usize, XorName)]| {
let batch_owned: Vec<(usize, XorName)> = batch.to_vec();
let fetched_ref = fetched_for_closure.clone();
let progress_ref = progress_for_closure.clone();
let fetch_limiter = fetch_limiter_outer.clone();
tokio::task::block_in_place(|| {
handle.block_on(async {
// Ok(bytes) = fetched; Err(hash) = missing or errored, to be retried.
type BatchEntry =
(usize, std::result::Result<bytes::Bytes, XorName>);
// First pass: never fails the batch; misses and errors are deferred.
let raw: Vec<BatchEntry> = rebucketed_unordered(
&fetch_limiter,
batch_owned,
|(idx, hash): (usize, XorName)| {
let fetched_ref = fetched_ref.clone();
let progress_ref = progress_ref.clone();
async move {
let addr = hash.0;
let addr_hex = hex::encode(addr);
match self
.chunk_get_observed_from_closest_peers(&addr, peer_count)
.await
{
Ok(Some(chunk)) => {
let fetched = fetched_ref.fetch_add(
1,
std::sync::atomic::Ordering::Relaxed,
) + 1;
info!("Downloaded {fetched}/{total_chunks}");
if let Some(ref tx) = progress_ref {
let _ = tx.try_send(
DownloadEvent::ChunksFetched {
fetched,
total: total_chunks,
},
);
}
Ok::<BatchEntry, self_encryption::Error>((
idx,
Ok(chunk.content),
))
}
Ok(None) => Ok((idx, Err(hash))),
Err(e) => {
info!(
"First-pass fetch error for {addr_hex}: {e}; deferring"
);
Ok((idx, Err(hash)))
}
}
}
},
)
.await?;
let mut results: Vec<(usize, bytes::Bytes)> = Vec::new();
let mut deferred: Vec<(usize, XorName)> = Vec::new();
for (idx, inner) in raw {
match inner {
Ok(bytes) => results.push((idx, bytes)),
Err(hash) => deferred.push((idx, hash)),
}
}
if !deferred.is_empty() {
// NOTE(review): this local constant shadows the
// `DEFERRED_ROUND_DELAYS_SECS` imported from the merkle module.
// Downloads use their own schedule of [0, 15, 45] seconds.
const DEFERRED_ROUND_DELAYS_SECS: [u64; 3] = [0, 15, 45];
info!(
"Deferring {} chunk(s) for concurrent retry after batch settles",
deferred.len()
);
let mut remaining = deferred;
for (round, &delay_secs) in DEFERRED_ROUND_DELAYS_SECS
.iter()
.enumerate()
{
if remaining.is_empty() {
break;
}
if delay_secs > 0 {
tokio::time::sleep(std::time::Duration::from_secs(
delay_secs,
))
.await;
}
info!(
"Deferred retry round {}/{}: {} chunk(s)",
round + 1,
DEFERRED_ROUND_DELAYS_SECS.len(),
remaining.len(),
);
// Each round retries everything still missing; misses are re-queued.
let round_input = std::mem::take(&mut remaining);
let round_results: Vec<BatchEntry> = rebucketed_unordered(
&fetch_limiter,
round_input,
|(idx, hash): (usize, XorName)| {
let fetched_ref = fetched_ref.clone();
let progress_ref = progress_ref.clone();
async move {
let addr = hash.0;
match self
.chunk_get_observed_from_closest_peers(
&addr, peer_count,
)
.await
{
Ok(Some(chunk)) => {
let fetched = fetched_ref.fetch_add(
1,
std::sync::atomic::Ordering::Relaxed,
) + 1;
info!(
"Downloaded {fetched}/{total_chunks} (deferred retry)"
);
if let Some(ref tx) = progress_ref {
let _ = tx.try_send(
DownloadEvent::ChunksFetched {
fetched,
total: total_chunks,
},
);
}
Ok::<BatchEntry, self_encryption::Error>((
idx,
Ok(chunk.content),
))
}
Ok(None) => Ok((idx, Err(hash))),
Err(e) => {
info!(
"Deferred retry for {} hit transient error: {e}; re-deferring",
hex::encode(addr)
);
Ok((idx, Err(hash)))
}
}
}
},
)
.await?;
for (idx, inner) in round_results {
match inner {
Ok(bytes) => results.push((idx, bytes)),
Err(hash) => remaining.push((idx, hash)),
}
}
}
// Any chunk still missing after every round fails the whole batch.
if let Some((_, hash)) = remaining.first() {
return Err(self_encryption::Error::Generic(format!(
"Chunk not found after {} deferred retry rounds: {}",
DEFERRED_ROUND_DELAYS_SECS.len(),
hex::encode(hash.0),
)));
}
}
// Restore the requested index order before handing back to the decryptor.
results.sort_by_key(|(idx, _)| *idx);
Ok(results)
})
})
},
decrypt_batch_size,
)
.map_err(|e| Error::Encryption(format!("streaming decrypt failed: {e}")))?;
// Consume the decrypt stream, forwarding each piece and counting bytes.
// Fetches presumably happen lazily, through the callback above, as items are pulled.
let mut bytes_total = 0u64;
for chunk_result in stream {
let chunk: Bytes =
chunk_result.map_err(|e| Error::Encryption(format!("decryption failed: {e}")))?;
bytes_total += chunk.len() as u64;
on_chunk(chunk).await?;
}
Ok(bytes_total)
}
/// Download `data_map` to `output`, optionally streaming [`DownloadEvent`]s to
/// `progress`.
///
/// Each chunk is fetched from the configured close-group size of peers.
/// Returns the number of bytes written.
///
/// # Errors
/// Propagates fetch, decryption and filesystem errors.
pub async fn file_download_with_progress(
    &self,
    data_map: &DataMap,
    output: &Path,
    progress: Option<mpsc::Sender<DownloadEvent>>,
) -> Result<u64> {
    let peer_count = self.config().close_group_size;
    self.file_download_with_progress_using_peer_count(data_map, output, progress, peer_count)
        .await
}
/// Download and decrypt `data_map` into `output`, using `peer_count` closest
/// peers per chunk.
///
/// Data is first written to a uniquely named temporary file in `output`'s
/// directory. The file is then committed to `output` only after the whole
/// download succeeded and the data has been synced to disk. On failure the
/// temporary file is handed back to `TempDownload`.
///
/// Returns the number of bytes written.
///
/// # Errors
/// Propagates fetch and decryption errors, plus any I/O error from creating,
/// writing, syncing or committing the temporary file.
async fn file_download_with_progress_using_peer_count(
    &self,
    data_map: &DataMap,
    output: &Path,
    progress: Option<mpsc::Sender<DownloadEvent>>,
    peer_count: usize,
) -> Result<u64> {
    debug!("Downloading file to {}", output.display());
    // Keep the temporary file next to the destination; the pid plus a random
    // suffix keeps concurrent downloads from colliding.
    let parent = output.parent().unwrap_or_else(|| Path::new("."));
    let unique: u64 = rand::random();
    let tmp_path = parent.join(format!(".ant_download_{}_{unique}.tmp", std::process::id()));
    let tmp = TempDownload::new(tmp_path);
    let mut file = std::fs::File::create(tmp.path())?;
    let bytes_written = self
        .download_decrypted_chunks(data_map, progress, peer_count, |bytes| {
            let r = file.write_all(&bytes).map_err(Error::from);
            std::future::ready(r)
        })
        .await?;
    file.flush()?;
    // `File::flush` does nothing for an unbuffered `File`, and errors detected
    // when the file is closed are silently ignored by `Drop`. `sync_all`
    // surfaces any deferred write error and makes the contents durable before
    // `commit` publishes the file under its final name.
    file.sync_all()?;
    drop(file);
    tmp.commit(output)?;
    info!(
        "File downloaded: {bytes_written} bytes written to {}",
        output.display()
    );
    Ok(bytes_written)
}
pub async fn file_download_to_sender(
&self,
data_map: &DataMap,
sink: mpsc::Sender<std::result::Result<Bytes, Error>>,
progress: Option<mpsc::Sender<DownloadEvent>>,
) -> Result<u64> {
let peer_count = self.config().close_group_size;
self.download_decrypted_chunks(data_map, progress, peer_count, |bytes| {
let sink = sink.clone();
async move {
sink.send(Ok(bytes))
.await
.map_err(|_| Error::Cancelled("download stream receiver dropped".into()))
}
})
.await
}
}
/// Unit tests for the pure helpers used by file upload/download (sample-index
/// selection, disk-space checks, adaptive decrypt batch sizing, merkle proof
/// coverage and partitioning, single-wave folding) and for `ChunkSpill`.
#[cfg(test)]
// Panicking on `unwrap` is the intended failure mode inside tests.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn distributed_sample_indices_spreads_across_large_file() {
        assert_eq!(distributed_sample_indices(100, 5), vec![0, 24, 49, 74, 99]);
    }

    #[test]
    fn distributed_sample_indices_covers_whole_small_file() {
        assert_eq!(distributed_sample_indices(3, 5), vec![0, 1, 2]);
        assert_eq!(distributed_sample_indices(5, 5), vec![0, 1, 2, 3, 4]);
    }

    #[test]
    fn distributed_sample_indices_is_in_range_and_increasing() {
        assert!(distributed_sample_indices(0, 5).is_empty());
        assert_eq!(distributed_sample_indices(1, 5), vec![0]);
        for total in 1..200usize {
            let idx = distributed_sample_indices(total, 5);
            assert_eq!(*idx.first().unwrap(), 0);
            assert_eq!(*idx.last().unwrap(), total - 1);
            assert!(idx.iter().all(|&i| i < total));
            assert!(idx.windows(2).all(|w| w[0] < w[1]));
        }
    }

    #[test]
    fn disk_space_check_passes_for_small_file() {
        check_disk_space_for_spill(1024).unwrap();
    }

    #[test]
    fn disk_space_check_fails_for_absurd_size() {
        let result = check_disk_space_for_spill(u64::MAX / 2);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            matches!(err, Error::InsufficientDiskSpace(_)),
            "expected InsufficientDiskSpace, got: {err}"
        );
    }

    #[test]
    fn adaptive_stream_decrypt_batch_size_tracks_fetch_headroom() {
        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(u64::MAX));
        assert_eq!(batch_size, 64 * DOWNLOAD_STREAM_BATCH_FETCH_MULTIPLIER);
    }

    #[test]
    fn adaptive_stream_decrypt_batch_size_caps_to_total_chunks() {
        let batch_size = adaptive_stream_decrypt_batch_size(12, 64, 10, Some(u64::MAX));
        assert_eq!(batch_size, 12);
    }

    #[test]
    fn adaptive_stream_decrypt_batch_size_honours_configured_floor() {
        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 1, 32, None);
        assert_eq!(batch_size, 32);
    }

    #[test]
    fn adaptive_stream_decrypt_batch_size_does_not_expand_without_memory_reading() {
        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, None);
        assert_eq!(batch_size, 10);
    }

    #[test]
    fn adaptive_stream_decrypt_batch_size_caps_to_memory_budget() {
        // Memory for exactly 16 chunks once the budget divisor is applied.
        let estimated_bytes_per_chunk = (self_encryption::MAX_CHUNK_SIZE as u64)
            .saturating_mul(DOWNLOAD_STREAM_BATCH_BYTES_PER_CHUNK_MULTIPLIER)
            .max(1);
        let usable_memory = estimated_bytes_per_chunk
            .saturating_mul(16)
            .saturating_mul(DOWNLOAD_STREAM_BATCH_MEMORY_BUDGET_DIVISOR);
        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 256, 10, Some(usable_memory));
        assert_eq!(batch_size, 16);
    }

    #[test]
    fn adaptive_stream_decrypt_batch_size_keeps_one_chunk_when_memory_is_tight() {
        let batch_size = adaptive_stream_decrypt_batch_size(1_000, 64, 10, Some(1));
        assert_eq!(batch_size, 1);
    }

    #[test]
    fn cached_merkle_covers_only_when_all_addresses_have_proofs() {
        let covered = compute_address(&Bytes::from_static(b"covered"));
        let extra = compute_address(&Bytes::from_static(b"extra"));
        let missing = compute_address(&Bytes::from_static(b"missing"));
        let cached = MerkleBatchPaymentResult {
            proofs: HashMap::from([(covered, vec![1]), (extra, vec![2])]),
            chunk_count: 2,
            storage_cost_atto: "0".to_string(),
            gas_cost_wei: 0,
            merkle_payment_timestamp: 0,
        };
        assert!(cached_merkle_covers_addresses(&cached, &[covered]));
        assert!(cached_merkle_covers_addresses(&cached, &[covered, extra]));
        assert!(!cached_merkle_covers_addresses(
            &cached,
            &[covered, missing]
        ));
    }

    #[test]
    fn partition_addresses_by_proof_splits_paid_and_unpaid() {
        let paid_a = [1u8; 32];
        let unpaid_b = [2u8; 32];
        let paid_c = [3u8; 32];
        let unpaid_d = [4u8; 32];
        let proofs: HashMap<[u8; 32], Vec<u8>> =
            HashMap::from([(paid_a, vec![0xaa]), (paid_c, vec![0xcc])]);
        let (to_store, missing) =
            partition_addresses_by_proof(&[paid_a, unpaid_b, paid_c, unpaid_d], &proofs);
        assert_eq!(to_store, vec![paid_a, paid_c]);
        assert_eq!(missing, vec![unpaid_b, unpaid_d]);
    }

    #[test]
    fn fold_single_wave_keeps_ok_wave() {
        let stored = vec![[1u8; 32], [2u8; 32]];
        let stats = WaveAggregateStats {
            chunk_attempts_total: 7,
            ..Default::default()
        };
        let outcome = fold_single_wave(Ok((stored.clone(), "100".to_string(), 9, stats))).unwrap();
        assert_eq!(outcome.stored, stored);
        assert!(outcome.failed.is_empty());
        assert_eq!(outcome.storage_atto.to_string(), "100");
        assert_eq!(outcome.gas_wei, 9);
        assert_eq!(outcome.stats.chunk_attempts_total, 7);
    }

    #[test]
    fn fold_single_wave_folds_partial_upload() {
        let stored = vec![[3u8; 32]];
        let failed = vec![([4u8; 32], "short of quorum".to_string())];
        let err = Error::PartialUpload {
            stored: stored.clone(),
            stored_count: 1,
            failed: failed.clone(),
            failed_count: 1,
            total_chunks: 2,
            spend: Box::new(PartialUploadSpend {
                storage_cost_atto: "250".to_string(),
                gas_cost_wei: 11,
            }),
            reason: "wave store failed after retries".to_string(),
        };
        let outcome = fold_single_wave(Err(err)).unwrap();
        assert_eq!(outcome.stored, stored);
        assert_eq!(outcome.failed, failed);
        assert_eq!(outcome.storage_atto.to_string(), "250");
        assert_eq!(outcome.gas_wei, 11);
        assert_eq!(outcome.stats.chunk_attempts_total, 0);
    }

    #[test]
    fn fold_single_wave_propagates_fatal_error() {
        let result = fold_single_wave(Err(Error::Payment("wallet unavailable".to_string())));
        assert!(
            matches!(result, Err(Error::Payment(_))),
            "fatal payment error must propagate, got: {result:?}"
        );
    }

    #[test]
    fn partition_addresses_by_proof_handles_all_or_nothing() {
        let a = [5u8; 32];
        let b = [6u8; 32];
        let empty: HashMap<[u8; 32], Vec<u8>> = HashMap::new();
        let (to_store, missing) = partition_addresses_by_proof(&[a, b], &empty);
        assert!(to_store.is_empty());
        assert_eq!(missing, vec![a, b]);
        let full: HashMap<[u8; 32], Vec<u8>> = HashMap::from([(a, vec![1]), (b, vec![2])]);
        let (to_store, missing) = partition_addresses_by_proof(&[a, b], &full);
        assert_eq!(to_store, vec![a, b]);
        assert!(missing.is_empty());
    }

    #[test]
    fn chunk_spill_round_trip() {
        let mut spill = ChunkSpill::new().unwrap();
        let data1 = vec![0xAA; 1024];
        let data2 = vec![0xBB; 2048];
        spill.push(&data1).unwrap();
        spill.push(&data2).unwrap();
        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 1024 + 2048);
        let chunk_entries = spill.chunk_entries().unwrap();
        let entry_total: u64 = chunk_entries.iter().map(|(_, size)| *size).sum();
        assert_eq!(entry_total, 1024 + 2048);
        let chunk1 = spill.read_chunk(spill.addresses.first().unwrap()).unwrap();
        assert_eq!(&chunk1[..], &data1[..]);
        let chunk2 = spill.read_chunk(spill.addresses.get(1).unwrap()).unwrap();
        assert_eq!(&chunk2[..], &data2[..]);
        let waves: Vec<_> = spill.addresses.chunks(1).collect();
        assert_eq!(waves.len(), 2);
    }

    #[test]
    fn chunk_spill_cleanup_on_drop() {
        let dir;
        {
            let spill = ChunkSpill::new().unwrap();
            dir = spill.dir.clone();
            assert!(dir.exists());
        }
        assert!(!dir.exists(), "spill dir should be removed on drop");
    }

    #[test]
    fn chunk_spill_deduplicates_identical_content() {
        let mut spill = ChunkSpill::new().unwrap();
        let data = vec![0xCC; 512];
        // Three pushes of identical content must yield a single stored chunk.
        spill.push(&data).unwrap();
        spill.push(&data).unwrap();
        spill.push(&data).unwrap();
        assert_eq!(spill.len(), 1, "duplicate chunks should be deduplicated");
        assert_eq!(
            spill.total_bytes(),
            512,
            "total_bytes should count unique only"
        );
        let data2 = vec![0xDD; 256];
        spill.push(&data2).unwrap();
        assert_eq!(spill.len(), 2);
        assert_eq!(spill.total_bytes(), 512 + 256);
    }
}
/// Compile-time checks that the public upload/download futures are `Send`, so
/// callers can spawn them on a multi-threaded runtime.
///
/// None of these functions is ever called. They only have to type-check.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Compiles only when the referenced value's type is `Send`.
    fn _assert_send<T: Send>(_: &T) {}

    // Never invoked (`dead_code`); the remaining allows mirror the other checks.
    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_is_send(client: &Client) {
        let upload = client.file_upload(Path::new("/dev/null"));
        _assert_send(&upload);
    }

    // Never invoked (`dead_code`); checks the explicit-mode upload future.
    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _file_upload_with_mode_is_send(client: &Client) {
        let upload = client.file_upload_with_mode(Path::new("/dev/null"), PaymentMode::Auto);
        _assert_send(&upload);
    }

    // `todo!()` stands in for a real `DataMap`, which makes the following lines
    // unreachable; hence `unreachable_code`, `unused_variables` and
    // `diverging_sub_expression`.
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _file_download_is_send(client: &Client) {
        let data_map: DataMap = todo!();
        let download = client.file_download(&data_map, Path::new("/dev/null"));
        _assert_send(&download);
    }
}