use super::payments::{MerklePaymentError, MerklePaymentReceipt};
use super::upload::MerklePutError;
use crate::Client;
use crate::client::config::{CHUNK_UPLOAD_BATCH_SIZE, UPLOAD_MAX_RETRIES, UPLOAD_RETRY_PAUSE_SECS};
use crate::client::data_types::chunk::DataMapChunk;
use crate::client::files::Metadata;
use crate::self_encryption::{EncryptionStream, MAX_CHUNK_SIZE, encrypt_directory_files};
use ant_evm::merkle_payments::MAX_LEAVES;
use ant_evm::{AttoTokens, EvmWallet};
use ant_protocol::NetworkAddress;
use ant_protocol::storage::{ChunkAddress, DataTypes};
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use thiserror::Error;
use xor_name::XorName;
#[derive(Clone)]
pub enum MerklePaymentOption<'a> {
Wallet(&'a EvmWallet),
Receipt(MerklePaymentReceipt),
ContinueWithReceipt(&'a EvmWallet, MerklePaymentReceipt),
}
#[derive(Debug, Error)]
#[error("{error}")]
pub struct MerkleUploadErrorWithReceipt {
pub receipt: Option<MerklePaymentReceipt>,
#[source]
pub error: MerkleUploadError,
}
impl MerkleUploadErrorWithReceipt {
fn new(receipt: MerklePaymentReceipt, kind: MerkleUploadError) -> Self {
let receipt = if receipt.proofs.is_empty() {
None } else {
Some(receipt) };
Self {
receipt,
error: kind,
}
}
fn encryption(receipt: MerklePaymentReceipt, msg: String) -> Self {
Self::new(receipt, MerkleUploadError::Encryption(msg))
}
fn payment(receipt: MerklePaymentReceipt, err: MerklePaymentError) -> Self {
Self::new(receipt, MerkleUploadError::Payment(err))
}
fn upload(receipt: MerklePaymentReceipt, err: MerklePutError) -> Self {
Self::new(receipt, MerkleUploadError::Upload(err))
}
}
#[derive(Debug, Error)]
pub enum MerkleUploadError {
#[error("Encryption error: {0}")]
Encryption(String),
#[error("Payment error: {0}")]
Payment(MerklePaymentError),
#[error("Upload error: {0}")]
Upload(MerklePutError),
}
impl Client {
async fn files_put_with_merkle_payment_internal(
&self,
path: PathBuf,
is_public: bool,
wallet: Option<&EvmWallet>,
mut receipt: MerklePaymentReceipt,
) -> Result<(AttoTokens, Vec<(PathBuf, DataMapChunk, Metadata)>), MerkleUploadErrorWithReceipt>
{
debug!("merkle payment: starting for path: {path:?}, is_public: {is_public}");
if let Some(w) = wallet
&& w.network() != self.evm_network()
{
return Err(MerkleUploadErrorWithReceipt::payment(
receipt,
MerklePaymentError::EvmWalletNetworkMismatch,
));
}
crate::loud_info!("Encrypting files a first time to create the Merkle Tree(s)...");
let (all_xor_names, file_chunk_counts, first_pass_results) = self
.collect_xornames_from_dir(path.clone(), is_public)
.await
.map_err(|e| MerkleUploadErrorWithReceipt::encryption(receipt.clone(), e))?;
receipt.file_chunk_counts = file_chunk_counts;
let total_files = receipt.file_chunk_counts.len();
let total_chunks = all_xor_names.len();
info!("Collected {total_chunks} XorNames from {total_files} files");
let (mut already_exist, mut xor_to_pay_ordered) = self
.split_existing_and_new_chunks(all_xor_names, &receipt)
.await;
let to_pay_len = xor_to_pay_ordered.len();
let already_paid_count = already_exist.len();
receipt.add_already_existed(already_exist.iter().copied());
if !receipt.proofs.is_empty() || !receipt.already_existed.is_empty() {
self.send_merkle_batch_payment_complete(&receipt).await;
}
if to_pay_len == 0 {
crate::loud_info!(
"✓ All {total_chunks} chunks already exist on the network, nothing to upload"
);
self.send_upload_complete(
receipt.proofs.len(),
already_paid_count,
receipt.amount_paid.as_atto(),
)
.await;
return Ok((receipt.amount_paid, first_pass_results));
} else {
std::mem::drop(first_pass_results);
}
let num_batches = to_pay_len.div_ceil(MAX_LEAVES);
let mut batches: Vec<Vec<XorName>> = Vec::with_capacity(num_batches);
while !xor_to_pay_ordered.is_empty() {
let drain_count = std::cmp::min(MAX_LEAVES, xor_to_pay_ordered.len());
batches.push(xor_to_pay_ordered.drain(..drain_count).collect());
}
info!("Split into {num_batches} Merkle Tree(s) of up to {MAX_LEAVES} chunks each");
crate::loud_info!(
"🚀 Starting upload of {to_pay_len} chunks in {num_batches} Merkle Tree(s)..."
);
let mut streams: Vec<EncryptionStream> = encrypt_directory_files(path, is_public)
.await
.map_err(|e| MerkleUploadErrorWithReceipt::encryption(receipt.clone(), e.to_string()))?
.into_iter()
.map(|stream| {
stream.map_err(|e| MerkleUploadErrorWithReceipt::encryption(receipt.clone(), e))
})
.collect::<Result<Vec<EncryptionStream>, MerkleUploadErrorWithReceipt>>()?;
let mut results: Vec<(PathBuf, DataMapChunk, Metadata)> = Vec::new();
for (batch_idx, batch_xornames) in batches.into_iter().enumerate() {
let batch_num = batch_idx + 1;
let batch_size = batch_xornames.len();
info!("Processing batch {batch_num}/{num_batches} ({batch_size} chunks)");
let needs_payment = batch_xornames
.iter()
.any(|xn| !receipt.proofs.contains_key(xn));
if needs_payment {
receipt = self
.pay_for_merkle_tree_batch(
wallet,
batch_xornames,
receipt.clone(),
batch_num,
num_batches,
)
.await
.map_err(|kind| MerkleUploadErrorWithReceipt::new(receipt.clone(), kind))?;
self.send_merkle_batch_payment_complete(&receipt).await;
}
crate::loud_info!(
"🌳 Merkle Tree {batch_num}/{num_batches}: Uploading {batch_size} chunks..."
);
let already_exist_before_batch: HashSet<XorName> =
already_exist.iter().copied().collect();
let upload_result = self
.upload_batch_with_merkle(streams, &receipt, &mut already_exist, batch_size)
.await
.map_err(|err| MerkleUploadErrorWithReceipt::upload(receipt.clone(), err))?;
streams = upload_result.streams;
results.extend(upload_result.completed_files);
if !upload_result.failed_chunks.is_empty() {
let max_retries = if self.retry_failed == 0 {
UPLOAD_MAX_RETRIES
} else {
self.retry_failed as usize
};
let remaining_failures = self
.retry_failed_merkle_chunks(
upload_result.failed_chunks,
&receipt,
&mut already_exist,
max_retries,
UPLOAD_RETRY_PAUSE_SECS,
)
.await
.map_err(|err| MerkleUploadErrorWithReceipt::upload(receipt.clone(), err))?;
if !remaining_failures.is_empty() {
let failed_count = remaining_failures.len();
error!("{failed_count} chunks failed after {max_retries} retries");
return Err(MerkleUploadErrorWithReceipt::upload(
receipt,
MerklePutError::Batch(super::upload::MerkleBatchUploadState {
failed: remaining_failures,
}),
));
}
}
let newly_uploaded = already_exist
.difference(&already_exist_before_batch)
.copied()
.collect::<Vec<_>>();
if !newly_uploaded.is_empty() {
receipt.add_uploaded(newly_uploaded);
self.send_merkle_batch_payment_complete(&receipt).await;
}
info!(
"Batch {batch_num}/{num_batches} complete, {} files finished so far",
results.len()
);
}
for mut stream in streams {
let datamap = if let Some(datamap) = stream.data_map_chunk() {
datamap
} else {
while stream.next_batch(16).is_some() {}
stream
.data_map_chunk()
.ok_or(MerkleUploadErrorWithReceipt::upload(
receipt.clone(),
MerklePutError::StreamShouldHaveDatamap,
))?
};
results.push((
stream.relative_path.clone(),
datamap,
stream.metadata.clone(),
));
if let Some(public_addr) = stream.data_address() {
let path = &stream.relative_path;
let f = results.len();
crate::loud_info!(
"[File {f}/{total_files}] ({path:?}) is now available at: {public_addr:?}"
);
}
}
crate::loud_info!("✓ All {total_chunks} chunks uploaded successfully!");
self.send_upload_complete(
receipt.proofs.len(),
already_paid_count,
receipt.amount_paid.as_atto(),
)
.await;
Ok((receipt.amount_paid, results))
}
pub(crate) async fn files_put_with_merkle_payment(
&self,
path: PathBuf,
is_public: bool,
payment: MerklePaymentOption<'_>,
) -> Result<(AttoTokens, Vec<(PathBuf, DataMapChunk, Metadata)>), MerkleUploadErrorWithReceipt>
{
debug!(
"merkle payment: files_put starting upload for path: {path:?}, is_public: {is_public}"
);
match payment {
MerklePaymentOption::Wallet(wallet) => {
self.files_put_with_merkle_payment_internal(
path,
is_public,
Some(wallet),
MerklePaymentReceipt::default(),
)
.await
}
MerklePaymentOption::Receipt(receipt) => {
self.files_put_with_merkle_payment_internal(path, is_public, None, receipt)
.await
}
MerklePaymentOption::ContinueWithReceipt(wallet, receipt) => {
self.files_put_with_merkle_payment_internal(path, is_public, Some(wallet), receipt)
.await
}
}
}
async fn collect_xornames_from_dir(
&self,
path: PathBuf,
is_public: bool,
) -> Result<
(
Vec<XorName>,
HashMap<String, usize>,
Vec<(PathBuf, DataMapChunk, Metadata)>,
),
String,
> {
let streams: Vec<EncryptionStream> = encrypt_directory_files(path, is_public)
.await
.map_err(|e| e.to_string())?
.into_iter()
.collect::<Result<Vec<EncryptionStream>, String>>()?;
let mut all_xor_names = Vec::new();
let mut file_chunk_counts = HashMap::new();
let mut file_results = Vec::new();
for stream in streams {
let file_path = stream.file_path.clone();
let (xor_names, relative_path, datamap, metadata) =
collect_xor_names_from_stream(stream)?;
file_chunk_counts.insert(file_path, xor_names.len());
all_xor_names.extend(xor_names);
file_results.push((relative_path, datamap, metadata));
}
Ok((all_xor_names, file_chunk_counts, file_results))
}
async fn split_existing_and_new_chunks(
&self,
xornames: Vec<XorName>,
receipt: &MerklePaymentReceipt,
) -> (HashSet<XorName>, Vec<XorName>) {
let mut seen: HashSet<XorName> = HashSet::new();
let unique_ordered: Vec<XorName> =
xornames.into_iter().filter(|xn| seen.insert(*xn)).collect();
let total = unique_ordered.len();
let mut known_existing: HashSet<XorName> = HashSet::new();
let mut already_paid: HashSet<XorName> = HashSet::new();
let mut need_check: Vec<XorName> = Vec::new();
for xn in &unique_ordered {
if receipt.already_existed.contains(xn) || receipt.uploaded.contains(xn) {
known_existing.insert(*xn);
} else if receipt.proofs.contains_key(xn) {
already_paid.insert(*xn);
} else {
need_check.push(*xn);
}
}
let known_count = known_existing.len();
let paid_count = already_paid.len();
let check_count = need_check.len();
if known_count > 0 || paid_count > 0 {
crate::loud_info!(
"Resuming: {known_count} chunks known to exist (incl. previously uploaded), {paid_count} already paid, {check_count} need checking"
);
}
if need_check.is_empty() {
crate::loud_info!("All {total} chunks accounted for (no network check needed)");
let new_chunks: Vec<XorName> = unique_ordered
.into_iter()
.filter(|xn| !known_existing.contains(xn))
.collect();
return (known_existing, new_chunks);
}
crate::loud_info!("Checking {check_count} chunks for existence on the network...");
let addresses: Vec<NetworkAddress> = need_check
.iter()
.map(|xn| NetworkAddress::from(ChunkAddress::new(*xn)))
.collect();
let batch_size = std::cmp::max(16, *CHUNK_UPLOAD_BATCH_SIZE);
let existing_addrs = self.check_records_exist_batch(&addresses, batch_size).await;
let newly_found_existing: HashSet<XorName> = existing_addrs
.into_iter()
.filter_map(|addr| addr.xorname())
.collect();
let mut all_existing = known_existing;
all_existing.extend(newly_found_existing.iter().copied());
let existing_count = all_existing.len();
let newly_found_count = newly_found_existing.len();
crate::loud_info!(
"Found {newly_found_count} more chunks on network. Total existing: {existing_count}/{total}"
);
let new_chunks: Vec<XorName> = unique_ordered
.into_iter()
.filter(|xn| !all_existing.contains(xn))
.collect();
(all_existing, new_chunks)
}
async fn pay_for_merkle_tree_batch(
&self,
wallet: Option<&EvmWallet>,
batch_xornames: Vec<XorName>,
mut receipt: MerklePaymentReceipt,
batch_num: usize,
num_batches: usize,
) -> Result<MerklePaymentReceipt, MerkleUploadError> {
let w = wallet.ok_or_else(|| {
let missing_xn = batch_xornames
.iter()
.find(|xn| !receipt.proofs.contains_key(xn))
.copied()
.unwrap_or_default();
MerkleUploadError::Upload(MerklePutError::MissingPaymentProofFor(missing_xn))
})?;
let batch_size = batch_xornames.len();
crate::loud_info!(
"💸 Merkle Tree {batch_num}/{num_batches}: Paying for {batch_size} chunks..."
);
let batch_receipt = self
.pay_for_single_merkle_batch(DataTypes::Chunk, batch_xornames, MAX_CHUNK_SIZE, w)
.await
.map_err(MerkleUploadError::Payment)?;
receipt.merge(batch_receipt);
Ok(receipt)
}
}
fn collect_xor_names_from_stream(
mut encryption_stream: EncryptionStream,
) -> Result<(Vec<XorName>, PathBuf, DataMapChunk, Metadata), String> {
let mut xor_names: Vec<XorName> = Vec::new();
let xorname_collection_batch_size: usize = std::cmp::max(32, *CHUNK_UPLOAD_BATCH_SIZE);
let mut total = 0;
let estimated_total = encryption_stream.total_chunks();
let file_path = encryption_stream.file_path.clone();
let start = std::time::Instant::now();
crate::loud_debug!("Begin encrypting ~{estimated_total} chunks from {file_path}...");
while let Some(batch) = encryption_stream.next_batch(xorname_collection_batch_size) {
let batch_len = batch.len();
total += batch_len;
for chunk in batch {
xor_names.push(*chunk.name());
}
crate::loud_debug!(
"Encrypted {total}/{estimated_total} chunks in {:?}",
start.elapsed()
);
}
let datamap = encryption_stream
.data_map_chunk()
.ok_or_else(|| format!("No datamap available for {file_path}"))?;
Ok((
xor_names,
encryption_stream.relative_path,
datamap,
encryption_stream.metadata,
))
}