use super::payments::MerklePaymentReceipt;
use crate::Client;
use crate::client::config::CHUNK_UPLOAD_BATCH_SIZE;
use crate::client::data_types::chunk::DataMapChunk;
use crate::client::files::Metadata;
use crate::networking::NetworkError;
use crate::self_encryption::EncryptionStream;
use crate::utils::process_tasks_with_max_concurrency;
use ant_evm::merkle_payments::MerklePaymentProof;
use ant_protocol::NetworkAddress;
use ant_protocol::storage::{Chunk, ChunkAddress, DataTypes, RecordKind, try_serialize_record};
use libp2p::kad::Record;
use std::path::PathBuf;
use thiserror::Error;
use tracing::debug;
use xor_name::XorName;
#[derive(Debug, Error)]
pub enum MerklePutError {
#[error("Network error: {0}")]
Network(#[from] NetworkError),
#[error("Serialization error: {0}")]
Serialization(String),
#[error(
"Missing payment proof for xorname: {0}. This could be caused by a change in content between the payment and the upload. Please try again but make sure the uploaded files are the same as the ones used for the payment."
)]
MissingPaymentProofFor(XorName),
#[error(
"Stream should have a datamap, this is a bug: please report it and save your logs from ~/.autonomi/client/logs/"
)]
StreamShouldHaveDatamap,
}
impl Client {
pub async fn upload_batch_with_merkle(
&self,
mut streams: Vec<EncryptionStream>,
receipt: &MerklePaymentReceipt,
limit: usize,
) -> Result<
(
Vec<EncryptionStream>,
Vec<(PathBuf, DataMapChunk, Metadata)>,
),
MerklePutError,
> {
let mut completed_files: Vec<(PathBuf, DataMapChunk, Metadata)> = Vec::new();
let mut chunks_uploaded = 0;
let total_files = receipt.file_chunk_counts.len();
let upload_batch_size = std::cmp::max(1, *CHUNK_UPLOAD_BATCH_SIZE);
while chunks_uploaded < limit {
let Some(stream) = streams.first_mut() else {
break;
};
let remaining_in_batch = limit - chunks_uploaded;
let batch_size = std::cmp::min(upload_batch_size, remaining_in_batch);
match stream.next_batch(batch_size) {
Some(chunks) if !chunks.is_empty() => {
let mut tasks = Vec::with_capacity(chunks.len());
for chunk in chunks {
let xor_name = *chunk.name();
let proof = receipt
.proofs
.get(&xor_name)
.ok_or(MerklePutError::MissingPaymentProofFor(xor_name))?
.clone();
let client = self.clone();
tasks.push(async move {
client.upload_chunk_with_merkle_proof(&chunk, &proof).await
});
}
let results = process_tasks_with_max_concurrency(tasks, batch_size).await;
for result in results {
let addr = result?;
chunks_uploaded += 1;
debug!("Uploaded chunk {chunks_uploaded}/{limit}: {addr:?}");
#[cfg(feature = "loud")]
println!("({chunks_uploaded}/{limit}) Chunk stored at: {addr:?}");
}
}
_ => {
let exhausted_stream = streams.remove(0);
let path = exhausted_stream.relative_path.clone();
let metadata = exhausted_stream.metadata.clone();
let datamap = exhausted_stream
.data_map_chunk()
.ok_or(MerklePutError::StreamShouldHaveDatamap)?;
completed_files.push((path.clone(), datamap, metadata));
let f = total_files - streams.len();
if let Some(a) = exhausted_stream.data_address() {
debug!("[File {f}/{total_files}] ({path:?}) is now available at: {a:?}");
#[cfg(feature = "loud")]
println!("[File {f}/{total_files}] ({path:?}) is now available at: {a:?}");
}
}
}
}
Ok((streams, completed_files))
}
pub async fn upload_chunk_with_merkle_proof(
&self,
chunk: &Chunk,
proof: &MerklePaymentProof,
) -> Result<ChunkAddress, MerklePutError> {
let address = *chunk.address();
let network_addr = NetworkAddress::from(address);
self.upload_record_with_merkle_proof(network_addr, DataTypes::Chunk, chunk, proof)
.await?;
Ok(address)
}
pub async fn upload_record_with_merkle_proof<T: serde::Serialize>(
&self,
network_addr: NetworkAddress,
data_type: DataTypes,
data: T,
proof: &MerklePaymentProof,
) -> Result<(), MerklePutError> {
let record_kind = RecordKind::DataWithMerklePayment(data_type);
let record = Record {
key: network_addr.to_record_key(),
value: try_serialize_record(&(proof.clone(), data), record_kind)
.map_err(|e| {
MerklePutError::Serialization(format!(
"Failed to serialize chunk with Merkle proof: {e:?}"
))
})?
.to_vec(),
publisher: None,
expires: None,
};
let storing_nodes = self
.network
.get_closest_peers_with_retries(network_addr.clone(), None)
.await?;
self.network
.put_record_with_retries(record, storing_nodes.clone(), &self.config.chunks)
.await?;
Ok(())
}
}