use self_encryption::DataMap;
use serde::{Deserialize, Serialize};
use std::{
path::{Path, PathBuf},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use thiserror::Error;
use tracing::info;
use crate::client::config::MERKLE_PAYMENT_THRESHOLD;
use crate::client::data_types::chunk::ChunkAddress;
use crate::client::merkle_payments::{
MerklePaymentError, MerklePaymentOption, MerkleUploadErrorWithReceipt,
};
use crate::client::{GetError, PutError, quote::CostError};
use crate::self_encryption::{EncryptionStream, MAX_CHUNK_SIZE};
use crate::utils::process_tasks_with_max_concurrency;
use crate::{
Client,
chunk::DataMapChunk,
client::payment::{BulkPaymentOption, PaymentOption},
};
use ant_evm::AttoTokens;
use bytes::Bytes;
use self_encryption::streaming_decrypt_from_storage;
use xor_name::XorName;
pub mod archive_private;
pub mod archive_public;
mod cost;
pub mod fs_private;
pub mod fs_public;
pub use archive_private::PrivateArchive;
pub use archive_public::PublicArchive;
/// Estimates how many chunks uploading `dir_path` would produce.
///
/// Every regular file contributes `ceil(size / MAX_CHUNK_SIZE)` chunks, with a
/// floor of three chunks per file. When `dir_path` is itself a file, only that
/// file is counted. Otherwise the tree is walked recursively (following
/// symlinks) and the per-file estimates are summed.
///
/// The directory walk is best-effort: entries that cannot be read are skipped,
/// and a file whose metadata cannot be read is counted as if it were empty.
///
/// # Errors
///
/// Returns an I/O error only when `dir_path` is a file and its metadata cannot
/// be read.
pub fn estimate_directory_chunks(dir_path: &PathBuf) -> Result<usize, std::io::Error> {
    // Shared formula for a single file of `size` bytes.
    let chunks_for_size = |size: usize| size.div_ceil(MAX_CHUNK_SIZE).max(3);

    if dir_path.is_file() {
        let file_len = std::fs::metadata(dir_path)?.len() as usize;
        return Ok(chunks_for_size(file_len));
    }

    let estimate: usize = walkdir::WalkDir::new(dir_path)
        .follow_links(true)
        .into_iter()
        // Unreadable entries are ignored rather than failing the estimate.
        .filter_map(Result::ok)
        .filter(|entry| entry.file_type().is_file())
        .map(|entry| {
            let file_len = entry.metadata().map_or(0, |meta| meta.len() as usize);
            chunks_for_size(file_len)
        })
        .sum();

    Ok(estimate)
}
/// Converts finished encryption streams into `(relative path, datamap, metadata)`
/// tuples, preserving the order of `streams`.
///
/// # Errors
///
/// Returns [`UploadError::Encryption`] for the first stream that has no datamap
/// chunk available.
pub(crate) fn streams_to_file_results(
    streams: Vec<EncryptionStream>,
) -> Result<Vec<(PathBuf, DataMapChunk, Metadata)>, UploadError> {
    streams
        .into_iter()
        .map(|stream| match stream.data_map_chunk() {
            Some(datamap) => Ok((stream.relative_path, datamap, stream.metadata)),
            None => Err(UploadError::Encryption(format!(
                "Datamap chunk not found for file: {:?}",
                stream.file_path
            ))),
        })
        // Collecting into `Result` stops at the first failing stream.
        .collect()
}
/// Uploads the content at `dir_path` using the strategy selected by
/// `payment_option`, then hands the per-file results to `build_archive`.
///
/// Routing per [`BulkPaymentOption`] variant:
/// - `Wallet`: estimates the chunk count and uses merkle payments when the
///   estimate reaches `MERKLE_PAYMENT_THRESHOLD`, regular payments otherwise.
/// - `ForceMerkle` / `ForceRegular`: uses the named strategy with the wallet.
/// - `Receipt`: regular upload paid with an existing receipt.
/// - `MerkleReceipt`: merkle upload paid with an existing receipt.
/// - `ContinueMerkle`: merkle upload continuing from a receipt with a wallet.
///
/// Returns the cost reported by the chosen upload path together with whatever
/// `build_archive` produces from the file results.
///
/// # Errors
///
/// Propagates failures from the chunk estimate, from either upload path, and
/// from converting encryption streams into file results.
pub(crate) async fn bulk_upload_internal<A, F>(
    client: &Client,
    dir_path: PathBuf,
    payment_option: BulkPaymentOption,
    is_public: bool,
    build_archive: F,
) -> Result<(AttoTokens, A), UploadError>
where
    F: FnOnce(Vec<(PathBuf, DataMapChunk, Metadata)>) -> A,
{
    // Every arm yields `(cost, file results)`; the archive is built once below.
    let (cost, results) = match payment_option {
        BulkPaymentOption::Wallet(wallet) => {
            let estimated_chunks = estimate_directory_chunks(&dir_path)?;
            if estimated_chunks < MERKLE_PAYMENT_THRESHOLD {
                info!(
                    "Auto-selected regular payments for ~{estimated_chunks} chunks (< {MERKLE_PAYMENT_THRESHOLD} threshold)"
                );
                let (cost, streams) = client
                    .dir_content_upload_internal(dir_path, PaymentOption::Wallet(wallet), is_public)
                    .await?;
                (cost, streams_to_file_results(streams)?)
            } else {
                info!(
                    "Auto-selected merkle payments for ~{estimated_chunks} chunks (>= {MERKLE_PAYMENT_THRESHOLD} threshold)"
                );
                client
                    .files_put_with_merkle_payment(
                        dir_path,
                        is_public,
                        MerklePaymentOption::Wallet(&wallet),
                    )
                    .await?
            }
        }
        BulkPaymentOption::ForceMerkle(wallet) => {
            client
                .files_put_with_merkle_payment(
                    dir_path,
                    is_public,
                    MerklePaymentOption::Wallet(&wallet),
                )
                .await?
        }
        BulkPaymentOption::ForceRegular(wallet) => {
            let (cost, streams) = client
                .dir_content_upload_internal(dir_path, PaymentOption::Wallet(wallet), is_public)
                .await?;
            (cost, streams_to_file_results(streams)?)
        }
        BulkPaymentOption::Receipt(receipt) => {
            let (cost, streams) = client
                .dir_content_upload_internal(dir_path, PaymentOption::Receipt(receipt), is_public)
                .await?;
            (cost, streams_to_file_results(streams)?)
        }
        BulkPaymentOption::MerkleReceipt(receipt) => {
            client
                .files_put_with_merkle_payment(
                    dir_path,
                    is_public,
                    MerklePaymentOption::Receipt(receipt),
                )
                .await?
        }
        BulkPaymentOption::ContinueMerkle(wallet, receipt) => {
            client
                .files_put_with_merkle_payment(
                    dir_path,
                    is_public,
                    MerklePaymentOption::ContinueWithReceipt(&wallet, receipt),
                )
                .await?
        }
    };

    Ok((cost, build_archive(results)))
}
pub(crate) async fn file_upload_internal(
client: &Client,
path: PathBuf,
payment_option: BulkPaymentOption,
is_public: bool,
) -> Result<(AttoTokens, DataMapChunk), UploadError> {
let (cost, results) =
bulk_upload_internal(client, path, payment_option, is_public, |r| r).await?;
let datamap = results
.into_iter()
.next()
.map(|(_, dm, _)| dm)
.ok_or_else(|| UploadError::Encryption("No file results from upload".to_string()))?;
Ok((cost, datamap))
}
/// Metadata attached to a file entry.
///
/// [`Metadata::new_with_size`] stamps both timestamps with the current time,
/// [`Metadata::default`] does the same with a size of zero, and
/// [`Metadata::empty`] zeroes every field.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Metadata {
    /// Creation timestamp. The constructors in this file store whole seconds
    /// since the Unix epoch (or 0).
    pub created: u64,
    /// Last-modification timestamp, in the same unit as `created`.
    pub modified: u64,
    /// Size value supplied by the caller.
    // NOTE(review): presumably the file size in bytes — confirm against callers.
    pub size: u64,
    /// Optional free-form extra data; the constructors in this file leave it `None`.
    pub extra: Option<String>,
}
impl Default for Metadata {
fn default() -> Self {
Self::new_with_size(0)
}
}
impl Metadata {
pub fn new_with_size(size: u64) -> Self {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::from_secs(0))
.as_secs();
Self {
created: now,
modified: now,
size,
extra: None,
}
}
pub fn empty() -> Self {
Self {
created: 0,
modified: 0,
size: 0,
extra: None,
}
}
}
/// Error produced when renaming an entry fails.
// NOTE(review): presumably returned by the archive rename operations in the
// `archive_*` submodules — those are not visible here; confirm.
#[derive(Error, Debug, PartialEq, Eq)]
pub enum RenameError {
    /// The given path has no matching entry in the archive.
    #[error("File not found in archive: {0}")]
    FileNotFound(PathBuf),
}
#[derive(Debug, thiserror::Error)]
pub enum UploadError {
#[error("Failed to recursively traverse directory")]
WalkDir(#[from] walkdir::Error),
#[error("Input/output failure")]
IoError(#[from] std::io::Error),
#[error("Failed to upload file")]
PutError(#[from] PutError),
#[error("Encryption error")]
Encryption(String),
#[error("Merkle upload error: {0}")]
MerkleUpload(#[from] MerkleUploadErrorWithReceipt),
}
/// Errors that can occur while downloading files.
#[derive(Debug, thiserror::Error)]
pub enum DownloadError {
    /// Retrieving or decrypting data failed; wraps the underlying [`GetError`].
    #[error("Failed to download file")]
    GetError(#[from] GetError),
    /// A filesystem operation on the destination failed.
    #[error("IO failure")]
    IoError(#[from] std::io::Error),
}
/// Errors that can occur while estimating the cost of uploading files.
#[derive(Debug, thiserror::Error)]
pub enum FileCostError {
    /// The underlying cost computation failed; wraps a [`CostError`].
    #[error("Cost error: {0}")]
    Cost(#[from] CostError),
    /// A filesystem operation failed.
    #[error("IO failure")]
    IoError(#[from] std::io::Error),
    /// MessagePack (`rmp_serde`) encoding failed.
    #[error("Serialization error")]
    Serialization(#[from] rmp_serde::encode::Error),
    /// Walking the directory tree failed.
    #[error("Walkdir error")]
    WalkDir(#[from] walkdir::Error),
    /// A merkle-payment step failed; wraps a [`MerklePaymentError`].
    #[error("Merkle payment error: {0}")]
    MerklePayment(#[from] MerklePaymentError),
    /// Encryption failed; the payload is a human-readable description.
    #[error("Encryption error: {0}")]
    Encryption(String),
}
/// Normalizes `path` to a forward-slash separated form.
///
/// The path's components are joined with `/`, every backslash is rewritten to
/// `/`, and any run of consecutive separators is collapsed to a single `/`.
/// A leading root is therefore kept as exactly one `/`.
///
/// Non-UTF-8 components are converted lossily.
// NOTE(review): on Unix a backslash is a legal file-name character; it is still
// rewritten to `/` here, as it was before. Confirm that this is intended.
pub(crate) fn normalize_path(path: PathBuf) -> PathBuf {
    let joined = path
        .components()
        .map(|c| c.as_os_str().to_string_lossy())
        .collect::<Vec<_>>()
        .join("/");

    // Collapse separator runs in a single pass. A one-shot `replace("//", "/")`
    // only halves a run, so three or more separators in a row (e.g. a Windows
    // prefix followed by its root, `C:` + `\`) would still leave a doubled slash.
    let mut normalized = String::with_capacity(joined.len());
    for ch in joined.chars() {
        let ch = if ch == '\\' { '/' } else { ch };
        if ch == '/' && normalized.ends_with('/') {
            continue;
        }
        normalized.push(ch);
    }

    PathBuf::from(normalized)
}
impl Client {
pub(crate) fn stream_download_from_datamap(
&self,
data_map: DataMap,
to_dest: &Path,
) -> Result<(), DownloadError> {
if let Err(e) = std::fs::File::create(to_dest) {
crate::loud_info!(
"Input destination path {to_dest:?} cannot be used for streaming disk flushing: {e}"
);
crate::loud_info!(
"This file may have been uploaded without a metadata archive. A file name must be provided to download and save it."
);
return Err(DownloadError::IoError(e));
}
if let Err(cleanup_err) = std::fs::remove_file(to_dest) {
crate::loud_info!(
"Warning: Failed to clean up temporary verification file {to_dest:?}: {cleanup_err}"
);
return Err(DownloadError::IoError(cleanup_err));
}
let total_chunks = data_map.infos().len();
crate::loud_info!("Streaming fetching {total_chunks} chunks to {to_dest:?} ...");
let client_clone = self.clone();
let parallel_chunk_fetcher = move |chunk_names: &[(usize, XorName)]| -> Result<
Vec<(usize, Bytes)>,
self_encryption::Error,
> {
let chunk_addresses: Vec<(usize, ChunkAddress)> = chunk_names
.iter()
.map(|(i, name)| (*i, ChunkAddress::new(*name)))
.collect();
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
client_clone
.fetch_chunks_parallel(&chunk_addresses, total_chunks)
.await
})
})
};
streaming_decrypt_from_storage(&data_map, to_dest, parallel_chunk_fetcher).map_err(
|e| {
DownloadError::GetError(crate::client::GetError::Decryption(
crate::self_encryption::Error::SelfEncryption(e),
))
},
)?;
let chunk_addrs: Vec<ChunkAddress> = data_map
.infos()
.iter()
.map(|info| ChunkAddress::new(info.dst_hash))
.collect();
self.cleanup_cached_chunks(&chunk_addrs);
Ok(())
}
pub(super) async fn fetch_chunks_parallel(
&self,
chunk_addresses: &[(usize, ChunkAddress)],
total_chunks: usize,
) -> Result<Vec<(usize, Bytes)>, self_encryption::Error> {
let mut download_tasks = vec![];
for (i, chunk_addr) in chunk_addresses {
let client_clone = self.clone();
let addr_clone = *chunk_addr;
download_tasks.push(async move {
crate::loud_debug!("Fetching chunk {i}/{total_chunks}({addr_clone:?})");
let result = client_clone
.chunk_get(&addr_clone)
.await
.map(|chunk| (*i, chunk.value))
.map_err(|e| {
self_encryption::Error::Generic(format!(
"Failed to fetch chunk {addr_clone:?}: {e:?}"
))
});
crate::loud_debug!("Fetching chunk {i}/{total_chunks}({addr_clone:?}) [DONE]");
result
});
}
let chunks = process_tasks_with_max_concurrency(
download_tasks,
*crate::client::config::CHUNK_DOWNLOAD_BATCH_SIZE,
)
.await
.into_iter()
.collect::<Result<Vec<(usize, Bytes)>, self_encryption::Error>>()?;
Ok(chunks)
}
pub(crate) async fn dir_content_upload_internal(
&self,
dir_path: PathBuf,
payment_option: PaymentOption,
is_public: bool,
) -> Result<(AttoTokens, Vec<EncryptionStream>), UploadError> {
info!("Uploading directory: {dir_path:?}, public: {is_public}");
let encryption_results =
crate::self_encryption::encrypt_directory_files(dir_path, is_public).await?;
let mut chunk_iterators = vec![];
for encryption_result in encryption_results {
match encryption_result {
Ok(stream) => {
crate::loud_info!("Successfully encrypted file: {:?}", stream.file_path);
chunk_iterators.push(stream);
}
Err(err_msg) => {
crate::loud_error!("Error during file encryption: {err_msg}");
return Err(UploadError::Encryption(err_msg));
}
}
}
let total_cost = self
.pay_and_upload(payment_option, &mut chunk_iterators)
.await?;
Ok((total_cost, chunk_iterators))
}
}
/// Computes the path under which a file is recorded, relative to the parent of
/// the uploaded root.
///
/// - When `abs_folder_path` is an existing file, the result is just its file
///   name.
/// - Otherwise the parent of `abs_folder_path` is stripped from
///   `abs_file_path`, so the result starts with the folder's own name.
///
/// # Errors
///
/// Returns a descriptive message when `abs_folder_path` has no final name
/// component, or when `abs_file_path` does not start with the folder's parent.
pub(crate) fn get_relative_file_path_from_abs_file_and_folder_path(
    abs_file_path: &Path,
    abs_folder_path: &Path,
) -> Result<PathBuf, String> {
    // A final name component is required in both the file and the folder case.
    let Some(base_name) = abs_folder_path.file_name() else {
        return Err(format!(
            "Failed to get file/dir name from path: {abs_folder_path:?}"
        ));
    };

    if abs_folder_path.is_file() {
        return Ok(PathBuf::from(base_name));
    }

    let folder_prefix = match abs_folder_path.parent() {
        Some(parent) => parent.to_path_buf(),
        None => PathBuf::new(),
    };

    match abs_file_path.strip_prefix(&folder_prefix) {
        Ok(relative) => Ok(relative.to_path_buf()),
        Err(e) => Err(format!(
            "Could not strip prefix {folder_prefix:?} from path {abs_file_path:?}: {e}"
        )),
    }
}
#[cfg(test)]
mod tests {
    use super::normalize_path;
    use std::path::PathBuf;

    /// Backslash-separated input comes back with forward slashes.
    /// Compiled on Windows only.
    #[cfg(windows)]
    #[test]
    fn test_normalize_path_to_forward_slashes() {
        let expected = PathBuf::from("folder/test/file.txt");
        let actual = normalize_path(PathBuf::from(r"folder\test\file.txt"));
        assert_eq!(actual, expected);
    }

    /// An absolute path keeps exactly one leading slash. The comparison is on
    /// the string form, because `PathBuf` equality ignores repeated separators.
    #[test]
    fn test_normalize_path_preserves_leading_slash() {
        let actual = normalize_path(PathBuf::from("/folder/test/file.txt"));
        assert_eq!(
            actual.to_string_lossy(),
            "/folder/test/file.txt",
            "Leading slash should not produce double slash"
        );
    }
}