use crate::Client;
use crate::chunk::DataMapChunk;
use crate::client::config::{UPLOAD_FLOW_BATCH_SIZE, upload_retry_pause};
use crate::client::merkle_payments::MerklePaymentReceipt;
use crate::client::payment::PayError::EvmWalletError;
use crate::client::payment::PaymentOption;
use crate::client::payment::Receipt;
use crate::client::{ClientEvent, PutError, UploadSummary};
use crate::self_encryption::EncryptionStream;
use crate::utils::format_upload_error;
use ant_evm::{Amount, AttoTokens};
use ant_protocol::storage::{Chunk, DataTypes};
use bytes::Bytes;
use evmlib::wallet::Error::InsufficientTokensForQuotes;
/// A batch of chunks queued for upload.
///
/// Each chunk is paired with a `(file path, chunk index, estimated total chunks)`
/// tuple that is only used to report per-chunk progress while the batch is
/// being processed.
type AggregatedChunks = Vec<((String, usize, usize), Chunk)>;
impl Client {
/// Publishes a [`ClientEvent::UploadComplete`] event carrying an [`UploadSummary`].
///
/// Does nothing when the client has no event sender configured. A failure to
/// deliver the event is logged and otherwise ignored.
pub(crate) async fn send_upload_complete(
    &self,
    records_paid: usize,
    records_already_paid: usize,
    tokens_spent: Amount,
) {
    // No subscriber registered: nothing to report.
    let Some(event_tx) = &self.client_event_sender else {
        return;
    };

    let event = ClientEvent::UploadComplete(UploadSummary {
        records_paid,
        records_already_paid,
        tokens_spent,
    });

    if let Err(err) = event_tx.send(event).await {
        error!("Failed to send upload completion event: {err:?}");
    }
}
/// Publishes a [`ClientEvent::MerkleBatchPaymentComplete`] event with a copy of `receipt`.
///
/// Does nothing when the client has no event sender configured. A failure to
/// deliver the event is logged and otherwise ignored.
pub(crate) async fn send_merkle_batch_payment_complete(&self, receipt: &MerklePaymentReceipt) {
    let Some(event_tx) = &self.client_event_sender else {
        return;
    };

    let event = ClientEvent::MerkleBatchPaymentComplete(receipt.clone());
    if let Err(err) = event_tx.send(event).await {
        error!("Failed to send merkle batch payment event: {err:?}");
    }
}
/// Publishes a [`ClientEvent::RegularBatchPaymentComplete`] event with a copy of `receipt`.
///
/// Does nothing when the client has no event sender configured. A failure to
/// deliver the event is logged and otherwise ignored.
pub(crate) async fn send_regular_batch_payment_complete(&self, receipt: &Receipt) {
    let Some(event_tx) = &self.client_event_sender else {
        return;
    };

    let event = ClientEvent::RegularBatchPaymentComplete(receipt.clone());
    if let Err(err) = event_tx.send(event).await {
        error!("Failed to send regular batch payment event: {err:?}");
    }
}
/// Sums the cost recorded in every payment receipt, reports the upload
/// summary through `send_upload_complete`, and returns the total.
///
/// The number of paid records reported is `total_chunks - total_free_chunks`,
/// clamped at zero.
pub(crate) async fn calculate_total_cost(
    &self,
    total_chunks: usize,
    payment_receipts: Vec<Receipt>,
    total_free_chunks: usize,
) -> AttoTokens {
    // Flatten all receipts into their individual per-record prices.
    let prices = payment_receipts
        .into_iter()
        .flat_map(|receipt| receipt.into_values())
        .map(|(_, price)| price.as_atto());
    let total_tokens: Amount = prices.sum();

    let records_paid = total_chunks.saturating_sub(total_free_chunks);
    self.send_upload_complete(records_paid, total_free_chunks, total_tokens)
        .await;

    AttoTokens::from_atto(total_tokens)
}
/// Pays for and uploads every stream in `encryption_streams`, one after another.
///
/// Each stream is handed to `pay_and_upload_file`; the first failure aborts
/// the whole operation and is returned to the caller. On success the
/// accumulated receipts and chunk counts are passed to `calculate_total_cost`,
/// whose result is returned.
pub(crate) async fn pay_and_upload(
    &self,
    payment_option: PaymentOption,
    encryption_streams: &mut [EncryptionStream],
) -> Result<AttoTokens, PutError> {
    let started_at = tokio::time::Instant::now();
    let total_files = encryption_streams.len();

    // Only mention the file count when more than one file is involved.
    let maybe_file = match total_files {
        0 | 1 => String::new(),
        _ => format!(" of {total_files} files"),
    };

    let mut est_total_chunks: usize = 0;
    for stream in encryption_streams.iter() {
        est_total_chunks += stream.total_chunks();
    }
    crate::loud_info!("Processing estimated total {est_total_chunks} chunks{maybe_file}");

    let mut receipts = Vec::new();
    let mut total_chunks = 0;
    let mut total_free_chunks = 0;

    for stream in encryption_streams.iter_mut() {
        if !stream.file_path.is_empty() {
            crate::loud_info!("Uploading file: {}", stream.file_path);
        }

        let (processed_chunks, free_chunks, file_receipts) = self
            .pay_and_upload_file(payment_option.clone(), stream)
            .await?;
        total_chunks += processed_chunks;
        total_free_chunks += free_chunks;
        receipts.extend(file_receipts);

        // The address is only appended when the stream exposes one.
        let addr_if_pub = match stream.data_address() {
            Some(addr) => format!(" at {}", addr.to_hex()),
            None => String::new(),
        };
        let filename_if_any = if stream.file_path.is_empty() {
            String::new()
        } else {
            format!(" for file {}", stream.file_path)
        };
        crate::loud_info!("Upload completed{filename_if_any}{addr_if_pub}");
    }

    let total_elapsed = started_at.elapsed();
    crate::loud_info!("Upload{maybe_file} completed in {total_elapsed:?}");

    let total_cost = self
        .calculate_total_cost(total_chunks, receipts, total_free_chunks)
        .await;
    Ok(total_cost)
}
/// Pays for and uploads all chunks produced by a single encryption stream.
///
/// Chunks are pulled from `file` in batches of at most
/// `UPLOAD_FLOW_BATCH_SIZE`. Chunks that `process_chunk_batch` hands back for
/// retry are carried over into the next batch, which is topped up with fresh
/// chunks from the stream.
///
/// Retrying is switched off once the number of attempted chunk uploads
/// exceeds `est_total_todo + max(20, est_total_todo * retry_failed)`; from
/// then on a failing batch yields an error instead of another retry.
///
/// # Returns
///
/// `(chunks pulled from the stream, chunks reported as free, receipts)`.
///
/// # Errors
///
/// Returns the first [`PutError`] reported by `process_chunk_batch`. Receipts
/// gathered up to that point are not returned to the caller.
pub(crate) async fn pay_and_upload_file(
    &self,
    payment_option: PaymentOption,
    file: &mut EncryptionStream,
) -> Result<(usize, usize, Vec<Receipt>), PutError> {
    let est_total_todo = file.total_chunks();
    let mut processed_chunks = 0;
    let mut total_free_chunks = 0;
    let mut receipts = vec![];
    let mut retry_on_failure = true;
    let mut attempted_uploads = 0;
    // Budget of chunk upload attempts before retries are disabled.
    let allowed_attempts =
        est_total_todo + std::cmp::max(20, est_total_todo * self.retry_failed as usize);
    let mut current_batch = vec![];

    loop {
        // Chunks carried over for retry already occupy part of the batch; only
        // request as many new chunks as still fit. Saturating arithmetic keeps
        // this from underflowing should the carried-over set ever be larger
        // than the configured batch size.
        let free_slots = (*UPLOAD_FLOW_BATCH_SIZE).saturating_sub(current_batch.len());
        if let Some(next_batch) = file.next_batch(free_slots) {
            let next_batch_len = next_batch.len();
            let path = file.file_path.clone();
            // Tag every chunk with its position so progress can be reported.
            let aggr_batch: AggregatedChunks = next_batch
                .into_iter()
                .enumerate()
                .map(|(i, chunk)| ((path.clone(), processed_chunks + i, est_total_todo), chunk))
                .collect();
            current_batch.extend(aggr_batch);
            processed_chunks += next_batch_len;
        }

        // Nothing new from the stream and nothing left to retry: done.
        if current_batch.is_empty() {
            break;
        }

        attempted_uploads += current_batch.len();
        let (retry_chunks, receipt, free_chunks_count, put_error) = self
            .process_chunk_batch(current_batch, payment_option.clone(), retry_on_failure)
            .await;
        receipts.extend(receipt);
        total_free_chunks += free_chunks_count;

        if let Some(err) = put_error {
            return Err(err);
        }

        if !retry_chunks.is_empty() {
            // Once the attempt budget is exhausted, the next failure is final.
            if attempted_uploads > allowed_attempts {
                retry_on_failure = false;
            }
            upload_retry_pause().await;
            crate::loud_info!("🔄 Retrying {} chunks...", retry_chunks.len());
        }
        current_batch = retry_chunks;
    }

    Ok((processed_chunks, total_free_chunks, receipts))
}
#[allow(clippy::too_many_arguments)]
async fn process_chunk_batch(
&self,
mut batch: AggregatedChunks,
payment_option: PaymentOption,
retry_on_failure: bool,
) -> (AggregatedChunks, Vec<Receipt>, usize, Option<PutError>) {
let payment_info: Vec<_> = batch
.iter()
.map(|(_, chunk)| (*chunk.name(), chunk.size()))
.collect();
crate::loud_info!("Processing batch of {} chunks", batch.len());
let mut file_infos = vec![];
let mut batch_chunks = vec![];
let mut put_error = None;
for (chunk_info, chunk) in batch.clone() {
file_infos.push(chunk_info);
batch_chunks.push(chunk);
}
for (file_name, i, est_total) in file_infos.iter() {
let maybe_file = if !file_name.is_empty() {
&format!(" of {file_name}")
} else {
""
};
crate::loud_info!("Processing chunk ({}/{est_total}){maybe_file}", i + 1);
}
let is_new_payment = matches!(&payment_option, PaymentOption::Wallet(_));
let (receipt, free_chunks) = match self
.pay_for_content_addrs(DataTypes::Chunk, payment_info.into_iter(), payment_option)
.await
{
Ok((receipt, free_chunks)) => (receipt, free_chunks),
Err(err) if matches!(err, EvmWalletError(InsufficientTokensForQuotes(_, _))) => {
crate::loud_error!("Insufficient tokens: {err:?}. Returning immediately.");
return (vec![], vec![], 0, Some(PutError::from(err)));
}
Err(err) => {
return if retry_on_failure {
crate::loud_error!(
"Quoting or payment error encountered, retry scheduled: {err}"
);
(batch, vec![], 0, None)
} else {
crate::loud_error!(
"Quoting or payment error encountered, no retry scheduled: {err}"
);
(vec![], vec![], 0, Some(PutError::from(err)))
};
}
};
if free_chunks > 0 {
crate::loud_info!(
"{free_chunks} chunks were free in this batch {}",
batch_chunks.len()
);
}
if is_new_payment && !receipt.is_empty() {
self.send_regular_batch_payment_complete(&receipt).await;
}
let mut retry_chunks = vec![];
match self
.chunk_batch_upload(batch_chunks.iter().collect(), &receipt)
.await
{
Ok(()) => {}
Err(err) if retry_on_failure => {
let error_msg = format_upload_error(&err);
crate::loud_info!("⚠️ {error_msg}. Retrying scheduled");
info!("Upload error: {err}. Retrying scheduled");
if let PutError::Batch(ref upload_state) = err {
let failed_chunks: Vec<_> =
upload_state.failed.iter().map(|(addr, _)| *addr).collect();
batch.retain(|(_, chunk)| failed_chunks.contains(chunk.address()));
retry_chunks.extend(batch);
} else {
put_error = Some(err);
};
}
Err(err) => put_error = Some(err),
}
(retry_chunks, vec![receipt], free_chunks, put_error)
}
/// Encrypts in-memory `data`, then pays for and uploads the resulting chunks.
///
/// # Returns
///
/// The total cost reported by `pay_and_upload` together with the data map
/// chunk produced by `EncryptionStream::new_in_memory`.
///
/// # Errors
///
/// Propagates any error from creating the encryption stream or from
/// `pay_and_upload`.
pub(crate) async fn data_put_internal(
    &self,
    data: Bytes,
    payment_option: PaymentOption,
    is_public: bool,
) -> Result<(AttoTokens, DataMapChunk), PutError> {
    let (stream, data_map_chunk) = EncryptionStream::new_in_memory(data, is_public)?;

    // `pay_and_upload` works on a slice of streams; wrap the single one.
    let mut streams = vec![stream];
    self.pay_and_upload(payment_option, &mut streams)
        .await
        .map(|total_cost| (total_cost, data_map_chunk))
}
}