use std::future::Future;
use std::sync::Arc;
use bytes::Bytes;
use xet_core_structures::merklehash::MerkleHash;
use super::{ClientTestingUtils, DirectAccessClient};
use crate::cas_types::FileRange;
use crate::error::ClientError;
/// Runs the shared client test suite against clients produced by `factory`.
///
/// `factory` is invoked once per test, so every test gets whatever client the
/// factory hands back for that call (presumably a fresh, empty one — e.g.
/// `test_xorb_list_and_delete` asserts the xorb list starts out empty).
pub async fn test_client_functionality<Fut>(factory: impl Fn() -> Fut)
where
    Fut: Future<Output = Arc<dyn DirectAccessClient>>,
{
    // Expands to `test(factory().await).await;` for each listed test, in order.
    macro_rules! run_each {
        ($($test:ident),+ $(,)?) => {
            $( $test(factory().await).await; )+
        };
    }

    run_each!(
        test_reconstruction_merges_adjacent_ranges,
        test_reconstruction_with_multiple_xorbs,
        test_reconstruction_overlapping_range_merging,
        test_range_requests,
        test_upload_configurations,
        test_chunk_boundary_shrinking,
        test_chunk_boundary_multiple_segments,
        test_batch_reconstruction,
        test_basic_xorb_put_get,
        test_xorb_ranges,
        test_xorb_length,
        test_missing_xorb,
        test_xorb_list_and_delete,
        test_get_file_data,
        test_get_file_data_with_ranges,
        test_get_file_size,
        test_global_dedup,
        test_v2_reconstruction_basic,
        test_v2_reconstruction_ranges,
        test_v2_reconstruction_matches_v1,
        test_v2_max_ranges_per_fetch,
        test_v2_url_encoding,
    );
}
/// Checks that two terms over adjacent chunk ranges of the same xorb produce one merged fetch range.
///
/// The file is uploaded with term spec `[(1, (0, 2)), (1, (2, 4))]`; the reconstruction is
/// expected to keep both terms but report a single fetch-info entry covering `0..4`.
pub async fn test_reconstruction_merges_adjacent_ranges(client: Arc<dyn DirectAccessClient>) {
    let uploaded = client
        .upload_random_file(&[(1, (0, 2)), (1, (2, 4))], 2048)
        .await
        .unwrap();
    let recon = client
        .get_reconstruction_v1(&uploaded.file_hash, None)
        .await
        .unwrap()
        .unwrap();

    assert_eq!(recon.terms.len(), 2);
    assert_eq!(recon.fetch_info.len(), 1);

    // Both terms reference the same xorb, so the first term's hash keys the only entry.
    let first_term_hash = recon.terms[0].hash;
    let ranges_for_xorb = recon.fetch_info.get(&first_term_hash).unwrap();
    assert_eq!(ranges_for_xorb.len(), 1);

    let merged = &ranges_for_xorb[0];
    assert_eq!(merged.range.start, 0);
    assert_eq!(merged.range.end, 4);
}
/// Checks term and fetch-info counts for a file whose terms alternate between two xorbs.
///
/// Three terms are uploaded (spec ids 1, 2, 1); the reconstruction is expected to list
/// three terms and two fetch-info entries.
pub async fn test_reconstruction_with_multiple_xorbs(client: Arc<dyn DirectAccessClient>) {
    let uploaded = client
        .upload_random_file(&[(1, (0, 3)), (2, (0, 2)), (1, (3, 5))], 2048)
        .await
        .unwrap();
    let recon = client
        .get_reconstruction_v1(&uploaded.file_hash, None)
        .await
        .unwrap()
        .unwrap();

    assert_eq!(recon.terms.len(), 3);
    assert_eq!(recon.fetch_info.len(), 2);
}
/// Checks how fetch ranges are merged when several terms reference one xorb.
///
/// Every scenario uploads a file whose terms all use spec id 1, then verifies the number
/// of terms, that exactly one fetch-info entry exists, and the exact list of fetch ranges
/// stored under the first term's hash. Overlapping, contained, chained, adjacent and
/// identical ranges are expected to collapse; ranges separated by a gap stay separate.
pub async fn test_reconstruction_overlapping_range_merging(client: Arc<dyn DirectAccessClient>) {
    let chunk_size = 2048usize;

    // Uploads `$spec`, fetches the unranged reconstruction and compares the fetch ranges
    // of the (single) xorb against the expected `(start, end)` pairs, in order.
    macro_rules! expect_fetch_ranges {
        ($spec:expr, terms: $n_terms:expr, ranges: [$(($start:expr, $end:expr)),+ $(,)?]) => {{
            let uploaded = client.upload_random_file($spec, chunk_size).await.unwrap();
            let recon = client
                .get_reconstruction_v1(&uploaded.file_hash, None)
                .await
                .unwrap()
                .unwrap();
            assert_eq!(recon.terms.len(), $n_terms);
            assert_eq!(recon.fetch_info.len(), 1);

            let first_term_hash = recon.terms[0].hash;
            let fetch_infos = recon.fetch_info.get(&first_term_hash).unwrap();
            let expected = [$(($start, $end)),+];
            assert_eq!(fetch_infos.len(), expected.len());
            for (info, (start, end)) in fetch_infos.iter().zip(expected) {
                assert_eq!(info.range.start, start);
                assert_eq!(info.range.end, end);
            }
        }};
    }

    // Partial overlap.
    expect_fetch_ranges!(&[(1, (0, 3)), (1, (1, 4))], terms: 2, ranges: [(0, 4)]);
    // Second range fully contained in the first.
    expect_fetch_ranges!(&[(1, (0, 5)), (1, (1, 3))], terms: 2, ranges: [(0, 5)]);
    // Chain of pairwise-overlapping ranges.
    expect_fetch_ranges!(&[(1, (0, 2)), (1, (1, 4)), (1, (3, 6))], terms: 3, ranges: [(0, 6)]);
    // A gap between ranges keeps them separate.
    expect_fetch_ranges!(&[(1, (0, 2)), (1, (4, 6))], terms: 2, ranges: [(0, 2), (4, 6)]);
    // Touching (adjacent) ranges.
    expect_fetch_ranges!(&[(1, (0, 3)), (1, (3, 5))], terms: 2, ranges: [(0, 5)]);
    // Identical ranges repeated.
    expect_fetch_ranges!(&[(1, (2, 5)), (1, (2, 5)), (1, (2, 5))], terms: 3, ranges: [(2, 5)]);
    // Two overlapping groups separated by a gap.
    expect_fetch_ranges!(
        &[(1, (0, 3)), (1, (2, 4)), (1, (6, 8)), (1, (7, 10))],
        terms: 4,
        ranges: [(0, 4), (6, 10)]
    );
}
/// Exercises ranged `get_reconstruction_v1` queries against a single-term file.
///
/// Covered cases: a range starting mid-file and running past the end, ranges starting
/// beyond or exactly at the end of the file (both expected to yield `None`), the first
/// half of the file, and the exact full range.
pub async fn test_range_requests(client: Arc<dyn DirectAccessClient>) {
    let uploaded = client.upload_random_file(&[(1, (0, 5))], 2048).await.unwrap();
    let hash = &uploaded.file_hash;

    // The file size is derived from the unranged reconstruction.
    let full = client.get_reconstruction_v1(hash, None).await.unwrap().unwrap();
    let file_len: u64 = full.terms.iter().map(|term| term.unpacked_length as u64).sum();

    // Starts mid-file, ends past EOF: data is returned with a non-zero offset into the first range.
    let tail = client
        .get_reconstruction_v1(hash, Some(FileRange::new(file_len / 2, file_len + 1000)))
        .await
        .unwrap()
        .unwrap();
    assert!(!tail.terms.is_empty());
    assert!(tail.offset_into_first_range > 0);

    // Entirely beyond EOF.
    let beyond_eof = client
        .get_reconstruction_v1(hash, Some(FileRange::new(file_len + 100, file_len + 1000)))
        .await;
    assert!(beyond_eof.unwrap().is_none());

    // Starting exactly at EOF.
    let at_eof = client
        .get_reconstruction_v1(hash, Some(FileRange::new(file_len, file_len + 100)))
        .await;
    assert!(at_eof.unwrap().is_none());

    // First half of the file.
    let head = client
        .get_reconstruction_v1(hash, Some(FileRange::new(0, file_len / 2)))
        .await
        .unwrap()
        .unwrap();
    assert!(!head.terms.is_empty());
    assert_eq!(head.offset_into_first_range, 0);

    // Exact full range: the term lengths must add up to the file size again.
    let whole = client
        .get_reconstruction_v1(hash, Some(FileRange::new(0, file_len)))
        .await
        .unwrap()
        .unwrap();
    let unpacked_sum: u64 = whole.terms.iter().map(|term| term.unpacked_length as u64).sum();
    assert_eq!(unpacked_sum, file_len);
}
/// Uploads files with several term layouts and checks the resulting term / fetch-info counts.
pub async fn test_upload_configurations(client: Arc<dyn DirectAccessClient>) {
    // One term.
    {
        let uploaded = client.upload_random_file(&[(1, (0, 3))], 2048).await.unwrap();
        let recon = client
            .get_reconstruction_v1(&uploaded.file_hash, None)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(recon.terms.len(), 1);
    }

    // Three consecutive terms sharing spec id 1: a single fetch-info entry.
    {
        let uploaded = client
            .upload_random_file(&[(1, (0, 2)), (1, (2, 4)), (1, (4, 6))], 2048)
            .await
            .unwrap();
        let recon = client
            .get_reconstruction_v1(&uploaded.file_hash, None)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(recon.terms.len(), 3);
        assert_eq!(recon.fetch_info.len(), 1);
    }

    // Three terms with distinct spec ids: one fetch-info entry each.
    {
        let uploaded = client
            .upload_random_file(&[(1, (0, 3)), (2, (0, 2)), (3, (0, 4))], 2048)
            .await
            .unwrap();
        let recon = client
            .get_reconstruction_v1(&uploaded.file_hash, None)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(recon.terms.len(), 3);
        assert_eq!(recon.fetch_info.len(), 3);
    }

    // Three overlapping terms sharing spec id 1: still a single fetch-info entry.
    {
        let uploaded = client
            .upload_random_file(&[(1, (0, 3)), (1, (1, 4)), (1, (2, 5))], 2048)
            .await
            .unwrap();
        let recon = client
            .get_reconstruction_v1(&uploaded.file_hash, None)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(recon.terms.len(), 3);
        assert_eq!(recon.fetch_info.len(), 1);
    }
}
/// Verifies how ranged reconstructions of a single five-chunk term are trimmed to chunk boundaries.
///
/// For each byte range, the one returned term is expected to span only the chunks the
/// range touches, and `offset_into_first_range` is expected to equal the distance from
/// the start of the first returned chunk to the requested start byte.
pub async fn test_chunk_boundary_shrinking(client: Arc<dyn DirectAccessClient>) {
    let chunk_size: usize = 2048;
    let chunk = chunk_size as u64;

    let uploaded = client.upload_random_file(&[(1, (0, 5))], chunk_size).await.unwrap();
    let hash = &uploaded.file_hash;

    let full = client.get_reconstruction_v1(hash, None).await.unwrap().unwrap();
    let file_len: u64 = full.terms.iter().map(|term| term.unpacked_length as u64).sum();
    assert_eq!(file_len, (5 * chunk_size) as u64);

    // Starts 500 bytes into chunk 1 and runs to EOF: chunk 0 is dropped.
    {
        let requested = FileRange::new(chunk + 500, file_len);
        let resp = client.get_reconstruction_v1(hash, Some(requested)).await.unwrap().unwrap();
        assert_eq!(resp.terms.len(), 1);
        let term = &resp.terms[0];
        assert_eq!((term.range.start, term.range.end), (1, 5));
        assert_eq!(resp.offset_into_first_range, 500);
    }

    // Starts exactly on the chunk 2 boundary.
    {
        let requested = FileRange::new(2 * chunk, file_len);
        let resp = client.get_reconstruction_v1(hash, Some(requested)).await.unwrap().unwrap();
        assert_eq!(resp.terms.len(), 1);
        let term = &resp.terms[0];
        assert_eq!((term.range.start, term.range.end), (2, 5));
        assert_eq!(resp.offset_into_first_range, 0);
    }

    // Ends 500 bytes into chunk 2: trailing chunks are dropped.
    {
        let requested = FileRange::new(0, 2 * chunk + 500);
        let resp = client.get_reconstruction_v1(hash, Some(requested)).await.unwrap().unwrap();
        assert_eq!(resp.terms.len(), 1);
        let term = &resp.terms[0];
        assert_eq!((term.range.start, term.range.end), (0, 3));
        assert_eq!(resp.offset_into_first_range, 0);
    }

    // Entirely inside chunk 2.
    {
        let requested = FileRange::new(2 * chunk + 100, 2 * chunk + 500);
        let resp = client.get_reconstruction_v1(hash, Some(requested)).await.unwrap().unwrap();
        assert_eq!(resp.terms.len(), 1);
        let term = &resp.terms[0];
        assert_eq!((term.range.start, term.range.end), (2, 3));
        assert_eq!(resp.offset_into_first_range, 100);
    }

    // Straddles the boundary between chunks 0 and 1.
    {
        let requested = FileRange::new(chunk - 100, chunk + 100);
        let resp = client.get_reconstruction_v1(hash, Some(requested)).await.unwrap().unwrap();
        assert_eq!(resp.terms.len(), 1);
        let term = &resp.terms[0];
        assert_eq!((term.range.start, term.range.end), (0, 2));
        assert_eq!(resp.offset_into_first_range, chunk - 100);
    }
}
/// Verifies chunk-boundary trimming of ranged reconstructions for a file made of two four-chunk terms.
///
/// The file is uploaded with spec `[(1, (0, 4)), (2, (0, 4))]` (eight chunks total). Each
/// scenario requests a byte range and checks which terms come back, the chunk range of
/// each, and `offset_into_first_range`.
pub async fn test_chunk_boundary_multiple_segments(client: Arc<dyn DirectAccessClient>) {
    let chunk_size = 2048usize;
    let chunk = chunk_size as u64;
    // Byte length of the first term (four chunks).
    let first_term_len = (chunk_size * 4) as u64;

    let uploaded = client
        .upload_random_file(&[(1, (0, 4)), (2, (0, 4))], chunk_size)
        .await
        .unwrap();
    let hash = &uploaded.file_hash;

    let full = client.get_reconstruction_v1(hash, None).await.unwrap().unwrap();
    let file_len: u64 = full.terms.iter().map(|term| term.unpacked_length as u64).sum();
    assert_eq!(file_len, (8 * chunk_size) as u64);

    // From 500 bytes into chunk 1 of the first term to EOF: first term trimmed, second term whole.
    {
        let requested = FileRange::new(chunk + 500, file_len);
        let resp = client.get_reconstruction_v1(hash, Some(requested)).await.unwrap().unwrap();
        assert_eq!(resp.terms.len(), 2);
        let (first, second) = (&resp.terms[0], &resp.terms[1]);
        assert_eq!((first.range.start, first.range.end), (1, 4));
        assert_eq!((second.range.start, second.range.end), (0, 4));
        assert_eq!(resp.offset_into_first_range, 500);
    }

    // Chunk-aligned range inside the first term only.
    {
        let requested = FileRange::new(chunk, 3 * chunk);
        let resp = client.get_reconstruction_v1(hash, Some(requested)).await.unwrap().unwrap();
        assert_eq!(resp.terms.len(), 1);
        let only = &resp.terms[0];
        assert_eq!((only.range.start, only.range.end), (1, 3));
        assert_eq!(resp.offset_into_first_range, 0);
    }

    // Chunk-aligned range inside the second term only.
    {
        let requested = FileRange::new(first_term_len + chunk, first_term_len + 3 * chunk);
        let resp = client.get_reconstruction_v1(hash, Some(requested)).await.unwrap().unwrap();
        assert_eq!(resp.terms.len(), 1);
        let only = &resp.terms[0];
        assert_eq!((only.range.start, only.range.end), (1, 3));
        assert_eq!(resp.offset_into_first_range, 0);
    }

    // Spans both terms: starts on chunk 2 of the first, ends 500 bytes into chunk 2 of the second.
    {
        let requested = FileRange::new(2 * chunk, first_term_len + 2 * chunk + 500);
        let resp = client.get_reconstruction_v1(hash, Some(requested)).await.unwrap().unwrap();
        assert_eq!(resp.terms.len(), 2);
        let (first, second) = (&resp.terms[0], &resp.terms[1]);
        assert_eq!((first.range.start, first.range.end), (2, 4));
        assert_eq!((second.range.start, second.range.end), (0, 3));
        assert_eq!(resp.offset_into_first_range, 0);
    }
}
/// Uploads three single-term files and checks that a batch reconstruction query returns all of them.
pub async fn test_batch_reconstruction(client: Arc<dyn DirectAccessClient>) {
    let first = client.upload_random_file(&[(1, (0, 3))], 2048).await.unwrap();
    let second = client.upload_random_file(&[(2, (0, 4))], 2048).await.unwrap();
    let third = client.upload_random_file(&[(3, (0, 2))], 2048).await.unwrap();

    let hashes = [first.file_hash, second.file_hash, third.file_hash];
    let batch = client.batch_get_reconstruction(&hashes).await.unwrap();

    assert_eq!(batch.files.len(), 3);
    // The response map is keyed by a converted form of the file hash.
    for hash in hashes {
        assert!(batch.files.contains_key(&hash.into()));
    }
}
/// Checks that the xorb backing a freshly uploaded single-term file exists and can be read back whole.
///
/// The full xorb contents are expected to equal the uploaded file's data.
pub async fn test_basic_xorb_put_get(client: Arc<dyn DirectAccessClient>) {
    let uploaded = client.upload_random_file(&[(1, (0, 3))], 2048).await.unwrap();
    let xorb = uploaded.term_xorb_hash(0).unwrap();

    let exists = client.xorb_exists(&xorb).await.unwrap();
    assert!(exists);

    let stored = client.get_full_xorb(&xorb).await.unwrap();
    assert!(!stored.is_empty());
    assert_eq!(stored, uploaded.data);
}
/// Exercises `get_xorb_ranges` with split, empty-list and zero-length range requests.
///
/// Two back-to-back ranges must concatenate to the uploaded data. Both an empty range
/// list and a single `(2, 2)` range are expected to yield exactly one empty buffer.
pub async fn test_xorb_ranges(client: Arc<dyn DirectAccessClient>) {
    let uploaded = client.upload_random_file(&[(1, (0, 4))], 2048).await.unwrap();
    let xorb = uploaded.term_xorb_hash(0).unwrap();

    // Two halves that together cover chunks 0..4.
    let halves = client.get_xorb_ranges(&xorb, vec![(0, 2), (2, 4)]).await.unwrap();
    assert_eq!(halves.len(), 2);
    let joined: Bytes = [halves[0].as_ref(), halves[1].as_ref()].concat().into();
    assert_eq!(joined, uploaded.data);

    // No ranges requested.
    let no_ranges = client.get_xorb_ranges(&xorb, vec![]).await.unwrap();
    assert_eq!(no_ranges.len(), 1);
    assert!(no_ranges[0].is_empty());

    // A range whose start equals its end.
    let zero_len = client.get_xorb_ranges(&xorb, vec![(2, 2)]).await.unwrap();
    assert_eq!(zero_len.len(), 1);
    assert!(zero_len[0].is_empty());
}
/// Checks that `xorb_length` reports the byte length of the uploaded file's data.
pub async fn test_xorb_length(client: Arc<dyn DirectAccessClient>) {
    let uploaded = client.upload_random_file(&[(1, (0, 3))], 2048).await.unwrap();
    let xorb = uploaded.term_xorb_hash(0).unwrap();

    let reported = client.xorb_length(&xorb).await.unwrap();
    assert_eq!(reported as usize, uploaded.data.len());
}
pub async fn test_missing_xorb(client: Arc<dyn DirectAccessClient>) {
let fake_hash = xet_core_structures::merklehash::MerkleHash::from_hex(
"d760aaf4beb07581956e24c847c47f1abd2e419166aa68259035bc412232e9da",
)
.unwrap();
assert!(!client.xorb_exists(&fake_hash).await.unwrap());
let result = client.get_full_xorb(&fake_hash).await;
assert!(matches!(result, Err(ClientError::XORBNotFound(_))));
let result = client.xorb_length(&fake_hash).await;
assert!(matches!(result, Err(ClientError::XORBNotFound(_))));
let result = client.get_xorb_ranges(&fake_hash, vec![(0, 1)]).await;
assert!(matches!(result, Err(ClientError::XORBNotFound(_))));
}
/// Exercises `list_xorbs` / `delete_xorb` on a client that starts with no xorbs.
///
/// After one upload the list must contain exactly that xorb; after deletion the list
/// must be empty again and `xorb_exists` must report `false`. A second delete of the
/// same hash is issued at the end and must not panic.
pub async fn test_xorb_list_and_delete(client: Arc<dyn DirectAccessClient>) {
    let before_upload = client.list_xorbs().await.unwrap();
    assert!(before_upload.is_empty());

    let uploaded = client.upload_random_file(&[(1, (0, 2))], 2048).await.unwrap();
    let xorb = uploaded.term_xorb_hash(0).unwrap();

    let after_upload = client.list_xorbs().await.unwrap();
    assert_eq!(after_upload.len(), 1);
    assert!(after_upload.contains(&xorb));

    client.delete_xorb(&xorb).await;

    let after_delete = client.list_xorbs().await.unwrap();
    assert!(after_delete.is_empty());
    assert!(!client.xorb_exists(&xorb).await.unwrap());

    // Deleting an already-deleted xorb.
    client.delete_xorb(&xorb).await;
}
/// Checks that unranged `get_file_data` returns exactly the uploaded bytes.
///
/// Run once for a single-term file and once for a file with two terms using different spec ids.
pub async fn test_get_file_data(client: Arc<dyn DirectAccessClient>) {
    let single_term = client.upload_random_file(&[(1, (0, 4))], 2048).await.unwrap();
    let fetched = client.get_file_data(&single_term.file_hash, None).await.unwrap();
    assert_eq!(fetched, single_term.data);

    let two_terms = client
        .upload_random_file(&[(1, (0, 2)), (2, (0, 3))], 2048)
        .await
        .unwrap();
    let fetched = client.get_file_data(&two_terms.file_hash, None).await.unwrap();
    assert_eq!(fetched, two_terms.data);
}
/// Exercises ranged `get_file_data` reads on a single-term file.
///
/// Checks the first and second halves, a range whose end runs past EOF (expected to be
/// truncated to the file end), and ranges starting beyond or exactly at EOF (expected to
/// fail with `ClientError::InvalidRange`).
pub async fn test_get_file_data_with_ranges(client: Arc<dyn DirectAccessClient>) {
    let uploaded = client.upload_random_file(&[(1, (0, 5))], 2048).await.unwrap();
    let hash = &uploaded.file_hash;
    let len = uploaded.data.len() as u64;
    let mid = len / 2;

    let head = client.get_file_data(hash, Some(FileRange::new(0, mid))).await.unwrap();
    assert_eq!(head, &uploaded.data[..mid as usize]);

    let tail = client.get_file_data(hash, Some(FileRange::new(mid, len))).await.unwrap();
    assert_eq!(tail, &uploaded.data[mid as usize..]);

    // End past EOF.
    let clipped = client
        .get_file_data(hash, Some(FileRange::new(mid, len + 1000)))
        .await
        .unwrap();
    assert_eq!(clipped, &uploaded.data[mid as usize..]);

    // Start beyond EOF.
    let beyond_eof = client
        .get_file_data(hash, Some(FileRange::new(len + 100, len + 1000)))
        .await;
    assert!(matches!(beyond_eof.unwrap_err(), ClientError::InvalidRange));

    // Start exactly at EOF.
    let at_eof = client
        .get_file_data(hash, Some(FileRange::new(len, len + 100)))
        .await;
    assert!(matches!(at_eof.unwrap_err(), ClientError::InvalidRange));
}
/// Checks that `get_file_size` matches the byte length of the uploaded data.
pub async fn test_get_file_size(client: Arc<dyn DirectAccessClient>) {
    let uploaded = client.upload_random_file(&[(1, (0, 4))], 2048).await.unwrap();

    let reported = client.get_file_size(&uploaded.file_hash).await.unwrap();
    assert_eq!(reported as usize, uploaded.data.len());
}
/// Uploads a randomly generated shard and checks it can be found again through a global dedup query.
///
/// The shard returned for the first dedup-eligible chunk hash is written to a second
/// directory, and every dedup-eligible hash of the uploaded shard must also be present
/// in the returned shard.
pub async fn test_global_dedup(client: Arc<dyn DirectAccessClient>) {
    use std::io::Cursor;

    use tempfile::TempDir;
    use xet_core_structures::metadata_shard::shard_format::test_routines::gen_random_shard_with_xorb_references;
    use xet_core_structures::metadata_shard::utils::parse_shard_filename;
    use xet_core_structures::metadata_shard::{MDBShardFile, MDBShardInfo};

    // Two scratch directories: one for the generated shard, one for the shard that comes back.
    let scratch = TempDir::new().unwrap();
    let upload_dir = scratch.path().join("shard_1");
    std::fs::create_dir_all(&upload_dir).unwrap();
    let download_dir = scratch.path().join("shard_2");
    std::fs::create_dir_all(&download_dir).unwrap();

    let generated = gen_random_shard_with_xorb_references(0, &[16; 8], &[2; 20], true, true).unwrap();
    let generated_path = generated.write_to_directory(&upload_dir, None).unwrap();
    // The parsed value is not used further; the call only has to succeed.
    let _shard_hash = parse_shard_filename(&generated_path).unwrap();

    let permit = client.acquire_upload_permit().await.unwrap();
    let shard_payload = std::fs::read(&generated_path).unwrap();
    client.upload_shard(shard_payload.into(), permit).await.unwrap();

    let mut generated_file = std::fs::File::open(&generated_path).unwrap();
    let uploaded_hashes = MDBShardInfo::filter_cas_chunks_for_global_dedup(&mut generated_file).unwrap();
    assert_ne!(uploaded_hashes.len(), 0);

    let returned_shard = client
        .query_for_global_dedup_shard("default", &uploaded_hashes[0])
        .await
        .unwrap()
        .unwrap();

    let written = MDBShardFile::write_out_from_reader(download_dir.clone(), &mut Cursor::new(&returned_shard)).unwrap();
    let returned_hashes =
        MDBShardInfo::filter_cas_chunks_for_global_dedup(&mut Cursor::new(&returned_shard)).unwrap();
    for expected in &uploaded_hashes {
        assert!(returned_hashes.contains(expected));
    }
    assert!(written.path.exists());
}
/// Runs the global-dedup shard expiration tests, requesting a client from `factory` for each one.
pub async fn test_global_dedup_shard_expiration_functionality<Fut>(factory: impl Fn() -> Fut)
where
    Fut: Future<Output = Arc<dyn DirectAccessClient>>,
{
    let strips_client = factory().await;
    test_global_dedup_shard_expiration_strips_file_data(strips_client).await;

    let expiry_client = factory().await;
    test_global_dedup_shard_expiration_sets_expiry(expiry_client).await;

    let subsecond_client = factory().await;
    test_global_dedup_shard_expiration_rounds_up_subsecond(subsecond_client).await;

    let always_client = factory().await;
    test_global_dedup_shard_always_returned(always_client).await;

    let no_expiry_client = factory().await;
    test_global_dedup_shard_no_expiration_returns_full(no_expiry_client).await;
}
/// Generates a random shard, uploads it through `client`, and returns its global-dedup chunk hashes.
///
/// The shard is written to a temporary directory first and uploaded from the bytes on
/// disk. Panics (via `unwrap` / `assert_ne!`) if any step fails or if the shard has no
/// dedup-eligible chunks, so callers may index the result at `[0]`.
async fn upload_shard_and_get_dedup_hashes(client: &Arc<dyn DirectAccessClient>) -> Vec<MerkleHash> {
    use xet_core_structures::metadata_shard::MDBShardInfo;
    use xet_core_structures::metadata_shard::shard_format::test_routines::gen_random_shard_with_xorb_references;

    let scratch = tempfile::TempDir::new().unwrap();
    let dir = scratch.path().join("shard");
    std::fs::create_dir_all(&dir).unwrap();

    let generated = gen_random_shard_with_xorb_references(0, &[16; 8], &[2; 20], true, true).unwrap();
    let path_on_disk = generated.write_to_directory(&dir, None).unwrap();

    let permit = client.acquire_upload_permit().await.unwrap();
    let payload = std::fs::read(&path_on_disk).unwrap();
    client.upload_shard(payload.into(), permit).await.unwrap();

    let mut shard_file = std::fs::File::open(&path_on_disk).unwrap();
    let hashes = MDBShardInfo::filter_cas_chunks_for_global_dedup(&mut shard_file).unwrap();
    assert_ne!(hashes.len(), 0);
    hashes
}
/// With a 300 s shard expiration configured, the shard returned by a dedup query must carry no file entries.
///
/// Xorb entries must still be present, and every dedup-eligible hash of the uploaded
/// shard must appear among the returned shard's dedup-eligible chunks.
async fn test_global_dedup_shard_expiration_strips_file_data(client: Arc<dyn DirectAccessClient>) {
    use std::io::Cursor;

    use xet_core_structures::metadata_shard::streaming_shard::MDBMinimalShard;

    client.set_global_dedup_shard_expiration(Some(tokio::time::Duration::from_secs(300)));
    let uploaded_hashes = upload_shard_and_get_dedup_hashes(&client).await;

    let returned = client
        .query_for_global_dedup_shard("default", &uploaded_hashes[0])
        .await
        .unwrap()
        .unwrap();

    let parsed = MDBMinimalShard::from_reader(&mut Cursor::new(&returned), true, true).unwrap();
    assert_eq!(parsed.num_files(), 0);
    assert_ne!(parsed.num_xorb(), 0);

    let eligible = parsed.global_dedup_eligible_chunks();
    for expected in &uploaded_hashes {
        assert!(eligible.contains(expected));
    }
}
/// With a 300 s shard expiration configured, the returned shard's `shard_key_expiry` must be set.
///
/// The expiry is compared against the wall clock sampled after the query: it must be
/// non-zero, in the future, and at most 300 s (plus 5 s of slack) ahead.
async fn test_global_dedup_shard_expiration_sets_expiry(client: Arc<dyn DirectAccessClient>) {
    use std::io::Cursor;
    use std::time::{SystemTime, UNIX_EPOCH};

    use xet_core_structures::metadata_shard::MDBShardInfo;

    client.set_global_dedup_shard_expiration(Some(tokio::time::Duration::from_secs(300)));
    let uploaded_hashes = upload_shard_and_get_dedup_hashes(&client).await;

    let returned = client
        .query_for_global_dedup_shard("default", &uploaded_hashes[0])
        .await
        .unwrap()
        .unwrap();
    let info = MDBShardInfo::load_from_reader(&mut Cursor::new(&returned)).unwrap();
    let expiry = info.metadata.shard_key_expiry;

    // Wall-clock seconds since the Unix epoch, taken after the shard was fetched.
    let now_secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();

    assert_ne!(expiry, 0);
    assert!(expiry > now_secs);
    assert!(expiry <= now_secs + 300 + 5);
}
/// With a 1 ms shard expiration, the returned shard must still carry an expiry strictly in the future.
///
/// The expiry is stored in whole seconds, so the test expects it to land after the
/// wall-clock second sampled before the query and no more than 3 s ahead of it. The
/// returned shard must also have no file entries while keeping its xorb entries.
async fn test_global_dedup_shard_expiration_rounds_up_subsecond(client: Arc<dyn DirectAccessClient>) {
    use std::io::Cursor;
    use std::time::{SystemTime, UNIX_EPOCH};

    use xet_core_structures::metadata_shard::MDBShardInfo;
    use xet_core_structures::metadata_shard::streaming_shard::MDBMinimalShard;

    client.set_global_dedup_shard_expiration(Some(tokio::time::Duration::from_millis(1)));
    let uploaded_hashes = upload_shard_and_get_dedup_hashes(&client).await;

    // Sampled before the query so the expiry can be bounded from below.
    let secs_before_query = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();

    let returned = client
        .query_for_global_dedup_shard("default", &uploaded_hashes[0])
        .await
        .unwrap()
        .unwrap();

    let info = MDBShardInfo::load_from_reader(&mut Cursor::new(&returned)).unwrap();
    let expiry = info.metadata.shard_key_expiry;
    assert!(expiry > secs_before_query);
    assert!(expiry <= secs_before_query + 3);

    let parsed = MDBMinimalShard::from_reader(&mut Cursor::new(&returned), true, true).unwrap();
    assert_eq!(parsed.num_files(), 0);
    assert_ne!(parsed.num_xorb(), 0);
}
/// With a 5 s shard expiration, repeated dedup queries for the same chunk must keep returning a shard.
///
/// The second response's `shard_key_expiry` must be non-zero and between 4 s and 6 s
/// ahead of the wall clock sampled after that query.
async fn test_global_dedup_shard_always_returned(client: Arc<dyn DirectAccessClient>) {
    use std::io::Cursor;
    use std::time::{SystemTime, UNIX_EPOCH};

    use xet_core_structures::metadata_shard::MDBShardInfo;

    client.set_global_dedup_shard_expiration(Some(tokio::time::Duration::from_secs(5)));
    let uploaded_hashes = upload_shard_and_get_dedup_hashes(&client).await;
    let probe = &uploaded_hashes[0];

    // First query: only presence is checked.
    let first = client.query_for_global_dedup_shard("default", probe).await.unwrap();
    assert!(first.is_some());

    // Second query: the shard contents are inspected.
    let second = client
        .query_for_global_dedup_shard("default", probe)
        .await
        .unwrap()
        .unwrap();
    let now_secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();

    let info = MDBShardInfo::load_from_reader(&mut Cursor::new(&second)).unwrap();
    let expiry = info.metadata.shard_key_expiry;
    assert_ne!(expiry, 0);
    assert!(expiry >= now_secs + 4);
    assert!(expiry <= now_secs + 6);
}
/// Without configuring a shard expiration, the shard returned by a dedup query must keep its file entries.
///
/// Both the file count and the xorb count of the returned shard must be non-zero.
async fn test_global_dedup_shard_no_expiration_returns_full(client: Arc<dyn DirectAccessClient>) {
    use std::io::Cursor;

    use xet_core_structures::metadata_shard::streaming_shard::MDBMinimalShard;

    let uploaded_hashes = upload_shard_and_get_dedup_hashes(&client).await;
    let returned = client
        .query_for_global_dedup_shard("default", &uploaded_hashes[0])
        .await
        .unwrap()
        .unwrap();

    let parsed = MDBMinimalShard::from_reader(&mut Cursor::new(&returned), true, true).unwrap();
    assert_ne!(parsed.num_files(), 0);
    assert_ne!(parsed.num_xorb(), 0);
}
/// Stress test: concurrent tasks flip the shard expiration setting while querying for a dedup shard.
///
/// Twelve tasks each run fifty iterations. Every iteration picks one of three expiration
/// settings (`None`, 1 ms, 2 s) based on `(worker_id + iteration) % 3`, then queries the
/// shard and checks that it parses and still contains xorb entries. All tasks must
/// finish without panicking, and the whole run must take under ten seconds.
pub async fn test_global_dedup_shard_expiration_stress<Fut>(factory: impl Fn() -> Fut)
where
    Fut: Future<Output = Arc<dyn DirectAccessClient>>,
{
    use std::io::Cursor;

    use xet_core_structures::metadata_shard::streaming_shard::MDBMinimalShard;

    let client = factory().await;
    let uploaded_hashes = upload_shard_and_get_dedup_hashes(&client).await;
    let probe = uploaded_hashes[0];

    let mut workers = tokio::task::JoinSet::new();
    let started = std::time::Instant::now();

    for worker_id in 0..12usize {
        let client = client.clone();
        workers.spawn(async move {
            for iteration in 0..50usize {
                let expiration = match (worker_id + iteration) % 3 {
                    0 => None,
                    1 => Some(tokio::time::Duration::from_millis(1)),
                    _ => Some(tokio::time::Duration::from_secs(2)),
                };
                client.set_global_dedup_shard_expiration(expiration);

                let returned = client
                    .query_for_global_dedup_shard("default", &probe)
                    .await
                    .unwrap()
                    .unwrap();
                let parsed = MDBMinimalShard::from_reader(&mut Cursor::new(&returned), true, true).unwrap();
                assert_ne!(parsed.num_xorb(), 0);
            }
        });
    }

    // Surface a panic from any worker as a failure of this test.
    while let Some(outcome) = workers.join_next().await {
        outcome.unwrap();
    }

    assert!(started.elapsed() < std::time::Duration::from_secs(10));
}
pub async fn test_url_expiration_functionality<Fut>(factory: impl Fn() -> Fut)
where
Fut: std::future::Future<Output = Arc<dyn DirectAccessClient>>,
{
test_url_expiration_within_window(factory().await).await;
test_url_expiration_after_window(factory().await).await;
test_url_expiration_default_infinite(factory().await).await;
test_url_expiration_exact_boundary(factory().await).await;
}
/// A fetch URL issued with a 60 s expiration must still work after the clock advances 30 s.
async fn test_url_expiration_within_window(client: Arc<dyn DirectAccessClient>) {
    use tokio::time::Duration;

    use crate::cas_types::HexMerkleHash;

    client.set_fetch_term_url_expiration(Duration::from_secs(60));

    let uploaded = client.upload_random_file(&[(1, (0, 3))], 2048).await.unwrap();
    let recon = client
        .get_reconstruction_v1(&uploaded.file_hash, None)
        .await
        .unwrap()
        .unwrap();

    // Fetch info is keyed by the hex form of the xorb hash.
    let xorb = uploaded.terms[0].xorb_hash;
    let key: HexMerkleHash = xorb.into();
    let issued = recon.fetch_info.get(&key).unwrap().first().unwrap().clone();

    tokio::time::advance(Duration::from_secs(30)).await;

    let outcome = client.fetch_term_data(xorb, issued).await;
    assert!(outcome.is_ok(), "URL should be valid within expiration window");
}
/// A fetch URL issued with a 60 s expiration must be rejected after the clock advances 61 s.
///
/// The failure must be `ClientError::PresignedUrlExpirationError`.
async fn test_url_expiration_after_window(client: Arc<dyn DirectAccessClient>) {
    use tokio::time::Duration;

    use crate::cas_types::HexMerkleHash;

    client.set_fetch_term_url_expiration(Duration::from_secs(60));

    let uploaded = client.upload_random_file(&[(1, (0, 3))], 2048).await.unwrap();
    let recon = client
        .get_reconstruction_v1(&uploaded.file_hash, None)
        .await
        .unwrap()
        .unwrap();

    let xorb = uploaded.terms[0].xorb_hash;
    let key: HexMerkleHash = xorb.into();
    let issued = recon.fetch_info.get(&key).unwrap().first().unwrap().clone();

    tokio::time::advance(Duration::from_secs(61)).await;

    let outcome = client.fetch_term_data(xorb, issued).await;
    assert!(outcome.is_err(), "URL should be expired after expiration window");
    assert!(matches!(outcome.unwrap_err(), ClientError::PresignedUrlExpirationError));
}
/// Without configuring an expiration, a fetch URL must still work after the clock advances one year.
async fn test_url_expiration_default_infinite(client: Arc<dyn DirectAccessClient>) {
    use tokio::time::Duration;

    use crate::cas_types::HexMerkleHash;

    let uploaded = client.upload_random_file(&[(1, (0, 3))], 2048).await.unwrap();
    let recon = client
        .get_reconstruction_v1(&uploaded.file_hash, None)
        .await
        .unwrap()
        .unwrap();

    let xorb = uploaded.terms[0].xorb_hash;
    let key: HexMerkleHash = xorb.into();
    let issued = recon.fetch_info.get(&key).unwrap().first().unwrap().clone();

    // 365 days, expressed in seconds.
    tokio::time::advance(Duration::from_secs(365 * 24 * 60 * 60)).await;

    let outcome = client.fetch_term_data(xorb, issued).await;
    assert!(outcome.is_ok(), "URL should not expire with default infinite expiration");
}
/// Probes both sides of a 60 s URL expiration with the same fetch info.
///
/// After advancing 59 s the fetch must succeed; after a further 2 s (61 s total) it must
/// fail with `ClientError::PresignedUrlExpirationError`.
async fn test_url_expiration_exact_boundary(client: Arc<dyn DirectAccessClient>) {
    use tokio::time::Duration;

    use crate::cas_types::HexMerkleHash;

    client.set_fetch_term_url_expiration(Duration::from_secs(60));

    let uploaded = client.upload_random_file(&[(1, (0, 3))], 2048).await.unwrap();
    let recon = client
        .get_reconstruction_v1(&uploaded.file_hash, None)
        .await
        .unwrap()
        .unwrap();

    let xorb = uploaded.terms[0].xorb_hash;
    let key: HexMerkleHash = xorb.into();
    let issued = recon.fetch_info.get(&key).unwrap().first().unwrap().clone();

    // One second before the configured expiration.
    tokio::time::advance(Duration::from_secs(59)).await;
    let before_expiry = client.fetch_term_data(xorb, issued.clone()).await;
    assert!(before_expiry.is_ok(), "URL should be valid inside expiration boundary");

    // One second after the configured expiration.
    tokio::time::advance(Duration::from_secs(2)).await;
    let after_expiry = client.fetch_term_data(xorb, issued).await;
    assert!(after_expiry.is_err(), "URL should be expired past boundary");
    assert!(matches!(after_expiry.unwrap_err(), ClientError::PresignedUrlExpirationError));
}
pub async fn test_api_delay_functionality<Fut>(factory: impl Fn() -> Fut)
where
Fut: std::future::Future<Output = Arc<dyn DirectAccessClient>>,
{
test_api_delay_disabled_by_default(factory().await).await;
test_api_delay_fixed_delay(factory().await).await;
test_api_delay_range(factory().await).await;
test_api_delay_can_be_disabled(factory().await).await;
}
/// Without configuring a delay, `get_file_reconstruction_info` must complete in under 10 ms.
///
/// Elapsed time is measured with `tokio::time::Instant`.
async fn test_api_delay_disabled_by_default(client: Arc<dyn DirectAccessClient>) {
    let uploaded = client.upload_random_file(&[(1, (0, 3))], 2048).await.unwrap();

    let started = tokio::time::Instant::now();
    let _info = client.get_file_reconstruction_info(&uploaded.file_hash).await.unwrap();
    let took = started.elapsed();

    assert!(took < tokio::time::Duration::from_millis(10), "No delay should be applied by default");
}
/// Configures the API delay range with identical start and end bounds of 100 ms and checks
/// that `get_file_reconstruction_info` takes at least 100 ms and less than 150 ms.
async fn test_api_delay_fixed_delay(client: Arc<dyn DirectAccessClient>) {
    use tokio::time::{Duration, Instant};

    let fixed = Duration::from_millis(100);
    client.set_api_delay_range(Some(fixed..fixed));

    let uploaded = client.upload_random_file(&[(1, (0, 3))], 2048).await.unwrap();

    // Time only the reconstruction-info call.
    let timer = Instant::now();
    let _info = client.get_file_reconstruction_info(&uploaded.file_hash).await.unwrap();
    let elapsed = timer.elapsed();

    let lower_bound = Duration::from_millis(100);
    let upper_bound = Duration::from_millis(150);
    assert!(elapsed >= lower_bound, "Fixed delay should be applied: elapsed={elapsed:?}");
    assert!(elapsed < upper_bound, "Delay should not be much more than configured: elapsed={elapsed:?}");
}
/// Configures a 50 ms..150 ms API delay range and, over five calls to
/// `get_file_reconstruction_info`, checks that each call takes at least 50 ms and less than
/// 200 ms.
async fn test_api_delay_range(client: Arc<dyn DirectAccessClient>) {
    use tokio::time::{Duration, Instant};

    let configured = Duration::from_millis(50)..Duration::from_millis(150);
    client.set_api_delay_range(Some(configured));

    let uploaded = client.upload_random_file(&[(1, (0, 3))], 2048).await.unwrap();

    // The upper bound asserted is looser than the configured 150 ms maximum.
    let floor = Duration::from_millis(50);
    let ceiling = Duration::from_millis(200);

    for _attempt in 0..5 {
        let timer = Instant::now();
        let _info = client.get_file_reconstruction_info(&uploaded.file_hash).await.unwrap();
        let elapsed = timer.elapsed();

        assert!(elapsed >= floor, "Delay should be at least 50ms: elapsed={elapsed:?}");
        assert!(elapsed < ceiling, "Delay should be at most ~150ms: elapsed={elapsed:?}");
    }
}
/// Checks that an API delay can be switched off again.
///
/// With a 100 ms delay configured, a `get_file_reconstruction_info` call must take at least
/// 100 ms; after `set_api_delay_range(None)`, the same call must take under 10 ms.
async fn test_api_delay_can_be_disabled(client: Arc<dyn DirectAccessClient>) {
    use tokio::time::{Duration, Instant};

    let hundred_ms = Duration::from_millis(100);
    client.set_api_delay_range(Some(hundred_ms..hundred_ms));

    let uploaded = client.upload_random_file(&[(1, (0, 3))], 2048).await.unwrap();

    // Phase 1: delay enabled.
    {
        let timer = Instant::now();
        let _info = client.get_file_reconstruction_info(&uploaded.file_hash).await.unwrap();
        let elapsed = timer.elapsed();
        assert!(elapsed >= Duration::from_millis(100), "Delay should be applied");
    }

    // Phase 2: delay removed.
    client.set_api_delay_range(None);
    {
        let timer = Instant::now();
        let _info = client.get_file_reconstruction_info(&uploaded.file_hash).await.unwrap();
        let elapsed = timer.elapsed();
        assert!(
            elapsed < Duration::from_millis(10),
            "Delay should not be applied after disabling: elapsed={elapsed:?}"
        );
    }
}
/// Sanity-checks a full-file V2 reconstruction response for a single-term file.
///
/// The response must contain terms and xorbs, start at offset 0, and every term's hash must
/// map to a non-empty list of fetch entries whose URL and ranges are non-empty and whose
/// byte and chunk ranges are non-degenerate (`start < end`).
async fn test_v2_reconstruction_basic(client: Arc<dyn DirectAccessClient>) {
    let uploaded = client.upload_random_file(&[(1, (0, 5))], 2048).await.unwrap();
    let recon = client
        .get_reconstruction_v2(&uploaded.file_hash, None)
        .await
        .unwrap()
        .unwrap();

    assert!(!recon.terms.is_empty());
    assert!(!recon.xorbs.is_empty());
    assert_eq!(recon.offset_into_first_range, 0);

    for term in &recon.terms {
        // Each term must be resolvable to at least one fetch entry.
        let fetches = recon.xorbs.get(&term.hash).expect("xorb descriptor missing for term");
        assert!(!fetches.is_empty());

        for entry in fetches {
            assert!(!entry.url.is_empty());
            assert!(!entry.ranges.is_empty());

            for span in &entry.ranges {
                assert!(span.bytes.start < span.bytes.end);
                assert!(span.chunks.start < span.chunks.end);
            }
        }
    }
}
/// Checks V2 reconstruction with explicit file ranges.
///
/// A range covering the middle half of the file must yield a response with terms and xorbs;
/// a range that starts beyond the end of the file must yield `None`.
async fn test_v2_reconstruction_ranges(client: Arc<dyn DirectAccessClient>) {
    let uploaded = client
        .upload_random_file(&[(1, (0, 3)), (2, (0, 3)), (1, (3, 6))], 2048)
        .await
        .unwrap();
    let total_len = uploaded.data.len() as u64;

    // Middle half of the file: [len/4, 3*len/4).
    let middle = FileRange::new(total_len / 4, total_len * 3 / 4);
    let partial = client.get_reconstruction_v2(&uploaded.file_hash, Some(middle)).await.unwrap().unwrap();
    assert!(!partial.terms.is_empty());
    assert!(!partial.xorbs.is_empty());

    // Entirely past the end of the file.
    let beyond_eof = FileRange::new(total_len + 100, total_len + 200);
    let past_end = client.get_reconstruction_v2(&uploaded.file_hash, Some(beyond_eof)).await.unwrap();
    assert!(past_end.is_none());
}
/// Cross-checks V1 and V2 reconstruction responses for the same file.
///
/// For the whole file: the first-range offset, the number of terms, each term's hash, range
/// and unpacked length, and the set of xorb hashes (compared as sorted strings) must agree.
/// For the middle half of the file: the first-range offset and the number of terms must agree.
async fn test_v2_reconstruction_matches_v1(client: Arc<dyn DirectAccessClient>) {
    let uploaded = client
        .upload_random_file(&[(1, (0, 3)), (2, (0, 2)), (1, (3, 5))], 2048)
        .await
        .unwrap();
    let hash = &uploaded.file_hash;

    // --- Whole-file comparison ---
    let v1 = client.get_reconstruction_v1(hash, None).await.unwrap().unwrap();
    let v2 = client.get_reconstruction_v2(hash, None).await.unwrap().unwrap();

    assert_eq!(v1.offset_into_first_range, v2.offset_into_first_range);
    assert_eq!(v1.terms.len(), v2.terms.len());
    for (old_term, new_term) in v1.terms.iter().zip(v2.terms.iter()) {
        assert_eq!(old_term.hash, new_term.hash);
        assert_eq!(old_term.range, new_term.range);
        assert_eq!(old_term.unpacked_length, new_term.unpacked_length);
    }

    // The two responses key their per-xorb data differently (`fetch_info` vs `xorbs`), so
    // compare the key sets through their sorted string forms.
    let v1_xorb_hashes = {
        let mut keys: Vec<_> = v1.fetch_info.keys().map(|h| h.to_string()).collect();
        keys.sort();
        keys
    };
    let v2_xorb_hashes = {
        let mut keys: Vec<_> = v2.xorbs.keys().map(|h| h.to_string()).collect();
        keys.sort();
        keys
    };
    assert_eq!(v1_xorb_hashes, v2_xorb_hashes);

    // --- Ranged comparison over [len/4, 3*len/4) ---
    let total_len = uploaded.data.len() as u64;
    let middle = FileRange::new(total_len / 4, total_len * 3 / 4);
    let v1_ranged = client.get_reconstruction_v1(hash, Some(middle)).await.unwrap().unwrap();
    let v2_ranged = client.get_reconstruction_v2(hash, Some(middle)).await.unwrap().unwrap();

    assert_eq!(v1_ranged.offset_into_first_range, v2_ranged.offset_into_first_range);
    assert_eq!(v1_ranged.terms.len(), v2_ranged.terms.len());
}
/// Checks the effect of `set_max_ranges_per_fetch` on V2 reconstruction.
///
/// The file interleaves terms from two xorbs. For the xorb of the first term, the response
/// obtained after limiting to 2 ranges per fetch must have at least as many fetch entries as
/// the response obtained before the limit was set, no entry may carry more than 2 ranges,
/// and the total number of ranges must be the same in both responses. The limit is set to
/// `usize::MAX` before returning.
async fn test_v2_max_ranges_per_fetch(client: Arc<dyn DirectAccessClient>) {
    // Alternate between xorb 1 and xorb 2 so that xorb 1 is referenced by several terms.
    let interleaved = &[
        (1, (0, 2)),
        (2, (0, 1)),
        (1, (2, 4)),
        (2, (1, 2)),
        (1, (4, 6)),
        (2, (2, 3)),
        (1, (6, 8)),
    ];
    let uploaded = client.upload_random_file(interleaved, 512).await.unwrap();
    let first_xorb: crate::cas_types::HexMerkleHash = uploaded.terms[0].xorb_hash.into();

    // Response before the limit is applied.
    let before = client
        .get_reconstruction_v2(&uploaded.file_hash, None)
        .await
        .unwrap()
        .unwrap();
    let desc_unlimited = before.xorbs.get(&first_xorb).unwrap();

    // Response with at most two ranges per fetch entry.
    client.set_max_ranges_per_fetch(2);
    let after = client
        .get_reconstruction_v2(&uploaded.file_hash, None)
        .await
        .unwrap()
        .unwrap();
    let desc_limited = after.xorbs.get(&first_xorb).unwrap();

    let (limited_entries, unlimited_entries) = (desc_limited.len(), desc_unlimited.len());
    assert!(
        limited_entries >= unlimited_entries,
        "Limited ({}) should have at least as many fetch entries as unlimited ({})",
        limited_entries,
        unlimited_entries
    );

    for entry in desc_limited {
        let range_count = entry.ranges.len();
        assert!(range_count <= 2, "Expected at most 2 ranges per fetch, got {}", range_count);
    }

    let total_limited: usize = desc_limited.iter().map(|f| f.ranges.len()).sum();
    let total_unlimited: usize = desc_unlimited.iter().map(|f| f.ranges.len()).sum();
    assert_eq!(total_unlimited, total_limited, "Total ranges should be preserved");

    client.set_max_ranges_per_fetch(usize::MAX);
}
/// Validates the shape of every fetch URL in a V2 reconstruction response.
///
/// A URL beginning with `http://` or `https://` must contain `/fetch_term`. Any other URL
/// must be URL-safe, unpadded base64 that decodes to UTF-8 text of the form
/// `<hex hash>:<u64>:<ranges>`, where `<ranges>` is a comma-separated list of `start-end`
/// pairs of `u64` values (empty items are ignored).
async fn test_v2_url_encoding(client: Arc<dyn DirectAccessClient>) {
    use base64::Engine;
    use base64::engine::general_purpose::URL_SAFE_NO_PAD;

    let uploaded = client.upload_random_file(&[(1, (0, 3))], 2048).await.unwrap();
    let recon = client
        .get_reconstruction_v2(&uploaded.file_hash, None)
        .await
        .unwrap()
        .unwrap();

    for fetch in recon.xorbs.values().flatten() {
        assert!(!fetch.url.is_empty(), "URL should not be empty");

        // HTTP(S) URLs only need to point at the fetch_term endpoint.
        if fetch.url.starts_with("http://") || fetch.url.starts_with("https://") {
            assert!(fetch.url.contains("/fetch_term"), "HTTP URL should contain /fetch_term: {}", fetch.url);
            continue;
        }

        // Everything else is expected to be a base64-encoded payload.
        let decoded = URL_SAFE_NO_PAD.decode(&fetch.url);
        assert!(decoded.is_ok(), "URL should be valid base64: {}", fetch.url);
        let payload = String::from_utf8(decoded.unwrap()).unwrap();

        // Split into at most three fields so the ranges field is kept whole.
        let parts: Vec<&str> = payload.splitn(3, ':').collect();
        assert_eq!(parts.len(), 3, "Payload should have 3 colon-separated parts");
        let (hash_field, ts_field, ranges_field) = (parts[0], parts[1], parts[2]);

        let hash = xet_core_structures::merklehash::MerkleHash::from_hex(hash_field);
        assert!(hash.is_ok(), "Hash part should be valid hex");

        let ts = ts_field.parse::<u64>();
        assert!(ts.is_ok(), "Timestamp should be a valid u64");

        for item in ranges_field.split(',').filter(|s| !s.is_empty()) {
            let bounds: Vec<&str> = item.split('-').collect();
            assert_eq!(bounds.len(), 2, "Each range should be start-end");
            assert!(bounds[0].parse::<u64>().is_ok());
            assert!(bounds[1].parse::<u64>().is_ok());
        }
    }
}