use super::payments::{MerklePaymentError, MerklePaymentReceipt};
use super::upload::MerklePutError;
use crate::Client;
use crate::client::config::CHUNK_UPLOAD_BATCH_SIZE;
use crate::client::data_types::chunk::DataMapChunk;
use crate::client::files::Metadata;
use crate::client::{ClientEvent, UploadSummary};
use crate::self_encryption::{EncryptionStream, MAX_CHUNK_SIZE, encrypt_directory_files};
use ant_evm::merkle_payments::MAX_LEAVES;
use ant_evm::{AttoTokens, EvmWallet};
use ant_protocol::storage::DataTypes;
use std::collections::HashMap;
use std::path::PathBuf;
use thiserror::Error;
use tracing::{debug, info};
use xor_name::XorName;
#[derive(Clone)]
pub enum MerklePaymentOption<'a> {
Wallet(&'a EvmWallet),
Receipt(MerklePaymentReceipt),
ContinueWithReceipt(&'a EvmWallet, MerklePaymentReceipt),
}
#[derive(Debug, Error)]
#[error("{error}")]
pub struct MerkleUploadErrorWithReceipt {
pub receipt: Option<MerklePaymentReceipt>,
#[source]
pub error: MerkleUploadError,
}
impl MerkleUploadErrorWithReceipt {
fn new(receipt: MerklePaymentReceipt, kind: MerkleUploadError) -> Self {
let receipt = if receipt.proofs.is_empty() {
None } else {
Some(receipt) };
Self {
receipt,
error: kind,
}
}
fn encryption(receipt: MerklePaymentReceipt, msg: String) -> Self {
Self::new(receipt, MerkleUploadError::Encryption(msg))
}
fn payment(receipt: MerklePaymentReceipt, err: MerklePaymentError) -> Self {
Self::new(receipt, MerkleUploadError::Payment(err))
}
fn upload(receipt: MerklePaymentReceipt, err: MerklePutError) -> Self {
Self::new(receipt, MerkleUploadError::Upload(err))
}
}
#[derive(Debug, Error)]
pub enum MerkleUploadError {
#[error("Encryption error: {0}")]
Encryption(String),
#[error("Payment error: {0}")]
Payment(MerklePaymentError),
#[error("Upload error: {0}")]
Upload(MerklePutError),
}
impl Client {
pub async fn file_cost_merkle(
&self,
path: PathBuf,
is_public: bool,
wallet: &EvmWallet,
) -> Result<AttoTokens, MerkleUploadError> {
debug!(
"merkle payment: file_cost_merkle starting for path: {path:?}, is_public: {is_public}"
);
if wallet.network() != self.evm_network() {
return Err(MerkleUploadError::Payment(
MerklePaymentError::EvmWalletNetworkMismatch,
));
}
#[cfg(feature = "loud")]
println!("Encrypting files to calculate cost...");
let (all_xor_names, _file_chunk_counts) = self
.collect_xornames_from_dir(path, is_public)
.await
.map_err(MerkleUploadError::Encryption)?;
let total_chunks = all_xor_names.len();
debug!("merkle payment: file_cost_merkle total chunks: {total_chunks}");
#[cfg(feature = "loud")]
println!("Encrypted into {total_chunks} chunks");
let batches: Vec<Vec<XorName>> = all_xor_names
.chunks(MAX_LEAVES)
.map(|c| c.to_vec())
.collect();
let num_batches = batches.len();
#[cfg(feature = "loud")]
println!("Estimating cost for {num_batches} batch(es)...");
let mut total_cost = ant_evm::U256::ZERO;
for (batch_idx, batch_xornames) in batches.into_iter().enumerate() {
let batch_num = batch_idx + 1;
debug!("Estimating batch {batch_num}/{num_batches}");
let (tree, _candidate_pools, pool_commitments, merkle_payment_timestamp) = self
.prepare_merkle_batch(DataTypes::Chunk, batch_xornames, MAX_CHUNK_SIZE)
.await
.map_err(MerkleUploadError::Payment)?;
let batch_cost = wallet
.estimate_merkle_payment_cost(
tree.depth(),
&pool_commitments,
merkle_payment_timestamp,
)
.await
.map_err(|e| MerkleUploadError::Payment(MerklePaymentError::EvmWalletError(e)))?;
total_cost = total_cost.saturating_add(batch_cost);
}
let estimated_cost = AttoTokens::from_atto(total_cost);
debug!("merkle payment: file_cost_merkle estimated total cost: {estimated_cost}");
#[cfg(feature = "loud")]
println!("Total estimated cost: {estimated_cost}");
Ok(estimated_cost)
}
async fn files_put_with_merkle_payment_internal(
&self,
path: PathBuf,
is_public: bool,
wallet: Option<&EvmWallet>,
mut receipt: MerklePaymentReceipt,
) -> Result<(AttoTokens, Vec<(PathBuf, DataMapChunk, Metadata)>), MerkleUploadErrorWithReceipt>
{
debug!("merkle payment: starting for path: {path:?}, is_public: {is_public}");
if let Some(w) = wallet
&& w.network() != self.evm_network()
{
return Err(MerkleUploadErrorWithReceipt::payment(
receipt,
MerklePaymentError::EvmWalletNetworkMismatch,
));
}
#[cfg(feature = "loud")]
println!("Encrypting files a first time to create the Merkle Tree(s)...");
let (mut all_xor_names, file_chunk_counts) = self
.collect_xornames_from_dir(path.clone(), is_public)
.await
.map_err(|e| MerkleUploadErrorWithReceipt::encryption(receipt.clone(), e))?;
receipt.file_chunk_counts = file_chunk_counts;
let total_files = receipt.file_chunk_counts.len();
let total_chunks = all_xor_names.len();
info!("Collected {total_chunks} XorNames from {total_files} files");
let num_batches = total_chunks.div_ceil(MAX_LEAVES);
let mut batches: Vec<Vec<XorName>> = Vec::with_capacity(num_batches);
while !all_xor_names.is_empty() {
let drain_count = std::cmp::min(MAX_LEAVES, all_xor_names.len());
batches.push(all_xor_names.drain(..drain_count).collect());
}
info!("Split into {num_batches} Merkle Tree(s) of up to {MAX_LEAVES} chunks each");
#[cfg(feature = "loud")]
println!("🚀 Starting upload of {total_chunks} chunks in {num_batches} Merkle Tree(s)...");
let mut streams: Vec<EncryptionStream> = encrypt_directory_files(path, is_public)
.await
.map_err(|e| MerkleUploadErrorWithReceipt::encryption(receipt.clone(), e.to_string()))?
.into_iter()
.map(|stream| {
stream.map_err(|e| MerkleUploadErrorWithReceipt::encryption(receipt.clone(), e))
})
.collect::<Result<Vec<EncryptionStream>, MerkleUploadErrorWithReceipt>>()?;
let mut results: Vec<(PathBuf, DataMapChunk, Metadata)> = Vec::new();
for (batch_idx, batch_xornames) in batches.into_iter().enumerate() {
let batch_num = batch_idx + 1;
let batch_size = batch_xornames.len();
info!("Processing batch {batch_num}/{num_batches} ({batch_size} chunks)");
let needs_payment = batch_xornames
.iter()
.any(|xn| !receipt.proofs.contains_key(xn));
if needs_payment {
receipt = self
.pay_for_merkle_tree_batch(
wallet,
batch_xornames,
receipt.clone(),
batch_num,
num_batches,
)
.await
.map_err(|kind| MerkleUploadErrorWithReceipt::new(receipt.clone(), kind))?;
}
#[cfg(feature = "loud")]
println!("🌳 Merkle Tree {batch_num}/{num_batches}: Uploading {batch_size} chunks...");
let (remaining_streams, completed_files) = self
.upload_batch_with_merkle(streams, &receipt, batch_size)
.await
.map_err(|err| MerkleUploadErrorWithReceipt::upload(receipt.clone(), err))?;
streams = remaining_streams;
results.extend(completed_files);
info!(
"Batch {batch_num}/{num_batches} complete, {} files finished so far",
results.len()
);
}
for mut stream in streams {
let datamap = if let Some(datamap) = stream.data_map_chunk() {
datamap
} else {
let _should_be_none = stream.next_batch(1);
stream
.data_map_chunk()
.ok_or(MerkleUploadErrorWithReceipt::upload(
receipt.clone(),
MerklePutError::StreamShouldHaveDatamap,
))?
};
results.push((
stream.relative_path.clone(),
datamap,
stream.metadata.clone(),
));
if let Some(public_addr) = stream.data_address() {
let path = &stream.relative_path;
let f = results.len();
debug!("[File {f}/{total_files}] ({path:?}) is now available at: {public_addr:?}");
#[cfg(feature = "loud")]
println!(
"[File {f}/{total_files}] ({path:?}) is now available at: {public_addr:?}"
);
}
}
debug!(
"merkle payment: {total_chunks} chunks uploaded for {total_files} files successfully"
);
#[cfg(feature = "loud")]
println!("✓ All {total_chunks} chunks uploaded successfully!");
self.send_upload_complete_event(&receipt).await;
Ok((receipt.amount_paid, results))
}
pub async fn files_put_with_merkle_payment(
&self,
path: PathBuf,
is_public: bool,
payment: MerklePaymentOption<'_>,
) -> Result<(AttoTokens, Vec<(PathBuf, DataMapChunk, Metadata)>), MerkleUploadErrorWithReceipt>
{
debug!(
"merkle payment: files_put starting upload for path: {path:?}, is_public: {is_public}"
);
match payment {
MerklePaymentOption::Wallet(wallet) => {
self.files_put_with_merkle_payment_internal(
path,
is_public,
Some(wallet),
MerklePaymentReceipt::default(),
)
.await
}
MerklePaymentOption::Receipt(receipt) => {
self.files_put_with_merkle_payment_internal(path, is_public, None, receipt)
.await
}
MerklePaymentOption::ContinueWithReceipt(wallet, receipt) => {
self.files_put_with_merkle_payment_internal(path, is_public, Some(wallet), receipt)
.await
}
}
}
async fn collect_xornames_from_dir(
&self,
path: PathBuf,
is_public: bool,
) -> Result<(Vec<XorName>, HashMap<String, usize>), String> {
let streams: Vec<EncryptionStream> = encrypt_directory_files(path, is_public)
.await
.map_err(|e| e.to_string())?
.into_iter()
.collect::<Result<Vec<EncryptionStream>, String>>()?;
let mut all_xor_names = Vec::new();
let mut file_chunk_counts = HashMap::new();
for stream in streams {
let file_path = stream.file_path.clone();
let xor_names = collect_xor_names_from_stream(stream);
file_chunk_counts.insert(file_path, xor_names.len());
all_xor_names.extend(xor_names);
}
Ok((all_xor_names, file_chunk_counts))
}
async fn pay_for_merkle_tree_batch(
&self,
wallet: Option<&EvmWallet>,
batch_xornames: Vec<XorName>,
mut receipt: MerklePaymentReceipt,
batch_num: usize,
num_batches: usize,
) -> Result<MerklePaymentReceipt, MerkleUploadError> {
let w = wallet.ok_or_else(|| {
let missing_xn = batch_xornames
.iter()
.find(|xn| !receipt.proofs.contains_key(xn))
.copied()
.unwrap_or_default();
MerkleUploadError::Upload(MerklePutError::MissingPaymentProofFor(missing_xn))
})?;
let batch_size = batch_xornames.len();
debug!("Merkle Tree {batch_num}/{num_batches}: Paying for {batch_size} chunks...");
#[cfg(feature = "loud")]
println!("💸 Merkle Tree {batch_num}/{num_batches}: Paying for {batch_size} chunks...");
let batch_receipt = self
.pay_for_single_merkle_batch(DataTypes::Chunk, batch_xornames, MAX_CHUNK_SIZE, w)
.await
.map_err(MerkleUploadError::Payment)?;
receipt.merge(batch_receipt);
Ok(receipt)
}
async fn send_upload_complete_event(&self, receipt: &MerklePaymentReceipt) {
let total_chunks: usize = receipt.file_chunk_counts.values().sum();
if let Some(sender) = &self.client_event_sender {
let summary = UploadSummary {
records_paid: total_chunks,
records_already_paid: 0,
tokens_spent: receipt.amount_paid.as_atto(),
};
if let Err(err) = sender.send(ClientEvent::UploadComplete(summary)).await {
error!("Failed to send upload completion event: {err:?}");
}
}
}
}
fn collect_xor_names_from_stream(mut encryption_stream: EncryptionStream) -> Vec<XorName> {
let mut xor_names: Vec<XorName> = Vec::new();
let xorname_collection_batch_size: usize = std::cmp::max(32, *CHUNK_UPLOAD_BATCH_SIZE);
let mut total = 0;
let estimated_total = encryption_stream.total_chunks();
let file_path = &encryption_stream.file_path;
let start = std::time::Instant::now();
#[cfg(feature = "loud")]
println!("Begin encrypting ~{estimated_total} chunks from {file_path}...");
debug!("Begin encrypting ~{estimated_total} chunks from {file_path}...");
while let Some(batch) = encryption_stream.next_batch(xorname_collection_batch_size) {
let batch_len = batch.len();
total += batch_len;
for chunk in batch {
xor_names.push(*chunk.name());
}
#[cfg(feature = "loud")]
println!(
"Encrypted {total}/{estimated_total} chunks in {:?}",
start.elapsed()
);
debug!(
"Encrypted {total}/{estimated_total} chunks in {:?}",
start.elapsed()
);
}
xor_names
}