use crate::data::client::batch::{PaymentIntent, PreparedChunk};
use crate::data::client::file::{ExternalPaymentInfo, PreparedUpload};
use crate::data::client::merkle::PaymentMode;
use crate::data::client::Client;
use crate::data::error::{Error, Result};
use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
use bytes::Bytes;
use futures::stream::{self, StreamExt, TryStreamExt};
use self_encryption::{decrypt, encrypt, DataMap, EncryptedChunk};
use tracing::{debug, info};
/// Outcome of a data upload: the self-encryption map needed to fetch the
/// content back, plus bookkeeping about how the upload was stored and paid for.
#[derive(Debug, Clone)]
pub struct DataUploadResult {
    /// Self-encryption `DataMap` describing the stored chunks; required for download.
    pub data_map: DataMap,
    /// Number of chunks actually stored on the network.
    pub chunks_stored: usize,
    /// Payment mode actually used — may differ from the requested mode
    /// (e.g. merkle falling back to wave-batch when peers are insufficient).
    pub payment_mode_used: PaymentMode,
}
impl Client {
pub async fn data_upload(&self, content: Bytes) -> Result<DataUploadResult> {
let content_len = content.len();
debug!("Encrypting data ({content_len} bytes)");
let (data_map, encrypted_chunks) = encrypt(content)
.map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
info!("Data encrypted into {} chunks", encrypted_chunks.len());
let chunk_contents: Vec<Bytes> = encrypted_chunks
.into_iter()
.map(|chunk| chunk.content)
.collect();
let (addresses, _storage_cost, _gas_cost) =
self.batch_upload_chunks(chunk_contents).await?;
let chunks_stored = addresses.len();
info!("Data uploaded: {chunks_stored} chunks stored ({content_len} bytes original)");
Ok(DataUploadResult {
data_map,
chunks_stored,
payment_mode_used: PaymentMode::Single,
})
}
pub async fn data_upload_with_mode(
&self,
content: Bytes,
mode: PaymentMode,
) -> Result<DataUploadResult> {
let content_len = content.len();
debug!("Encrypting data ({content_len} bytes) with mode {mode:?}");
let (data_map, encrypted_chunks) = encrypt(content)
.map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
let chunk_count = encrypted_chunks.len();
info!("Data encrypted into {chunk_count} chunks");
let chunk_contents: Vec<Bytes> = encrypted_chunks
.into_iter()
.map(|chunk| chunk.content)
.collect();
if self.should_use_merkle(chunk_count, mode) {
info!("Using merkle batch payment for {chunk_count} chunks");
let addresses: Vec<[u8; 32]> =
chunk_contents.iter().map(|c| compute_address(c)).collect();
let avg_size =
chunk_contents.iter().map(bytes::Bytes::len).sum::<usize>() / chunk_count.max(1);
let avg_size_u64 = u64::try_from(avg_size).unwrap_or(0);
let batch_result = match self
.pay_for_merkle_batch(&addresses, DATA_TYPE_CHUNK, avg_size_u64)
.await
{
Ok(result) => result,
Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
info!("Merkle needs more peers ({msg}), falling back to wave-batch");
let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
return Ok(DataUploadResult {
data_map,
chunks_stored: addresses.len(),
payment_mode_used: PaymentMode::Single,
});
}
Err(e) => return Err(e),
};
let chunks_stored = self
.merkle_upload_chunks(chunk_contents, addresses, &batch_result)
.await?;
info!("Data uploaded via merkle: {chunks_stored} chunks stored ({content_len} bytes)");
Ok(DataUploadResult {
data_map,
chunks_stored,
payment_mode_used: PaymentMode::Merkle,
})
} else {
let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
info!(
"Data uploaded: {} chunks stored ({content_len} bytes original)",
addresses.len()
);
Ok(DataUploadResult {
data_map,
chunks_stored: addresses.len(),
payment_mode_used: PaymentMode::Single,
})
}
}
pub async fn data_prepare_upload(&self, content: Bytes) -> Result<PreparedUpload> {
let content_len = content.len();
debug!("Preparing data upload for external signing ({content_len} bytes)");
let (data_map, encrypted_chunks) = encrypt(content)
.map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
let chunk_count = encrypted_chunks.len();
info!("Data encrypted into {chunk_count} chunks");
let chunk_contents: Vec<Bytes> = encrypted_chunks
.into_iter()
.map(|chunk| chunk.content)
.collect();
let quote_concurrency = self.config().quote_concurrency;
let results: Vec<Result<Option<PreparedChunk>>> = futures::stream::iter(chunk_contents)
.map(|content| async move { self.prepare_chunk_payment(content).await })
.buffer_unordered(quote_concurrency)
.collect()
.await;
let mut prepared_chunks = Vec::with_capacity(results.len());
for result in results {
if let Some(prepared) = result? {
prepared_chunks.push(prepared);
}
}
let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
info!(
"Data prepared for external signing: {} chunks, total {} atto ({content_len} bytes)",
prepared_chunks.len(),
payment_intent.total_amount,
);
Ok(PreparedUpload {
data_map,
payment_info: ExternalPaymentInfo::WaveBatch {
prepared_chunks,
payment_intent,
},
data_map_address: None,
})
}
/// Serializes `data_map` with MessagePack and stores it as a public chunk,
/// returning the chunk's address.
///
/// # Errors
/// Returns [`Error::Serialization`] on encoding failure, or any error from the
/// underlying chunk upload.
pub async fn data_map_store(&self, data_map: &DataMap) -> Result<[u8; 32]> {
    let encoded = rmp_serde::to_vec(data_map)
        .map_err(|e| Error::Serialization(format!("Failed to serialize DataMap: {e}")))?;
    let payload = Bytes::from(encoded);
    info!(
        "Storing DataMap as public chunk ({} bytes serialized)",
        payload.len()
    );
    self.chunk_put(payload).await
}
pub async fn data_map_fetch(&self, address: &[u8; 32]) -> Result<DataMap> {
let chunk = self.chunk_get(address).await?.ok_or_else(|| {
Error::InvalidData(format!(
"DataMap chunk not found at {}",
hex::encode(address)
))
})?;
rmp_serde::from_slice(&chunk.content)
.map_err(|e| Error::Serialization(format!("Failed to deserialize DataMap: {e}")))
}
pub async fn data_download(&self, data_map: &DataMap) -> Result<Bytes> {
let chunk_infos = data_map.infos();
debug!("Downloading data ({} chunks)", chunk_infos.len());
let addresses: Vec<[u8; 32]> = chunk_infos.iter().map(|info| info.dst_hash.0).collect();
let encrypted_chunks: Vec<EncryptedChunk> = stream::iter(addresses)
.map(|address| async move {
let chunk = self.chunk_get(&address).await?.ok_or_else(|| {
Error::InvalidData(format!(
"Missing chunk {} required for data reconstruction",
hex::encode(address)
))
})?;
Ok::<_, Error>(EncryptedChunk {
content: chunk.content,
})
})
.buffered(self.config().quote_concurrency)
.try_collect()
.await?;
debug!(
"All {} chunks retrieved, decrypting",
encrypted_chunks.len()
);
let content = decrypt(data_map, &encrypted_chunks)
.map_err(|e| Error::Encryption(format!("Failed to decrypt data: {e}")))?;
info!("Data downloaded and decrypted ({} bytes)", content.len());
Ok(content)
}
}
#[cfg(test)]
mod send_assertions {
    //! Compile-time assertions that the public async APIs return `Send`
    //! futures, so they can be driven by a multi-threaded executor. These
    //! functions are never executed — they only need to type-check.
    use super::*;

    // Compilation fails here if `T` is not `Send`.
    fn _assert_send<T: Send>(_: &T) {}

    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _data_download_is_send(client: &Client) {
        // `todo!()` supplies a value of the right type without constructing one;
        // this code is dead at runtime.
        let dm: DataMap = todo!();
        let fut = client.data_download(&dm);
        _assert_send(&fut);
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_upload_is_send(client: &Client) {
        let fut = client.data_upload(Bytes::new());
        _assert_send(&fut);
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_upload_with_mode_is_send(client: &Client) {
        let fut = client.data_upload_with_mode(Bytes::new(), PaymentMode::Auto);
        _assert_send(&fut);
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_prepare_upload_is_send(client: &Client) {
        let fut = client.data_prepare_upload(Bytes::new());
        _assert_send(&fut);
    }
}