use std::{env, fs, io};
use std::fs::create_dir;
use std::io::Write;
use std::path::PathBuf;
use std::collections::HashMap;
use std::sync::Arc;
use actix_multipart::form::MultipartForm;
use actix_web::web::Bytes;
use ant_core::data::{Wallet, XorName};
use log::{debug, info, warn};
use sanitize_filename::sanitize;
use uuid::Uuid;
use tar::Builder;
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use hex::{FromHex, ToHex};
#[double]
use crate::service::public_data_service::PublicDataService;
#[double]
use crate::service::file_service::FileService;
use mockall_double::double;
#[double]
use crate::client::tarchive_caching_client::TArchiveCachingClient;
#[double]
use crate::service::resolver_service::ResolverService;
use crate::error::tarchive_error::TarchiveError;
use crate::error::UpdateError;
use crate::controller::StoreType;
use crate::model::tarchive::Tarchive;
use crate::model::archive::Archive;
use crate::config::anttp_config::AntTpConfig;
use tokio::sync::Mutex as TokioMutex;
use crate::service::archive_service::{ArchiveRaw, ArchiveResponse, PublicArchiveForm, Upload};
/// Service that reads, creates, updates and truncates tar-based archives ("tarchives")
/// stored as public data.
///
/// The struct is `Clone`; `address_locks` sits behind an `Arc`, so clones share the same
/// lock table.
#[derive(Clone)]
pub struct TarchiveService {
/// Used to download, upload and push the raw tar bytes as public data.
public_data_service: PublicDataService,
/// Used by `get_tarchive_binary` to fetch the archive bytes that `Archive::build_from_tar` consumes.
tarchive_caching_client: TArchiveCachingClient,
/// Used to download a byte range (address, offset, size) for a single file in the archive.
file_service: FileService,
/// Used to turn a caller-supplied name into an address; the input is used as-is when
/// resolution yields nothing.
resolver_service: ResolverService,
/// Source of the app private key passed to `Tarchive::index`.
ant_tp_config: AntTpConfig,
/// One async mutex per address string, created on first use by `update_tarchive` and
/// `truncate_tarchive`. NOTE(review): entries are never removed in this file, so the map
/// only grows - confirm that this is acceptable.
address_locks: Arc<TokioMutex<HashMap<String, Arc<TokioMutex<()>>>>>,
}
impl TarchiveService {
pub fn new(public_data_service: PublicDataService, tarchive_caching_client: TArchiveCachingClient, file_service: FileService, resolver_service: ResolverService, ant_tp_config: AntTpConfig) -> Self {
TarchiveService { public_data_service, tarchive_caching_client, file_service, resolver_service, ant_tp_config, address_locks: Arc::new(TokioMutex::new(HashMap::new())) }
}
/// Fetches a file or directory listing from a tarchive and returns it in API form.
///
/// The raw content returned by `get_tarchive_binary` is base64-encoded (standard alphabet);
/// the item list and address are passed through unchanged.
///
/// # Errors
/// Propagates any `TarchiveError` produced by `get_tarchive_binary`.
pub async fn get_tarchive(&self, address: String, path: Option<String>) -> Result<ArchiveResponse, TarchiveError> {
    // `get_tarchive_binary` resolves the name itself, so resolving here as well would only
    // add a second, redundant resolver lookup per request.
    let raw = self.get_tarchive_binary(address, path).await?;
    Ok(ArchiveResponse::new(raw.items, BASE64_STANDARD.encode(raw.content), raw.address))
}
pub async fn get_tarchive_binary(&self, address: String, path: Option<String>) -> Result<ArchiveRaw, TarchiveError> {
let resolved_address = self.resolver_service.resolve_name(&address).await.unwrap_or(address);
let data_address = XorName::from_hex(resolved_address)
.map_err(|e| TarchiveError::GetError(crate::error::GetError::BadAddress(e.to_string())))?;
let bytes = self.tarchive_caching_client.get_archive_from_tar(&data_address).await?;
let archive = Archive::build_from_tar(&data_address, bytes);
let path = path.unwrap_or_default();
match archive.find_file(&path) {
Some(data_address_offset) => {
debug!("download file from tarchive at [{}]", path);
let bytes = self.file_service.download_data_bytes(data_address_offset.data_address, data_address_offset.offset, data_address_offset.size).await?;
Ok(ArchiveRaw::new(vec![], bytes.into(), data_address.encode_hex()))
}
None => {
debug!("download directory from tarchive at [{}]", path);
let path_details = archive.list_dir(path);
Ok(ArchiveRaw::new(path_details, Bytes::new(), data_address.encode_hex()))
}
}
}
pub async fn create_tarchive(&self, target_path: Option<String>, tarchive_form: MultipartForm<PublicArchiveForm>, evm_wallet: Wallet, store_type: StoreType) -> Result<Upload, TarchiveError> {
info!("Creating new tarchive");
let tmp_dir = Self::create_tmp_dir()?;
let tar_path = tmp_dir.join("archive.tar");
{
let tar_file = fs::File::create(&tar_path)?;
let mut builder = Builder::new(tar_file);
self.build_tar_from_form(&mut builder, target_path, tarchive_form)?;
builder.finish()?;
}
let final_tar_path = self.rebuild_with_index(&tar_path, &tmp_dir)?;
let result = self.upload_tar(&final_tar_path, store_type).await;
Self::purge_tmp_dir(&tmp_dir);
result
}
/// Pushes the public data behind `address` using `store_type` and reports the resulting address.
///
/// `address` is passed through the resolver first; the input is used unchanged when the
/// resolver returns nothing. `evm_wallet` is currently not used by this method.
///
/// # Errors
/// Propagates the failure of the underlying push as a `TarchiveError`.
pub async fn push_tarchive(&self, address: String, evm_wallet: Wallet, store_type: StoreType) -> Result<Upload, TarchiveError> {
    let lookup = self.resolver_service.resolve_name(&address).await;
    let resolved_address = lookup.unwrap_or(address);
    let chunk = self
        .public_data_service
        .push_public_data(resolved_address, store_type)
        .await?;
    Ok(Upload::new(chunk.address))
}
pub async fn update_tarchive(&self, address: String, target_path: Option<String>, tarchive_form: MultipartForm<PublicArchiveForm>, evm_wallet: Wallet, store_type: StoreType) -> Result<Upload, TarchiveError> {
let addr_lock = {
let mut locks = self.address_locks.lock().await;
Arc::clone(locks.entry(address.clone()).or_insert_with(|| Arc::new(TokioMutex::new(()))))
};
let _guard = addr_lock.lock().await;
let resolved_address = self.resolver_service.resolve_name(&address).await.unwrap_or(address);
info!("Updating tarchive at address [{}]", resolved_address);
let tmp_dir = Self::create_tmp_dir()?;
let tar_path = tmp_dir.join("archive.tar");
let existing_data = self.public_data_service.get_public_data_binary(resolved_address).await?;
let mut tar_file = fs::File::create(&tar_path)?;
tar_file.write_all(&existing_data)?;
let updated_tar_path = tmp_dir.join("updated_archive.tar");
{
let updated_tar_file = fs::File::create(&updated_tar_path)?;
let mut builder = Builder::new(updated_tar_file);
let mut existing_tar_file = fs::File::open(&tar_path)?;
let mut archive = tar::Archive::new(&mut existing_tar_file);
for entry_result in archive.entries()? {
let mut entry = entry_result?;
let header = entry.header().clone();
let path = entry.path()?.to_path_buf();
if path.to_str() == Some("archive.tar.idx") {
continue;
}
builder.append_data(&mut header.clone(), path, &mut entry)?;
}
self.build_tar_from_form(&mut builder, target_path, tarchive_form)?;
builder.finish()?;
}
let final_tar_path = self.rebuild_with_index(&updated_tar_path, &tmp_dir)?;
let result = self.upload_tar(&final_tar_path, store_type).await;
Self::purge_tmp_dir(&tmp_dir);
result
}
pub async fn truncate_tarchive(&self, address: String, path: String, evm_wallet: Wallet, store_type: StoreType) -> Result<Upload, TarchiveError> {
let addr_lock = {
let mut locks = self.address_locks.lock().await;
Arc::clone(locks.entry(address.clone()).or_insert_with(|| Arc::new(TokioMutex::new(()))))
};
let _guard = addr_lock.lock().await;
let resolved_address = self.resolver_service.resolve_name(&address).await.unwrap_or(address);
info!("Truncating tarchive at address [{}]", resolved_address);
let tmp_dir = Self::create_tmp_dir()?;
let tar_path = tmp_dir.join("archive.tar");
let existing_data = self.public_data_service.get_public_data_binary(resolved_address).await?;
let mut tar_file = fs::File::create(&tar_path)?;
tar_file.write_all(&existing_data)?;
let updated_tar_path = tmp_dir.join("updated_archive.tar");
{
let updated_tar_file = fs::File::create(&updated_tar_path)?;
let mut builder = Builder::new(updated_tar_file);
let sanitised_delete_path = Tarchive::sanitise_path(&path);
let delete_prefix = format!("{}/", sanitised_delete_path);
let mut existing_tar_file = fs::File::open(&tar_path)?;
let mut archive = tar::Archive::new(&mut existing_tar_file);
for entry_result in archive.entries()? {
let mut entry = entry_result?;
let header = entry.header().clone();
let entry_path = entry.path()?.to_path_buf();
let entry_path_str = entry_path.to_str().unwrap_or_default();
if entry_path_str == "archive.tar.idx" {
continue;
}
if entry_path_str == sanitised_delete_path || entry_path_str.starts_with(&delete_prefix) {
info!("Skipping file [{}] from truncated tarchive", entry_path_str);
continue;
}
builder.append_data(&mut header.clone(), entry_path, &mut entry)?;
}
builder.finish()?;
}
let final_tar_path = self.rebuild_with_index(&updated_tar_path, &tmp_dir)?;
let result = self.upload_tar(&final_tar_path, store_type).await;
Self::purge_tmp_dir(&tmp_dir);
result
}
/// Appends every uploaded multipart file to `builder`.
///
/// Each file is stored under the sanitised `target_path` (split on `/`, with empty, `.` and
/// `..` components dropped) followed by the sanitised upload filename.
///
/// # Errors
/// Returns `UpdateError::TemporaryStorage` when a multipart field has no filename or when
/// its filename is empty after sanitisation, and propagates I/O errors from the tar builder.
fn build_tar_from_form<W: Write>(&self, builder: &mut Builder<W>, target_path: Option<String>, tarchive_form: MultipartForm<PublicArchiveForm>) -> Result<(), TarchiveError> {
    // The destination directory is identical for every file, so sanitise it once
    // instead of once per uploaded file.
    let mut base_path = PathBuf::new();
    if let Some(target_path_str) = &target_path {
        for part in target_path_str.split('/') {
            let sanitised_part = sanitize(part);
            if !sanitised_part.is_empty() && sanitised_part != ".." && sanitised_part != "." {
                base_path.push(sanitised_part);
            }
        }
    }
    for temp_file in tarchive_form.files.iter() {
        let raw_file_name = match &temp_file.file_name {
            Some(name) => name,
            None => {
                return Err(UpdateError::TemporaryStorage("Failed to get filename from multipart field".to_string()).into());
            }
        };
        let file_name = sanitize(raw_file_name);
        // A name made up only of rejected characters sanitises to "", which would make the
        // entry path equal to the target directory itself (or empty). Reject it explicitly.
        if file_name.is_empty() {
            return Err(UpdateError::TemporaryStorage(format!("Filename [{}] is empty after sanitisation", raw_file_name)).into());
        }
        builder.append_path_with_name(temp_file.file.path(), base_path.join(file_name))?;
    }
    Ok(())
}
/// Writes `final_archive.tar` into `tmp_dir`: a copy of every entry in `tar_path` followed
/// by an `archive.tar.idx` entry holding the index string from `Tarchive::index`.
///
/// Returns the path of the new tar.
///
/// # Errors
/// Returns `UpdateError::AppKeyMissing` when the app private key cannot be obtained, and
/// propagates indexing and I/O failures.
///
/// # Panics
/// Panics if the tar header rejects the fixed path `archive.tar.idx`.
fn rebuild_with_index(&self, tar_path: &PathBuf, tmp_dir: &PathBuf) -> Result<PathBuf, TarchiveError> {
    // Step 1: compute the index over the source tar; the handle is closed at the end of
    // this block.
    let index_str = {
        let mut source = fs::File::open(tar_path)?;
        let app_private_key = match self.ant_tp_config.get_app_private_key() {
            Ok(key) => key,
            Err(_) => {
                return Err(TarchiveError::UpdateError(UpdateError::AppKeyMissing("App private key missing or invalid".to_string())));
            }
        };
        Tarchive::index(&mut source, &app_private_key)?
    };

    // Step 2: copy all entries into a fresh tar.
    let final_tar_path = tmp_dir.join("final_archive.tar");
    let mut builder = Builder::new(fs::File::create(&final_tar_path)?);
    let mut src_tar_file = fs::File::open(tar_path)?;
    let mut archive = tar::Archive::new(&mut src_tar_file);
    for entry_result in archive.entries()? {
        let mut entry = entry_result?;
        let mut entry_header = entry.header().clone();
        let entry_path = entry.path()?.to_path_buf();
        builder.append_data(&mut entry_header, entry_path, &mut entry)?;
    }

    // Step 3: append the index as the last entry.
    let index_bytes = index_str.as_bytes();
    let mut index_header = tar::Header::new_gnu();
    index_header.set_size(index_str.len() as u64);
    index_header.set_path("archive.tar.idx").unwrap();
    index_header.set_cksum();
    builder.append(&index_header, index_bytes)?;
    builder.finish()?;
    Ok(final_tar_path)
}
/// Reads the whole tar at `tar_path` into memory and stores it as public data using
/// `store_type`, returning the resulting address wrapped in an `Upload`.
///
/// # Errors
/// Propagates the file read error or the storage error as a `TarchiveError`.
async fn upload_tar(&self, tar_path: &PathBuf, store_type: StoreType) -> Result<Upload, TarchiveError> {
    let payload = Bytes::from(fs::read(tar_path)?);
    let stored = self
        .public_data_service
        .create_public_data(payload, store_type)
        .await?;
    let address = stored.address;
    Ok(Upload::new(address))
}
/// Creates a fresh directory named with a random v4 UUID inside the system temp directory
/// and returns its path.
///
/// # Errors
/// Returns the `io::Error` from `create_dir` when the directory cannot be created.
fn create_tmp_dir() -> Result<PathBuf, io::Error> {
    let mut tmp_dir = env::temp_dir();
    tmp_dir.push(Uuid::new_v4().to_string());
    create_dir(&tmp_dir)?;
    info!("Created temporary directory for tarchive: {:?}", &tmp_dir);
    Ok(tmp_dir)
}
/// Best-effort recursive removal of `tmp_dir`; a failure is logged as a warning and
/// otherwise ignored.
fn purge_tmp_dir(tmp_dir: &PathBuf) {
    if let Err(e) = fs::remove_dir_all(tmp_dir) {
        warn!("failed to delete temporary directory at [{:?}]: {}", tmp_dir, e);
    }
}
}
#[cfg(test)]
mod tests {
use ant_core::data::EvmNetwork;
use super::*;
use crate::service::resolver_service::MockResolverService;
use crate::service::file_service::MockFileService;
use crate::client::{MockPublicDataCachingClient, MockChunkCachingClient, MockTArchiveCachingClient};
use clap::Parser;
/// Builds a `TarchiveService` wired to default mocks, with an identity resolver and a
/// fixed app private key in the config.
fn create_mock_service() -> TarchiveService {
    let config = AntTpConfig::parse_from(&["anttp", "--app-private-key", "55dcbc4624699d219b8ec293339a3b81e68815397f5a502026784d8122d09fce"]);

    let mut chunk_client = MockChunkCachingClient::default();
    chunk_client.expect_clone().returning(|| {
        let mut cloned = MockChunkCachingClient::default();
        cloned.expect_clone().returning(MockChunkCachingClient::default);
        cloned
    });

    // Resolver that returns every address unchanged; its clones behave the same way.
    let mut resolver = MockResolverService::default();
    resolver
        .expect_resolve_name()
        .returning(|address| Some(address.clone()));
    resolver.expect_clone().returning(|| {
        let mut cloned = MockResolverService::default();
        cloned
            .expect_resolve_name()
            .returning(|address| Some(address.clone()));
        cloned
    });

    let data_client = MockPublicDataCachingClient::default();
    let public_data_service = PublicDataService::new(data_client, resolver.clone());

    let mut file_service = MockFileService::default();
    file_service.expect_clone().returning(MockFileService::default);

    let tarchive_client = MockTArchiveCachingClient::default();
    TarchiveService::new(public_data_service, tarchive_client, file_service, resolver, config)
}
/// Truncating `delete.txt` from a two-entry archive succeeds and yields an upload address.
#[test]
fn test_truncate_tarchive() {
    // Source archive: one entry to keep, one to delete (in that order).
    let mut tar_buffer = Vec::new();
    {
        let mut builder = Builder::new(&mut tar_buffer);
        let fixtures: [(&str, &[u8]); 2] = [
            ("keep.txt", &b"content1"[..]),
            ("delete.txt", &b"content2"[..]),
        ];
        for &(name, data) in fixtures.iter() {
            let mut header = tar::Header::new_gnu();
            header.set_size(data.len() as u64);
            header.set_path(name).unwrap();
            header.set_cksum();
            builder.append(&header, data).unwrap();
        }
        builder.finish().unwrap();
    }
    let tar_bytes = Bytes::from(tar_buffer);

    let mut data_client = MockPublicDataCachingClient::default();
    data_client
        .expect_data_get_public()
        .returning(move |_| Ok(tar_bytes.clone()));
    data_client
        .expect_data_put_public()
        .returning(|_, _| Ok(XorName::default()));

    let mut resolver = MockResolverService::default();
    resolver
        .expect_resolve_name()
        .returning(|address| Some(address.clone()));
    resolver.expect_clone().returning(|| {
        let mut cloned = MockResolverService::default();
        cloned
            .expect_resolve_name()
            .returning(|address| Some(address.clone()));
        cloned
    });

    let public_data_service = PublicDataService::new(data_client, resolver.clone());
    let tarchive_client = MockTArchiveCachingClient::default();
    let mut file_service = MockFileService::default();
    file_service.expect_clone().returning(MockFileService::default);
    let config = AntTpConfig::parse_from(&["anttp", "--app-private-key", "55dcbc4624699d219b8ec293339a3b81e68815397f5a502026784d8122d09fce"]);
    let service = TarchiveService::new(public_data_service, tarchive_client, file_service, resolver, config);

    let xor_name = XorName::default();
    let wallet = Wallet::new_with_random_wallet(EvmNetwork::ArbitrumOne);
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let upload = runtime
        .block_on(service.truncate_tarchive(xor_name.encode_hex(), "delete.txt".to_string(), wallet, StoreType::Memory))
        .unwrap();
    assert!(upload.address.is_some());
}
/// With no path given, `get_tarchive` returns the directory listing and empty content.
#[test]
fn test_get_tarchive_directory_listing() {
    let index_data = "file1.txt 512 11\n";
    let mut tarchive_client = MockTArchiveCachingClient::default();
    tarchive_client
        .expect_get_archive_from_tar()
        .returning(move |_| Ok(Bytes::from(index_data)));

    let mut resolver = MockResolverService::default();
    resolver
        .expect_resolve_name()
        .returning(|address| Some(address.clone()));
    resolver.expect_clone().returning(|| {
        let mut cloned = MockResolverService::default();
        cloned
            .expect_resolve_name()
            .returning(|address| Some(address.clone()));
        cloned
    });

    let data_client = MockPublicDataCachingClient::default();
    let public_data_service = PublicDataService::new(data_client, resolver.clone());
    let mut file_service = MockFileService::default();
    file_service.expect_clone().returning(MockFileService::default);
    let config = AntTpConfig::parse_from(&["anttp"]);
    let service = TarchiveService::new(public_data_service, tarchive_client, file_service, resolver, config);

    let xor_name = XorName::default();
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let response = runtime
        .block_on(service.get_tarchive(xor_name.encode_hex(), None))
        .unwrap();

    assert_eq!(response.items.len(), 1);
    assert_eq!(response.items[0].display, "file1.txt");
    assert!(response.content.is_empty());
}
/// `push_tarchive` reports the hex form of the address returned by the data client.
#[tokio::test]
async fn test_push_tarchive_success() {
    let xor_name = XorName::default();
    let expected_hex = xor_name.encode_hex();
    let stored_bytes = Bytes::from("test tar data");

    let mut data_client = MockPublicDataCachingClient::default();
    data_client
        .expect_data_get_public()
        .returning(move |_| Ok(stored_bytes.clone()));
    data_client
        .expect_data_put_public()
        .returning(move |_, _| Ok(xor_name));

    let mut resolver = MockResolverService::default();
    resolver
        .expect_resolve_name()
        .returning(|address| Some(address.clone()));
    resolver.expect_clone().returning(|| {
        let mut cloned = MockResolverService::default();
        cloned
            .expect_resolve_name()
            .returning(|address| Some(address.clone()));
        cloned
    });

    let public_data_service = PublicDataService::new(data_client, resolver.clone());
    let mut file_service = MockFileService::default();
    file_service.expect_clone().returning(MockFileService::default);
    let tarchive_client = MockTArchiveCachingClient::default();
    let config = AntTpConfig::parse_from(&["anttp"]);
    let service = TarchiveService::new(public_data_service, tarchive_client, file_service, resolver, config);

    let wallet = Wallet::new_with_random_wallet(EvmNetwork::ArbitrumOne);
    let outcome = service
        .push_tarchive(xor_name.encode_hex(), wallet, StoreType::Network)
        .await;

    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap().address, Some(expected_hex));
}
/// Requesting a path that matches a file returns that file's bytes, base64-encoded, and
/// no directory items.
#[test]
fn test_get_tarchive_file() {
    let index_data = "file1.txt 512 8\n";
    let mut tarchive_client = MockTArchiveCachingClient::default();
    tarchive_client
        .expect_get_archive_from_tar()
        .returning(move |_| Ok(Bytes::from(index_data)));

    let mut resolver = MockResolverService::default();
    resolver
        .expect_resolve_name()
        .returning(|address| Some(address.clone()));
    resolver.expect_clone().returning(|| {
        let mut cloned = MockResolverService::default();
        cloned
            .expect_resolve_name()
            .returning(|address| Some(address.clone()));
        cloned
    });

    let data_client = MockPublicDataCachingClient::default();
    let public_data_service = PublicDataService::new(data_client, resolver.clone());

    let mut file_service = MockFileService::default();
    file_service.expect_clone().returning(MockFileService::default);
    file_service
        .expect_download_data_bytes()
        .returning(|_, _, _| Ok(bytes::BytesMut::from(b"content1".as_slice())));

    let config = AntTpConfig::parse_from(&["anttp"]);
    let service = TarchiveService::new(public_data_service, tarchive_client, file_service, resolver, config);

    let xor_name = XorName::default();
    let address = xor_name.encode_hex();
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let response = runtime
        .block_on(service.get_tarchive(address, Some("file1.txt".to_string())))
        .unwrap();

    assert!(response.items.is_empty());
    let decoded = BASE64_STANDARD.decode(response.content).unwrap();
    assert_eq!(decoded, b"content1");
}
/// `rebuild_with_index` keeps the original entry and appends `archive.tar.idx`.
#[test]
fn test_rebuild_with_index() {
    let service = create_mock_service();
    let tmp_dir = TarchiveService::create_tmp_dir().unwrap();
    let tar_path = tmp_dir.join("test.tar");

    // Write a single-entry source tar to disk.
    {
        let data = b"content";
        let mut header = tar::Header::new_gnu();
        header.set_size(data.len() as u64);
        header.set_path("test.txt").unwrap();
        header.set_cksum();
        let mut builder = Builder::new(fs::File::create(&tar_path).unwrap());
        builder.append(&header, &data[..]).unwrap();
        builder.finish().unwrap();
    }

    let final_tar_path = service.rebuild_with_index(&tar_path, &tmp_dir).unwrap();
    assert!(final_tar_path.exists());

    // Collect the entry names of the rebuilt tar.
    let mut archive = tar::Archive::new(fs::File::open(final_tar_path).unwrap());
    let mut names: Vec<String> = Vec::new();
    for entry in archive.entries().unwrap() {
        let entry = entry.unwrap();
        names.push(entry.path().unwrap().to_str().unwrap().to_string());
    }
    assert!(names.iter().any(|name| name == "test.txt"));
    assert!(names.iter().any(|name| name == "archive.tar.idx"));

    TarchiveService::purge_tmp_dir(&tmp_dir);
}
// Two concurrent truncations of the SAME address must run one after the other: each mocked
// upload blocks for 100ms, so the total wall time is expected to be at least ~2 x 100ms
// (asserted with a 180ms lower bound) and both uploads must have happened.
#[test]
fn test_concurrent_truncate_same_address_serialized() {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use std::thread::sleep;
let mut mock_client = MockPublicDataCachingClient::default();
// Build the in-memory source tar containing a single entry, `keep.txt`.
let mut initial_tar = Vec::new();
{
let mut builder = Builder::new(&mut initial_tar);
let data = b"content1";
let mut header = tar::Header::new_gnu();
header.set_size(data.len() as u64);
header.set_path("keep.txt").unwrap();
header.set_cksum();
builder.append(&header, &data[..]).unwrap();
builder.finish().unwrap();
}
let initial_tar_bytes = Bytes::from(initial_tar);
// Shared counter of upload calls, incremented by the original mock and by every clone of it.
let put_counter = Arc::new(AtomicUsize::new(0));
let counter_for_orig = Arc::clone(&put_counter);
let getb_for_orig = initial_tar_bytes.clone();
mock_client
.expect_data_get_public()
.returning(move |_| Ok(getb_for_orig.clone()));
mock_client
.expect_data_put_public()
.returning(move |_, _| {
counter_for_orig.fetch_add(1, Ordering::SeqCst);
// Blocking sleep stands in for a slow upload; it is what makes serialisation measurable.
sleep(Duration::from_millis(100));
Ok(XorName::from([1; 32]))
});
let getb_for_clone = initial_tar_bytes.clone();
let counter_for_clone = Arc::clone(&put_counter);
// Clones of the client get the same get/put behaviour and share the counter.
mock_client
.expect_clone()
.returning(move || {
let mut m = MockPublicDataCachingClient::default();
let getb = getb_for_clone.clone();
let c = Arc::clone(&counter_for_clone);
m.expect_data_get_public().returning(move |_| Ok(getb.clone()));
m.expect_data_put_public().returning(move |_, _| {
c.fetch_add(1, Ordering::SeqCst);
sleep(Duration::from_millis(100));
Ok(XorName::from([1; 32]))
});
m
});
// Identity resolver; clone expectations are nested two levels deep because the service
// itself is cloned below.
let mut mock_resolver = MockResolverService::default();
mock_resolver
.expect_resolve_name()
.returning(|address| Some(address.clone()));
mock_resolver.expect_clone().returning(|| {
let mut m = MockResolverService::default();
m.expect_resolve_name()
.returning(|address| Some(address.clone()));
m.expect_clone()
.returning(|| {
let mut inner = MockResolverService::default();
inner.expect_resolve_name().returning(|address| Some(address.clone()));
inner.expect_clone().returning(MockResolverService::default);
inner
});
m
});
let public_data_service = PublicDataService::new(mock_client, mock_resolver.clone());
let mut file_service = MockFileService::default();
file_service.expect_clone().returning(MockFileService::default);
let mut mock_tarchive_client = MockTArchiveCachingClient::default();
mock_tarchive_client.expect_clone().returning(MockTArchiveCachingClient::default);
let config = AntTpConfig::parse_from(&["anttp", "--app-private-key", "55dcbc4624699d219b8ec293339a3b81e68815397f5a502026784d8122d09fce"]);
let service = TarchiveService::new(public_data_service, mock_tarchive_client, file_service, mock_resolver, config);
let rt = tokio::runtime::Runtime::new().unwrap();
let wallet1 = Wallet::new_with_random_wallet(EvmNetwork::ArbitrumOne);
let wallet2 = Wallet::new_with_random_wallet(EvmNetwork::ArbitrumOne);
// Both tasks use the same address string, so they contend for the same per-address lock.
let addr = hex::encode(XorName::from([9; 32]));
let start = Instant::now();
rt.block_on(async {
let s1 = service.clone();
let s2 = service.clone();
let addr_a = addr.clone();
let addr_b = addr.clone();
let f1 = tokio::spawn(async move {
s1.truncate_tarchive(addr_a, "keep.txt".to_string(), wallet1, StoreType::Memory)
.await
.unwrap()
});
let f2 = tokio::spawn(async move {
s2.truncate_tarchive(addr_b, "keep.txt".to_string(), wallet2, StoreType::Memory)
.await
.unwrap()
});
let _ = futures_util::future::join(f1, f2).await;
});
let elapsed = start.elapsed();
// Both uploads ran, and their 100ms sleeps did not overlap.
assert_eq!(put_counter.load(Ordering::SeqCst), 2);
assert!(elapsed >= Duration::from_millis(180));
}
/// Two concurrent truncations of DIFFERENT addresses must not block each other: each mocked
/// upload blocks for 100ms, so running them in parallel should finish in under 180ms.
#[test]
fn test_concurrent_truncate_different_addresses_parallel() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{Duration, Instant};
    use std::thread::sleep;

    let mut mock_client = MockPublicDataCachingClient::default();

    // Build the in-memory source tar containing a single entry, `keep.txt`.
    let mut initial_tar = Vec::new();
    {
        let mut builder = Builder::new(&mut initial_tar);
        let data = b"content1";
        let mut header = tar::Header::new_gnu();
        header.set_size(data.len() as u64);
        header.set_path("keep.txt").unwrap();
        header.set_cksum();
        builder.append(&header, &data[..]).unwrap();
        builder.finish().unwrap();
    }
    let initial_tar_bytes = Bytes::from(initial_tar);

    // Shared counter of upload calls, incremented by the original mock and by every clone.
    let call_counter = Arc::new(AtomicUsize::new(0));
    let counter_for_orig = Arc::clone(&call_counter);
    let getb_for_orig = initial_tar_bytes.clone();
    mock_client
        .expect_data_get_public()
        .returning(move |_| Ok(getb_for_orig.clone()));
    mock_client
        .expect_data_put_public()
        .returning(move |_, _| {
            counter_for_orig.fetch_add(1, Ordering::SeqCst);
            // Blocking sleep stands in for a slow upload.
            sleep(Duration::from_millis(100));
            Ok(XorName::from([3; 32]))
        });
    let getb_for_clone = initial_tar_bytes.clone();
    let counter_for_clone = Arc::clone(&call_counter);
    mock_client
        .expect_clone()
        .returning(move || {
            let mut m = MockPublicDataCachingClient::default();
            let getb = getb_for_clone.clone();
            let c = Arc::clone(&counter_for_clone);
            m.expect_data_get_public().returning(move |_| Ok(getb.clone()));
            m.expect_data_put_public().returning(move |_, _| {
                c.fetch_add(1, Ordering::SeqCst);
                sleep(Duration::from_millis(100));
                Ok(XorName::from([3; 32]))
            });
            m
        });

    // Identity resolver; clone expectations are nested because the service is cloned below.
    let mut mock_resolver = MockResolverService::default();
    mock_resolver
        .expect_resolve_name()
        .returning(|address| Some(address.clone()));
    mock_resolver.expect_clone().returning(|| {
        let mut m = MockResolverService::default();
        m.expect_resolve_name()
            .returning(|address| Some(address.clone()));
        m.expect_clone()
            .returning(|| {
                let mut inner = MockResolverService::default();
                inner.expect_resolve_name().returning(|address| Some(address.clone()));
                inner.expect_clone().returning(MockResolverService::default);
                inner
            });
        m
    });

    let public_data_service = PublicDataService::new(mock_client, mock_resolver.clone());
    let mut file_service = MockFileService::default();
    file_service.expect_clone().returning(MockFileService::default);
    let mut mock_tarchive_client = MockTArchiveCachingClient::default();
    mock_tarchive_client.expect_clone().returning(MockTArchiveCachingClient::default);
    let config = AntTpConfig::parse_from(&["anttp", "--app-private-key", "55dcbc4624699d219b8ec293339a3b81e68815397f5a502026784d8122d09fce"]);
    let service = TarchiveService::new(public_data_service, mock_tarchive_client, file_service, mock_resolver, config);

    // The mocks block their worker thread with `std::thread::sleep`, so the two tasks only
    // overlap when the runtime has at least two workers. `Runtime::new()` sizes the pool
    // from the CPU count, which makes this test fail on single-core hosts; pin it to two.
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
        .unwrap();
    let wallet1 = Wallet::new_with_random_wallet(EvmNetwork::ArbitrumOne);
    let wallet2 = Wallet::new_with_random_wallet(EvmNetwork::ArbitrumOne);
    // Distinct address strings, so each task takes its own per-address lock.
    let addr1 = hex::encode(XorName::from([9; 32]));
    let addr2 = hex::encode(XorName::from([8; 32]));
    let start = Instant::now();
    rt.block_on(async {
        let s1 = service.clone();
        let s2 = service.clone();
        let a1 = addr1.clone();
        let a2 = addr2.clone();
        let f1 = tokio::spawn(async move {
            s1.truncate_tarchive(a1, "keep.txt".to_string(), wallet1, StoreType::Memory)
                .await
                .unwrap()
        });
        let f2 = tokio::spawn(async move {
            s2.truncate_tarchive(a2, "keep.txt".to_string(), wallet2, StoreType::Memory)
                .await
                .unwrap()
        });
        let _ = futures_util::future::join(f1, f2).await;
    });
    let elapsed = start.elapsed();
    // Both uploads ran, and their 100ms sleeps overlapped.
    assert_eq!(call_counter.load(Ordering::SeqCst), 2);
    assert!(elapsed < Duration::from_millis(180));
}
}