use super::payments::{MerklePaymentError, MerklePaymentReceipt};
use super::upload::MerklePutError;
use crate::Client;
use crate::client::config::CHUNK_UPLOAD_BATCH_SIZE;
use crate::client::data_types::chunk::DataMapChunk;
use crate::client::files::Metadata;
use crate::client::{ClientEvent, UploadSummary};
use crate::self_encryption::{EncryptionStream, MAX_CHUNK_SIZE, encrypt_directory_files};
use ant_evm::merkle_payments::MAX_LEAVES;
use ant_evm::{AttoTokens, EvmWallet};
use ant_protocol::NetworkAddress;
use ant_protocol::storage::{ChunkAddress, DataTypes};
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use thiserror::Error;
use xor_name::XorName;
#[derive(Clone)]
pub enum MerklePaymentOption<'a> {
Wallet(&'a EvmWallet),
Receipt(MerklePaymentReceipt),
ContinueWithReceipt(&'a EvmWallet, MerklePaymentReceipt),
}
#[derive(Debug, Error)]
#[error("{error}")]
pub struct MerkleUploadErrorWithReceipt {
pub receipt: Option<MerklePaymentReceipt>,
#[source]
pub error: MerkleUploadError,
}
impl MerkleUploadErrorWithReceipt {
fn new(receipt: MerklePaymentReceipt, kind: MerkleUploadError) -> Self {
let receipt = if receipt.proofs.is_empty() {
None } else {
Some(receipt) };
Self {
receipt,
error: kind,
}
}
fn encryption(receipt: MerklePaymentReceipt, msg: String) -> Self {
Self::new(receipt, MerkleUploadError::Encryption(msg))
}
fn payment(receipt: MerklePaymentReceipt, err: MerklePaymentError) -> Self {
Self::new(receipt, MerkleUploadError::Payment(err))
}
fn upload(receipt: MerklePaymentReceipt, err: MerklePutError) -> Self {
Self::new(receipt, MerkleUploadError::Upload(err))
}
}
#[derive(Debug, Error)]
pub enum MerkleUploadError {
#[error("Encryption error: {0}")]
Encryption(String),
#[error("Payment error: {0}")]
Payment(MerklePaymentError),
#[error("Upload error: {0}")]
Upload(MerklePutError),
}
impl Client {
pub async fn file_cost_merkle(
&self,
path: PathBuf,
is_public: bool,
wallet: &EvmWallet,
) -> Result<AttoTokens, MerkleUploadError> {
debug!(
"merkle payment: file_cost_merkle starting for path: {path:?}, is_public: {is_public}"
);
if wallet.network() != self.evm_network() {
return Err(MerkleUploadError::Payment(
MerklePaymentError::EvmWalletNetworkMismatch,
));
}
#[cfg(feature = "loud")]
println!("Encrypting files to calculate cost...");
let (all_xor_names, _file_chunk_counts, _file_results) = self
.collect_xornames_from_dir(path, is_public)
.await
.map_err(MerkleUploadError::Encryption)?;
let total_chunks = all_xor_names.len();
debug!("merkle payment: file_cost_merkle total chunks: {total_chunks}");
#[cfg(feature = "loud")]
println!("Encrypted into {total_chunks} chunks");
let batches: Vec<Vec<XorName>> = all_xor_names
.chunks(MAX_LEAVES)
.map(|c| c.to_vec())
.collect();
let num_batches = batches.len();
#[cfg(feature = "loud")]
println!("Estimating cost for {num_batches} batch(es)...");
let mut total_cost = ant_evm::U256::ZERO;
for (batch_idx, batch_xornames) in batches.into_iter().enumerate() {
let batch_num = batch_idx + 1;
debug!("Estimating batch {batch_num}/{num_batches}");
let (tree, _candidate_pools, pool_commitments, merkle_payment_timestamp) = self
.prepare_merkle_batch(DataTypes::Chunk, batch_xornames, MAX_CHUNK_SIZE)
.await
.map_err(MerkleUploadError::Payment)?;
let batch_cost = wallet
.estimate_merkle_payment_cost(
tree.depth(),
&pool_commitments,
merkle_payment_timestamp,
)
.await
.map_err(|e| MerkleUploadError::Payment(MerklePaymentError::EvmWalletError(e)))?;
total_cost = total_cost.saturating_add(batch_cost);
}
let estimated_cost = AttoTokens::from_atto(total_cost);
debug!("merkle payment: file_cost_merkle estimated total cost: {estimated_cost}");
#[cfg(feature = "loud")]
println!("Total estimated cost: {estimated_cost}");
Ok(estimated_cost)
}
async fn files_put_with_merkle_payment_internal(
&self,
path: PathBuf,
is_public: bool,
wallet: Option<&EvmWallet>,
mut receipt: MerklePaymentReceipt,
) -> Result<(AttoTokens, Vec<(PathBuf, DataMapChunk, Metadata)>), MerkleUploadErrorWithReceipt>
{
debug!("merkle payment: starting for path: {path:?}, is_public: {is_public}");
if let Some(w) = wallet
&& w.network() != self.evm_network()
{
return Err(MerkleUploadErrorWithReceipt::payment(
receipt,
MerklePaymentError::EvmWalletNetworkMismatch,
));
}
#[cfg(feature = "loud")]
println!("Encrypting files a first time to create the Merkle Tree(s)...");
let (all_xor_names, file_chunk_counts, first_pass_results) = self
.collect_xornames_from_dir(path.clone(), is_public)
.await
.map_err(|e| MerkleUploadErrorWithReceipt::encryption(receipt.clone(), e))?;
receipt.file_chunk_counts = file_chunk_counts;
let total_files = receipt.file_chunk_counts.len();
let total_chunks = all_xor_names.len();
info!("Collected {total_chunks} XorNames from {total_files} files");
let (mut already_exist, mut xor_to_pay_ordered) =
self.split_existing_and_new_chunks(all_xor_names).await;
let to_pay_len = xor_to_pay_ordered.len();
let already_paid_count = already_exist.len();
if to_pay_len == 0 {
#[cfg(feature = "loud")]
println!("✓ All {total_chunks} chunks already exist on the network, nothing to upload");
self.send_upload_complete_event(&receipt, already_paid_count)
.await;
return Ok((receipt.amount_paid, first_pass_results));
} else {
std::mem::drop(first_pass_results);
}
let num_batches = to_pay_len.div_ceil(MAX_LEAVES);
let mut batches: Vec<Vec<XorName>> = Vec::with_capacity(num_batches);
while !xor_to_pay_ordered.is_empty() {
let drain_count = std::cmp::min(MAX_LEAVES, xor_to_pay_ordered.len());
batches.push(xor_to_pay_ordered.drain(..drain_count).collect());
}
info!("Split into {num_batches} Merkle Tree(s) of up to {MAX_LEAVES} chunks each");
#[cfg(feature = "loud")]
println!("🚀 Starting upload of {to_pay_len} chunks in {num_batches} Merkle Tree(s)...");
let mut streams: Vec<EncryptionStream> = encrypt_directory_files(path, is_public)
.await
.map_err(|e| MerkleUploadErrorWithReceipt::encryption(receipt.clone(), e.to_string()))?
.into_iter()
.map(|stream| {
stream.map_err(|e| MerkleUploadErrorWithReceipt::encryption(receipt.clone(), e))
})
.collect::<Result<Vec<EncryptionStream>, MerkleUploadErrorWithReceipt>>()?;
let mut results: Vec<(PathBuf, DataMapChunk, Metadata)> = Vec::new();
for (batch_idx, batch_xornames) in batches.into_iter().enumerate() {
let batch_num = batch_idx + 1;
let batch_size = batch_xornames.len();
info!("Processing batch {batch_num}/{num_batches} ({batch_size} chunks)");
let needs_payment = batch_xornames
.iter()
.any(|xn| !receipt.proofs.contains_key(xn));
if needs_payment {
receipt = self
.pay_for_merkle_tree_batch(
wallet,
batch_xornames,
receipt.clone(),
batch_num,
num_batches,
)
.await
.map_err(|kind| MerkleUploadErrorWithReceipt::new(receipt.clone(), kind))?;
}
#[cfg(feature = "loud")]
println!("🌳 Merkle Tree {batch_num}/{num_batches}: Uploading {batch_size} chunks...");
let upload_result = self
.upload_batch_with_merkle(streams, &receipt, &mut already_exist, batch_size)
.await
.map_err(|err| MerkleUploadErrorWithReceipt::upload(receipt.clone(), err))?;
streams = upload_result.streams;
results.extend(upload_result.completed_files);
if !upload_result.failed_chunks.is_empty() {
const MAX_RETRIES: usize = 3;
const RETRY_PAUSE_SECS: u64 = 60;
let remaining_failures = self
.retry_failed_merkle_chunks(
upload_result.failed_chunks,
&receipt,
&mut already_exist,
MAX_RETRIES,
RETRY_PAUSE_SECS,
)
.await
.map_err(|err| MerkleUploadErrorWithReceipt::upload(receipt.clone(), err))?;
if !remaining_failures.is_empty() {
let failed_count = remaining_failures.len();
error!("{failed_count} chunks failed after {MAX_RETRIES} retries");
return Err(MerkleUploadErrorWithReceipt::upload(
receipt,
MerklePutError::Batch(super::upload::MerkleBatchUploadState {
failed: remaining_failures,
}),
));
}
}
info!(
"Batch {batch_num}/{num_batches} complete, {} files finished so far",
results.len()
);
}
for mut stream in streams {
let datamap = if let Some(datamap) = stream.data_map_chunk() {
datamap
} else {
while stream.next_batch(16).is_some() {}
stream
.data_map_chunk()
.ok_or(MerkleUploadErrorWithReceipt::upload(
receipt.clone(),
MerklePutError::StreamShouldHaveDatamap,
))?
};
results.push((
stream.relative_path.clone(),
datamap,
stream.metadata.clone(),
));
if let Some(public_addr) = stream.data_address() {
let path = &stream.relative_path;
let f = results.len();
debug!("[File {f}/{total_files}] ({path:?}) is now available at: {public_addr:?}");
#[cfg(feature = "loud")]
println!(
"[File {f}/{total_files}] ({path:?}) is now available at: {public_addr:?}"
);
}
}
debug!(
"merkle payment: {total_chunks} chunks uploaded for {total_files} files successfully"
);
#[cfg(feature = "loud")]
println!("✓ All {total_chunks} chunks uploaded successfully!");
self.send_upload_complete_event(&receipt, already_paid_count)
.await;
Ok((receipt.amount_paid, results))
}
pub async fn files_put_with_merkle_payment(
&self,
path: PathBuf,
is_public: bool,
payment: MerklePaymentOption<'_>,
) -> Result<(AttoTokens, Vec<(PathBuf, DataMapChunk, Metadata)>), MerkleUploadErrorWithReceipt>
{
debug!(
"merkle payment: files_put starting upload for path: {path:?}, is_public: {is_public}"
);
match payment {
MerklePaymentOption::Wallet(wallet) => {
self.files_put_with_merkle_payment_internal(
path,
is_public,
Some(wallet),
MerklePaymentReceipt::default(),
)
.await
}
MerklePaymentOption::Receipt(receipt) => {
self.files_put_with_merkle_payment_internal(path, is_public, None, receipt)
.await
}
MerklePaymentOption::ContinueWithReceipt(wallet, receipt) => {
self.files_put_with_merkle_payment_internal(path, is_public, Some(wallet), receipt)
.await
}
}
}
async fn collect_xornames_from_dir(
&self,
path: PathBuf,
is_public: bool,
) -> Result<
(
Vec<XorName>,
HashMap<String, usize>,
Vec<(PathBuf, DataMapChunk, Metadata)>,
),
String,
> {
let streams: Vec<EncryptionStream> = encrypt_directory_files(path, is_public)
.await
.map_err(|e| e.to_string())?
.into_iter()
.collect::<Result<Vec<EncryptionStream>, String>>()?;
let mut all_xor_names = Vec::new();
let mut file_chunk_counts = HashMap::new();
let mut file_results = Vec::new();
for stream in streams {
let file_path = stream.file_path.clone();
let (xor_names, relative_path, datamap, metadata) =
collect_xor_names_from_stream(stream)?;
file_chunk_counts.insert(file_path, xor_names.len());
all_xor_names.extend(xor_names);
file_results.push((relative_path, datamap, metadata));
}
Ok((all_xor_names, file_chunk_counts, file_results))
}
async fn split_existing_and_new_chunks(
&self,
xornames: Vec<XorName>,
) -> (HashSet<XorName>, Vec<XorName>) {
#[cfg(feature = "loud")]
println!("Checking for existing chunks on the network...");
debug!("Checking for existing chunks on the network...");
let mut seen: HashSet<XorName> = HashSet::new();
let unique_ordered: Vec<XorName> =
xornames.into_iter().filter(|xn| seen.insert(*xn)).collect();
let total = unique_ordered.len();
let addresses: Vec<NetworkAddress> = unique_ordered
.iter()
.map(|xn| NetworkAddress::from(ChunkAddress::new(*xn)))
.collect();
let batch_size = std::cmp::max(16, *CHUNK_UPLOAD_BATCH_SIZE);
let existing_addrs = self.check_records_exist_batch(&addresses, batch_size).await;
let existing_set: HashSet<XorName> = existing_addrs
.into_iter()
.filter_map(|addr| addr.xorname())
.collect();
let existing_count = existing_set.len();
info!("Found {existing_count}/{total} unique chunks already exist on the network");
#[cfg(feature = "loud")]
println!("Found {existing_count}/{total} unique chunks already exist on the network");
let new_chunks: Vec<XorName> = unique_ordered
.into_iter()
.filter(|xn| !existing_set.contains(xn))
.collect();
(existing_set, new_chunks)
}
async fn pay_for_merkle_tree_batch(
&self,
wallet: Option<&EvmWallet>,
batch_xornames: Vec<XorName>,
mut receipt: MerklePaymentReceipt,
batch_num: usize,
num_batches: usize,
) -> Result<MerklePaymentReceipt, MerkleUploadError> {
let w = wallet.ok_or_else(|| {
let missing_xn = batch_xornames
.iter()
.find(|xn| !receipt.proofs.contains_key(xn))
.copied()
.unwrap_or_default();
MerkleUploadError::Upload(MerklePutError::MissingPaymentProofFor(missing_xn))
})?;
let batch_size = batch_xornames.len();
debug!("Merkle Tree {batch_num}/{num_batches}: Paying for {batch_size} chunks...");
#[cfg(feature = "loud")]
println!("💸 Merkle Tree {batch_num}/{num_batches}: Paying for {batch_size} chunks...");
let batch_receipt = self
.pay_for_single_merkle_batch(DataTypes::Chunk, batch_xornames, MAX_CHUNK_SIZE, w)
.await
.map_err(MerkleUploadError::Payment)?;
receipt.merge(batch_receipt);
Ok(receipt)
}
async fn send_upload_complete_event(
&self,
receipt: &MerklePaymentReceipt,
records_already_paid: usize,
) {
let records_paid = receipt.proofs.len();
if let Some(sender) = &self.client_event_sender {
let summary = UploadSummary {
records_paid,
records_already_paid,
tokens_spent: receipt.amount_paid.as_atto(),
};
if let Err(err) = sender.send(ClientEvent::UploadComplete(summary)).await {
error!("Failed to send upload completion event: {err:?}");
}
}
}
}
fn collect_xor_names_from_stream(
mut encryption_stream: EncryptionStream,
) -> Result<(Vec<XorName>, PathBuf, DataMapChunk, Metadata), String> {
let mut xor_names: Vec<XorName> = Vec::new();
let xorname_collection_batch_size: usize = std::cmp::max(32, *CHUNK_UPLOAD_BATCH_SIZE);
let mut total = 0;
let estimated_total = encryption_stream.total_chunks();
let file_path = encryption_stream.file_path.clone();
let start = std::time::Instant::now();
#[cfg(feature = "loud")]
println!("Begin encrypting ~{estimated_total} chunks from {file_path}...");
debug!("Begin encrypting ~{estimated_total} chunks from {file_path}...");
while let Some(batch) = encryption_stream.next_batch(xorname_collection_batch_size) {
let batch_len = batch.len();
total += batch_len;
for chunk in batch {
xor_names.push(*chunk.name());
}
#[cfg(feature = "loud")]
println!(
"Encrypted {total}/{estimated_total} chunks in {:?}",
start.elapsed()
);
debug!(
"Encrypted {total}/{estimated_total} chunks in {:?}",
start.elapsed()
);
}
let datamap = encryption_stream
.data_map_chunk()
.ok_or_else(|| format!("No datamap available for {file_path}"))?;
Ok((
xor_names,
encryption_stream.relative_path,
datamap,
encryption_stream.metadata,
))
}