use std::net::TcpListener as StdTcpListener;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use http::header::{self, HeaderMap, HeaderValue};
#[cfg(unix)]
use tempfile::TempDir;
use tokio::sync::oneshot;
use super::super::RemoteClient;
use super::super::interface::Client;
use super::super::progress_tracked_streams::ProgressCallback;
use super::local_server::{LocalServer, ServerLatencyProfile};
use super::network_simulation::{NetworkProfile, NetworkSimulationProxy};
#[cfg(unix)]
use super::socket_proxy::UnixSocketProxy;
use super::{DeletionControlableClient, DirectAccessClient, LocalClient, MemoryClient, RemoteSimulationClient};
use crate::error::Result;
/// Builder for [`LocalTestServer`].
///
/// Storage defaults to an in-memory backend; use
/// [`LocalTestServerBuilder::with_disk_location`],
/// [`LocalTestServerBuilder::with_ephemeral_disk`] or
/// [`LocalTestServerBuilder::with_client`] to choose another one.
pub struct LocalTestServerBuilder {
    // ----- storage backend selection -----
    /// Use a `MemoryClient` (only consulted when `client` is not set).
    in_memory: bool,
    /// Use a `LocalClient` in a temporary directory (only consulted when
    /// neither `client` nor `in_memory` applies).
    ephemeral_disk: bool,
    /// Root directory of a disk-backed `LocalClient`, used when no other
    /// storage option applies.
    disk_location: Option<PathBuf>,
    /// Caller-supplied backend; takes precedence over every other storage option.
    client: Option<Arc<dyn DirectAccessClient>>,

    // ----- client transport -----
    /// Unix socket the remote client connects through; a proxy forwards it to
    /// the HTTP endpoint.
    socket_path: Option<PathBuf>,
    /// Place the Unix socket inside a fresh temporary directory instead of
    /// `socket_path`.
    ephemeral_socket: bool,

    // ----- simulation -----
    /// Latency profile pushed to the server once it has started.
    server_latency_profile: Option<ServerLatencyProfile>,
    /// When set, client traffic goes through a `NetworkSimulationProxy`
    /// configured with this profile.
    network_profile: Option<NetworkProfile>,
}
#[allow(dead_code)]
impl LocalTestServerBuilder {
pub fn new() -> Self {
Self {
in_memory: true,
disk_location: None,
socket_path: None,
ephemeral_socket: false,
ephemeral_disk: false,
client: None,
server_latency_profile: None,
network_profile: None,
}
}
#[cfg(unix)]
pub fn with_socket(mut self, path: PathBuf) -> Self {
self.socket_path = Some(path);
self.ephemeral_socket = false;
self
}
pub fn with_disk_location(mut self, path: impl Into<PathBuf>) -> Self {
self.disk_location = Some(path.into());
self.in_memory = false;
self.ephemeral_disk = false;
self
}
pub fn with_ephemeral_disk(mut self) -> Self {
self.ephemeral_disk = true;
self.in_memory = false;
self.disk_location = None;
self
}
#[cfg(unix)]
pub fn with_ephemeral_socket(mut self) -> Self {
self.ephemeral_socket = true;
self.socket_path = None;
self
}
pub fn with_client(mut self, client: Arc<dyn DirectAccessClient>) -> Self {
self.client = Some(client);
self
}
pub fn with_server_latency_profile(mut self, profile: ServerLatencyProfile) -> Self {
self.server_latency_profile = Some(profile);
self
}
pub fn with_network_profile(mut self, provider: NetworkProfile) -> Self {
self.network_profile = Some(provider);
self
}
pub async fn start(self) -> LocalTestServer {
#[cfg(unix)]
let (socket_path, ephemeral_tempdir) = if self.ephemeral_socket {
let tempdir = TempDir::new().expect("Failed to create temporary directory for ephemeral socket");
let socket_path = tempdir.path().join("socket.sock");
(Some(socket_path), Some(tempdir))
} else {
(self.socket_path, None)
};
#[cfg(not(unix))]
let _socket_path = if self.ephemeral_socket { None } else { self.socket_path };
let (client, deletion_client): (Arc<dyn DirectAccessClient>, Option<Arc<dyn DeletionControlableClient>>) =
if let Some(client) = self.client {
(client, None)
} else if self.in_memory {
(MemoryClient::new(), None)
} else if self.ephemeral_disk {
let lc = LocalClient::temporary()
.await
.expect("Failed to create LocalClient with temporary directory");
let dc: Arc<dyn DeletionControlableClient> = lc.clone();
(lc, Some(dc))
} else {
let disk_path = self.disk_location.unwrap_or_else(|| {
panic!("with_disk_location must be called when in_memory is false and no client is provided")
});
let lc = LocalClient::new(&disk_path).await.expect("Failed to create LocalClient");
let dc: Arc<dyn DeletionControlableClient> = lc.clone();
(lc, Some(dc))
};
let port = LocalTestServer::find_available_port();
let host = "127.0.0.1".to_string();
let tcp_endpoint = format!("http://{}:{}", host, port);
let server_host_port = format!("{}:{}", host, port);
let dynamic_provider = self.network_profile.clone();
let (proxy_guard, client_endpoint): (Option<Arc<NetworkSimulationProxy>>, String) =
if let Some(provider) = dynamic_provider {
let (proxy, listen_addr) = NetworkSimulationProxy::new(server_host_port.clone(), provider)
.await
.expect("Failed to create network simulation proxy");
Arc::clone(&proxy).run_refill_loop();
tokio::spawn({
let proxy = Arc::clone(&proxy);
async move {
if let Err(e) = proxy.run_accept_loop().await {
tracing::error!(error = %e, "bandwidth proxy accept loop failed");
}
}
});
(Some(proxy), format!("http://{}", listen_addr))
} else {
(None, tcp_endpoint.clone())
};
let server = LocalServer::from_client(client.clone(), deletion_client.clone(), host, port);
let (shutdown_tx, shutdown_rx) = oneshot::channel();
tokio::spawn(async move {
let _ = server.run_until_stopped(shutdown_rx).await;
});
tokio::time::sleep(Duration::from_millis(50)).await;
let mut headers = HeaderMap::new();
headers.insert(header::USER_AGENT, HeaderValue::from_static("test-agent"));
let custom_headers = Some(Arc::new(headers));
#[cfg(unix)]
let (remote_client, socket_proxy) = {
if let Some(socket_path) = socket_path {
let tcp_addr = client_endpoint.strip_prefix("http://").unwrap_or(&client_endpoint).to_string();
let proxy = UnixSocketProxy::new(socket_path.clone(), tcp_addr)
.await
.expect("Failed to create Unix socket proxy");
tokio::time::sleep(Duration::from_millis(500)).await;
let socket_path_str = socket_path.to_string_lossy().to_string();
let client = RemoteClient::new_with_socket(
&client_endpoint,
&None,
"test-session",
false,
Some(&socket_path_str),
custom_headers.clone(),
);
(client, Some(proxy))
} else {
let client = RemoteClient::new(&client_endpoint, &None, "test-session", false, custom_headers.clone());
(client, None)
}
};
#[cfg(not(unix))]
let remote_client = RemoteClient::new(&client_endpoint, &None, "test-session", false, custom_headers.clone());
let remote_simulation_client = Arc::new(RemoteSimulationClient::new(remote_client));
#[cfg(unix)]
let server = LocalTestServer {
http_endpoint: client_endpoint,
server_shutdown_tx: Some(shutdown_tx),
remote_simulation_client,
client,
deletion_client,
socket_proxy,
_ephemeral_socket_tempdir: ephemeral_tempdir,
network_simulation_proxy: proxy_guard.clone(),
};
#[cfg(not(unix))]
let server = LocalTestServer {
http_endpoint: client_endpoint,
server_shutdown_tx: Some(shutdown_tx),
remote_simulation_client,
client,
deletion_client,
network_simulation_proxy: proxy_guard,
};
if let Some(profile) = self.server_latency_profile {
server
.remote_simulation_client()
.simulation_set_latency_profile(profile)
.await
.expect("Failed to set server latency profile");
}
server
}
}
impl Default for LocalTestServerBuilder {
fn default() -> Self {
Self::new()
}
}
/// A running local test server together with the clients used to reach it.
///
/// Dropping this value signals the spawned server task to stop (see the `Drop` impl).
pub struct LocalTestServer {
    /// Endpoint the remote client was built with: the server itself, or the
    /// network simulation proxy in front of it when one is configured.
    http_endpoint: String,
    /// Shutdown signal for the spawned server task; taken and fired in `drop`.
    server_shutdown_tx: Option<oneshot::Sender<()>>,
    /// HTTP-based client talking to the server (through any configured proxies).
    remote_simulation_client: Arc<RemoteSimulationClient>,
    /// Storage backend the server serves from, for direct (non-HTTP) access.
    client: Arc<dyn DirectAccessClient>,
    /// Verification handle; only present for `LocalClient` storage created by the builder.
    deletion_client: Option<Arc<dyn DeletionControlableClient>>,
    /// Bandwidth/latency simulation proxy, if one was configured.
    network_simulation_proxy: Option<Arc<NetworkSimulationProxy>>,
    /// Unix socket proxy the remote client connects through, if configured.
    #[cfg(unix)]
    socket_proxy: Option<UnixSocketProxy>,
    // Struct fields drop in declaration order, so this temp directory is removed
    // only after `socket_proxy` (declared above) has been dropped. Keep this order.
    #[cfg(unix)]
    _ephemeral_socket_tempdir: Option<TempDir>,
}
impl LocalTestServer {
    /// Endpoint (`http://host:port`) the remote client was configured with.
    ///
    /// With a network profile this is the simulation proxy's address rather
    /// than the server's own.
    pub fn http_endpoint(&self) -> &str {
        self.http_endpoint.as_str()
    }

    /// The proxy's `total_upload_bytes_possible`, or 0 without a network simulation proxy.
    pub fn total_upload_bytes_possible(&self) -> u64 {
        self.network_simulation_proxy
            .as_ref()
            .map_or(0, |proxy| proxy.total_upload_bytes_possible())
    }

    /// The proxy's `total_download_bytes_possible`, or 0 without a network simulation proxy.
    pub fn total_download_bytes_possible(&self) -> u64 {
        self.network_simulation_proxy
            .as_ref()
            .map_or(0, |proxy| proxy.total_download_bytes_possible())
    }

    /// Bandwidth currently reported by the proxy; `None` without a network simulation proxy.
    pub async fn current_bandwidth(&self) -> Option<u64> {
        if let Some(proxy) = &self.network_simulation_proxy {
            proxy.current_bandwidth().await
        } else {
            None
        }
    }

    /// Latency (ms) currently reported by the proxy; 0.0 without a network simulation proxy.
    pub async fn current_latency_ms(&self) -> f64 {
        if let Some(proxy) = &self.network_simulation_proxy {
            proxy.current_latency_ms().await
        } else {
            0.0
        }
    }

    /// Path of the Unix socket clients connect through, if a socket proxy is running.
    #[cfg(unix)]
    pub fn socket_endpoint(&self) -> Option<PathBuf> {
        let proxy = self.socket_proxy.as_ref()?;
        Some(proxy.socket_path().clone())
    }

    /// The HTTP-backed client, as a plain [`Client`] trait object.
    pub fn remote_client(&self) -> Arc<dyn Client> {
        // Unsized to the trait object by the return-site coercion.
        self.remote_simulation_client.clone()
    }

    /// The HTTP-backed client including its simulation-control API.
    pub fn remote_simulation_client(&self) -> &Arc<RemoteSimulationClient> {
        &self.remote_simulation_client
    }

    /// The storage backend behind the server, accessed directly (no HTTP).
    pub fn client(&self) -> &Arc<dyn DirectAccessClient> {
        &self.client
    }

    /// Changes the server's latency profile through the simulation API.
    pub async fn set_server_latency_profile(&self, profile: ServerLatencyProfile) -> Result<()> {
        self.remote_simulation_client().simulation_set_latency_profile(profile).await
    }

    /// Runs the backend's integrity check; `Ok(())` when no verification handle exists.
    pub async fn verify_integrity(&self) -> Result<()> {
        if let Some(dc) = &self.deletion_client {
            dc.verify_integrity().await
        } else {
            Ok(())
        }
    }

    /// Runs the backend's reachability check; `Ok(())` when no verification handle exists.
    pub async fn verify_all_reachable(&self) -> Result<()> {
        if let Some(dc) = &self.deletion_client {
            dc.verify_all_reachable().await
        } else {
            Ok(())
        }
    }

    /// Asks the OS for a free loopback port by binding to port 0.
    ///
    /// The probe listener is released before returning, so another process
    /// could in principle grab the port before the server binds it.
    fn find_available_port() -> u16 {
        let probe = StdTcpListener::bind("127.0.0.1:0").unwrap();
        probe.local_addr().unwrap().port()
    }
}
/// `Client` implementation that forwards every call to `remote_simulation_client`,
/// i.e. the `RemoteClient` the builder pointed at this server's endpoint.
///
/// Calls therefore exercise the HTTP path, including any configured proxies.
#[async_trait]
impl Client for LocalTestServer {
    async fn get_file_reconstruction_info(
        &self,
        file_hash: &xet_core_structures::merklehash::MerkleHash,
    ) -> Result<
        Option<(
            xet_core_structures::metadata_shard::file_structs::MDBFileInfo,
            Option<xet_core_structures::merklehash::MerkleHash>,
        )>,
    > {
        self.remote_simulation_client.get_file_reconstruction_info(file_hash).await
    }

    async fn get_reconstruction(
        &self,
        file_id: &xet_core_structures::merklehash::MerkleHash,
        bytes_range: Option<crate::cas_types::FileRange>,
    ) -> Result<Option<crate::cas_types::QueryReconstructionResponseV2>> {
        self.remote_simulation_client.get_reconstruction(file_id, bytes_range).await
    }

    async fn batch_get_reconstruction(
        &self,
        file_ids: &[xet_core_structures::merklehash::MerkleHash],
    ) -> Result<crate::cas_types::BatchQueryReconstructionResponse> {
        self.remote_simulation_client.batch_get_reconstruction(file_ids).await
    }

    async fn acquire_download_permit(&self) -> Result<super::super::adaptive_concurrency::ConnectionPermit> {
        self.remote_simulation_client.acquire_download_permit().await
    }

    async fn get_file_term_data(
        &self,
        url_info: Box<dyn super::super::interface::URLProvider>,
        download_permit: super::super::adaptive_concurrency::ConnectionPermit,
        progress_callback: Option<ProgressCallback>,
        uncompressed_size_if_known: Option<usize>,
    ) -> Result<(bytes::Bytes, Vec<u32>)> {
        self.remote_simulation_client
            .get_file_term_data(url_info, download_permit, progress_callback, uncompressed_size_if_known)
            .await
    }

    async fn query_for_global_dedup_shard(
        &self,
        prefix: &str,
        chunk_hash: &xet_core_structures::merklehash::MerkleHash,
    ) -> Result<Option<bytes::Bytes>> {
        self.remote_simulation_client
            .query_for_global_dedup_shard(prefix, chunk_hash)
            .await
    }

    async fn acquire_upload_permit(&self) -> Result<super::super::adaptive_concurrency::ConnectionPermit> {
        self.remote_simulation_client.acquire_upload_permit().await
    }

    async fn upload_shard(
        &self,
        shard_data: bytes::Bytes,
        upload_permit: super::super::adaptive_concurrency::ConnectionPermit,
    ) -> Result<bool> {
        self.remote_simulation_client.upload_shard(shard_data, upload_permit).await
    }

    async fn upload_xorb(
        &self,
        prefix: &str,
        serialized_xorb_object: xet_core_structures::xorb_object::SerializedXorbObject,
        progress_callback: Option<ProgressCallback>,
        upload_permit: super::super::adaptive_concurrency::ConnectionPermit,
    ) -> Result<u64> {
        self.remote_simulation_client
            .upload_xorb(prefix, serialized_xorb_object, progress_callback, upload_permit)
            .await
    }
}
/// `DirectAccessClient` implementation that forwards every call to `self.client`.
///
/// `self.client` is the storage backend the server serves from, so these calls
/// bypass HTTP. Configuration setters therefore change the backend the server
/// itself uses.
#[async_trait]
impl DirectAccessClient for LocalTestServer {
    fn set_fetch_term_url_expiration(&self, expiration: std::time::Duration) {
        self.client.set_fetch_term_url_expiration(expiration);
    }

    fn set_global_dedup_shard_expiration(&self, expiration: Option<std::time::Duration>) {
        self.client.set_global_dedup_shard_expiration(expiration);
    }

    fn set_api_delay_range(&self, delay_range: Option<std::ops::Range<std::time::Duration>>) {
        self.client.set_api_delay_range(delay_range);
    }

    fn set_max_ranges_per_fetch(&self, max_ranges: usize) {
        self.client.set_max_ranges_per_fetch(max_ranges);
    }

    fn disable_v2_reconstruction(&self, status_code: u16) {
        self.client.disable_v2_reconstruction(status_code);
    }

    async fn get_reconstruction_v1(
        &self,
        file_id: &xet_core_structures::merklehash::MerkleHash,
        bytes_range: Option<crate::cas_types::FileRange>,
    ) -> Result<Option<crate::cas_types::QueryReconstructionResponse>> {
        self.client.get_reconstruction_v1(file_id, bytes_range).await
    }

    async fn get_reconstruction_v2(
        &self,
        file_id: &xet_core_structures::merklehash::MerkleHash,
        bytes_range: Option<crate::cas_types::FileRange>,
    ) -> Result<Option<crate::cas_types::QueryReconstructionResponseV2>> {
        self.client.get_reconstruction_v2(file_id, bytes_range).await
    }

    async fn apply_api_delay(&self) {
        self.client.apply_api_delay().await;
    }

    async fn list_xorbs(&self) -> Result<Vec<xet_core_structures::merklehash::MerkleHash>> {
        self.client.list_xorbs().await
    }

    async fn delete_xorb(&self, hash: &xet_core_structures::merklehash::MerkleHash) {
        self.client.delete_xorb(hash).await;
    }

    async fn get_full_xorb(&self, hash: &xet_core_structures::merklehash::MerkleHash) -> Result<bytes::Bytes> {
        self.client.get_full_xorb(hash).await
    }

    async fn get_xorb_ranges(
        &self,
        hash: &xet_core_structures::merklehash::MerkleHash,
        chunk_ranges: Vec<(u32, u32)>,
    ) -> Result<Vec<bytes::Bytes>> {
        self.client.get_xorb_ranges(hash, chunk_ranges).await
    }

    async fn xorb_length(&self, hash: &xet_core_structures::merklehash::MerkleHash) -> Result<u32> {
        self.client.xorb_length(hash).await
    }

    async fn xorb_exists(&self, hash: &xet_core_structures::merklehash::MerkleHash) -> Result<bool> {
        self.client.xorb_exists(hash).await
    }

    async fn xorb_footer(
        &self,
        hash: &xet_core_structures::merklehash::MerkleHash,
    ) -> Result<xet_core_structures::xorb_object::XorbObject> {
        self.client.xorb_footer(hash).await
    }

    async fn get_file_size(&self, hash: &xet_core_structures::merklehash::MerkleHash) -> Result<u64> {
        self.client.get_file_size(hash).await
    }

    async fn get_file_data(
        &self,
        hash: &xet_core_structures::merklehash::MerkleHash,
        byte_range: Option<crate::cas_types::FileRange>,
    ) -> Result<bytes::Bytes> {
        self.client.get_file_data(hash, byte_range).await
    }

    async fn get_xorb_raw_bytes(
        &self,
        hash: &xet_core_structures::merklehash::MerkleHash,
        byte_range: Option<crate::cas_types::FileRange>,
    ) -> Result<bytes::Bytes> {
        self.client.get_xorb_raw_bytes(hash, byte_range).await
    }

    async fn xorb_raw_length(&self, hash: &xet_core_structures::merklehash::MerkleHash) -> Result<u64> {
        self.client.xorb_raw_length(hash).await
    }

    async fn fetch_term_data(
        &self,
        hash: xet_core_structures::merklehash::MerkleHash,
        fetch_term: crate::cas_types::XorbReconstructionFetchInfo,
    ) -> Result<(bytes::Bytes, Vec<u32>)> {
        self.client.fetch_term_data(hash, fetch_term).await
    }
}
impl Drop for LocalTestServer {
    /// Signals the background HTTP server task to stop.
    fn drop(&mut self) {
        // `send` fails only if the receiver is already gone (e.g. the server
        // task has exited); there is nothing left to stop in that case.
        if let Some(shutdown) = self.server_shutdown_tx.take() {
            shutdown.send(()).ok();
        }
    }
}
// End-to-end checks for `LocalTestServer`. All checks share one server per
// storage backend and run in the fixed order of `run_all_server_checks`; several
// of them change server configuration via `/simulation/set_config`, so order matters.
#[cfg(test)]
mod tests {
    use super::super::ClientTestingUtils;
    use super::*;
    use crate::cas_types::FileRange;

    /// Size argument passed to every `upload_random_file` call below.
    const CHUNK_SIZE: usize = 123;

    /// Compares the remote (HTTP) view with the direct backend view.
    ///
    /// Covers file data, full/ranged/multi-term/batch reconstructions, file
    /// info, global-dedup shards, and that `fetch_term` URLs are fetchable
    /// and return the same bytes on repeated requests.
    async fn check_basic_correctness(server: &LocalTestServer) {
        // Term specs presumably are (xorb id, (start_chunk, end_chunk)) — TODO confirm
        // against `ClientTestingUtils::upload_random_file`.
        let file = server
            .remote_client()
            .upload_random_file(&[(1, (0, 5))], CHUNK_SIZE)
            .await
            .unwrap();
        let local_data = server.client().get_file_data(&file.file_hash, None).await.unwrap();
        assert_eq!(file.data, local_data);
        let remote_recon = server
            .remote_client()
            .get_reconstruction(&file.file_hash, None)
            .await
            .unwrap()
            .unwrap();
        let local_recon = server
            .client()
            .get_reconstruction(&file.file_hash, None)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(remote_recon.terms.len(), local_recon.terms.len());
        assert_eq!(remote_recon.offset_into_first_range, local_recon.offset_into_first_range);
        for (remote_term, local_term) in remote_recon.terms.iter().zip(local_recon.terms.iter()) {
            assert_eq!(remote_term.hash, local_term.hash);
            assert_eq!(remote_term.range, local_term.range);
        }
        // Ranged reconstruction over the middle half of the file.
        let file_size = file.data.len() as u64;
        let range = FileRange::new(file_size / 4, file_size * 3 / 4);
        let range_recon = server
            .remote_client()
            .get_reconstruction(&file.file_hash, Some(range))
            .await
            .unwrap();
        assert!(range_recon.is_some());
        // A file built from three terms across two xorbs.
        let term_spec = &[(1, (0, 3)), (2, (0, 2)), (1, (3, 5))];
        let multi_file = server.client().upload_random_file(term_spec, CHUNK_SIZE).await.unwrap();
        let multi_recon = server
            .remote_client()
            .get_reconstruction(&multi_file.file_hash, None)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(multi_recon.terms.len(), 3);
        let file_ids = vec![file.file_hash, multi_file.file_hash];
        let batch_result = server.remote_client().batch_get_reconstruction(&file_ids).await.unwrap();
        assert_eq!(batch_result.files.len(), 2);
        let (remote_mdb, _) = server
            .remote_client()
            .get_file_reconstruction_info(&file.file_hash)
            .await
            .unwrap()
            .unwrap();
        let (local_mdb, _) = server
            .client()
            .get_file_reconstruction_info(&file.file_hash)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(remote_mdb.file_size(), local_mdb.file_size());
        // The global-dedup shard must be identical over HTTP and directly.
        let first_chunk_hash = file.terms[0].chunk_hashes[0];
        let shard_result = server
            .remote_client()
            .query_for_global_dedup_shard("default", &first_chunk_hash)
            .await
            .unwrap();
        let local_shard = server
            .client()
            .query_for_global_dedup_shard("default", &first_chunk_hash)
            .await
            .unwrap();
        assert!(shard_result.is_some());
        assert_eq!(shard_result.unwrap(), local_shard.unwrap());
        // Every fetch URL handed out must be a working HTTP fetch_term URL.
        let http_client = reqwest::Client::new();
        for multi_range_fetches in remote_recon.xorbs.values() {
            for mrf in multi_range_fetches {
                assert!(mrf.url.starts_with("http://"));
                assert!(mrf.url.contains("/fetch_term?term="));
                let response = http_client.get(&mrf.url).send().await.unwrap();
                assert!(response.status().is_success());
                assert!(!response.bytes().await.unwrap().is_empty());
            }
        }
        // Fetching the same URL twice yields the same bytes.
        let first_mrf = &remote_recon.xorbs.values().next().unwrap()[0];
        let data_1 = http_client.get(&first_mrf.url).send().await.unwrap().bytes().await.unwrap();
        let data_2 = http_client.get(&first_mrf.url).send().await.unwrap().bytes().await.unwrap();
        assert_eq!(data_1, data_2);
        assert!(!data_1.is_empty());
    }

    /// Unknown hashes yield an error or `None`; bad fetch_term tokens are rejected
    /// (malformed base64 specifically with 400).
    async fn check_error_handling(server: &LocalTestServer) {
        let http_client = reqwest::Client::new();
        let fake_hash = xet_core_structures::merklehash::MerkleHash::from_hex(
            "d760aaf4beb07581956e24c847c47f1abd2e419166aa68259035bc412232e9da",
        )
        .unwrap();
        let result = server.remote_client().get_reconstruction(&fake_hash, None).await;
        assert!(result.is_err() || result.unwrap().is_none());
        let file_info = server.remote_client().get_file_reconstruction_info(&fake_hash).await;
        assert!(file_info.is_err() || file_info.unwrap().is_none());
        // Valid base64 ("invalid_path") that does not decode to a real term.
        let invalid_term_url = format!("{}/v1/fetch_term?term=aW52YWxpZF9wYXRo", server.http_endpoint());
        let response = http_client.get(&invalid_term_url).send().await.unwrap();
        assert!(response.status().is_client_error() || response.status().is_server_error());
        let malformed_url = format!("{}/v1/fetch_term?term=not-valid-base64!!!", server.http_endpoint());
        let response = http_client.get(&malformed_url).send().await.unwrap();
        assert_eq!(response.status(), reqwest::StatusCode::BAD_REQUEST);
    }

    /// Reconstruction URLs returned over HTTP are HTTP `fetch_term` URLs (no
    /// leaked JSON fragments) for full, multi-xorb and ranged reconstructions,
    /// and they are fetchable.
    async fn check_url_transformation(server: &LocalTestServer) {
        let http_client = reqwest::Client::new();
        let file1 = server.client().upload_random_file(&[(1, (0, 5))], CHUNK_SIZE).await.unwrap();
        let term_spec = &[(1, (0, 3)), (2, (0, 2)), (1, (3, 5))];
        let multi_file = server.client().upload_random_file(term_spec, CHUNK_SIZE).await.unwrap();
        let recon1 = server
            .remote_client()
            .get_reconstruction(&file1.file_hash, None)
            .await
            .unwrap()
            .unwrap();
        for (hash, multi_range_fetches) in &recon1.xorbs {
            for mrf in multi_range_fetches {
                assert!(
                    mrf.url.starts_with("http://") || mrf.url.starts_with("https://"),
                    "URL for hash {} should be HTTP, got: {}",
                    hash,
                    mrf.url
                );
                assert!(mrf.url.contains("/fetch_term?term="));
                assert!(!mrf.url.contains("\":"));
            }
        }
        let multi_recon = server
            .remote_client()
            .get_reconstruction(&multi_file.file_hash, None)
            .await
            .unwrap()
            .unwrap();
        assert!(multi_recon.xorbs.len() >= 2);
        for multi_range_fetches in multi_recon.xorbs.values() {
            for mrf in multi_range_fetches {
                assert!(mrf.url.starts_with("http://"));
            }
        }
        let file_size = multi_file.data.len() as u64;
        let range = FileRange::new(file_size / 4, file_size * 3 / 4);
        let range_recon = server
            .remote_client()
            .get_reconstruction(&multi_file.file_hash, Some(range))
            .await
            .unwrap()
            .unwrap();
        for multi_range_fetches in range_recon.xorbs.values() {
            for mrf in multi_range_fetches {
                assert!(mrf.url.starts_with("http://"));
                assert!(mrf.url.contains("/fetch_term?term="));
            }
        }
        for term in &recon1.terms {
            let multi_range_fetches = recon1.xorbs.get(&term.hash).unwrap();
            for mrf in multi_range_fetches {
                let response = http_client.get(&mrf.url).send().await.unwrap();
                assert!(response.status().is_success());
                assert!(!response.bytes().await.unwrap().is_empty());
            }
        }
    }

    /// Each reconstruction term references the same xorb hash as the uploaded term.
    async fn check_reconstruction_term_hashes_match(server: &LocalTestServer) {
        let term_spec = &[(1, (0, 3)), (2, (0, 2)), (1, (3, 5))];
        let file = server.client().upload_random_file(term_spec, CHUNK_SIZE).await.unwrap();
        let recon = server
            .remote_client()
            .get_reconstruction(&file.file_hash, None)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(recon.terms.len(), file.terms.len());
        for (i, recon_term) in recon.terms.iter().enumerate() {
            let expected_term = &file.terms[i];
            assert_eq!(recon_term.hash.0, expected_term.xorb_hash, "Term {} XORB hash mismatch", i);
        }
    }

    /// Every term has fetch info, and the stored file data equals what was uploaded.
    async fn check_downloaded_terms_match_expected_data(server: &LocalTestServer) {
        let term_spec = &[(1, (0, 4)), (2, (0, 3))];
        let file = server.client().upload_random_file(term_spec, CHUNK_SIZE).await.unwrap();
        let recon = server
            .remote_client()
            .get_reconstruction(&file.file_hash, None)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(recon.terms.len(), file.terms.len());
        for (term_idx, recon_term) in recon.terms.iter().enumerate() {
            let expected_term = &file.terms[term_idx];
            assert_eq!(recon_term.hash.0, expected_term.xorb_hash);
            let multi_range_fetches = recon.xorbs.get(&recon_term.hash).unwrap();
            assert!(!multi_range_fetches.is_empty());
        }
        let retrieved_data = server.client().get_file_data(&file.file_hash, None).await.unwrap();
        assert_eq!(retrieved_data, file.data);
        for (term_idx, term) in file.terms.iter().enumerate() {
            assert!(file.term_matches(term_idx, &term.data));
        }
    }

    /// Concatenated term data reproduces the file, and `term_matches` accepts
    /// each term's own data but rejects another term's.
    async fn check_complete_file_reconstruction(server: &LocalTestServer) {
        let term_spec = &[(1, (0, 3)), (2, (0, 2)), (1, (3, 5))];
        let file = server.client().upload_random_file(term_spec, CHUNK_SIZE).await.unwrap();
        let mut reconstructed = Vec::new();
        for term in &file.terms {
            reconstructed.extend_from_slice(&term.data);
        }
        assert_eq!(reconstructed, file.data);
        assert!(file.term_matches(0, &file.terms[0].data));
        assert!(file.term_matches(1, &file.terms[1].data));
        assert!(file.term_matches(2, &file.terms[2].data));
        assert!(!file.term_matches(0, &file.terms[1].data));
    }

    /// Per-term chunk hashes agree with the chunk list of the generated xorb.
    async fn check_chunk_hashes_correctness(server: &LocalTestServer) {
        let file = server.client().upload_random_file(&[(1, (0, 3))], CHUNK_SIZE).await.unwrap();
        assert_eq!(file.terms.len(), 1);
        assert_eq!(file.terms[0].chunk_hashes.len(), 3);
        let xorb_hash = file.terms[0].xorb_hash;
        let raw_xorb = file.xorbs.get(&xorb_hash).unwrap();
        assert_eq!(raw_xorb.xorb_info.chunks.len(), 3);
        for (i, chunk_hash) in file.terms[0].chunk_hashes.iter().enumerate() {
            assert_eq!(*chunk_hash, raw_xorb.xorb_info.chunks[i].chunk_hash);
        }
    }

    /// POSTs `/simulation/set_config` with URL-encoded `config`/`value` and returns the status.
    async fn post_set_config(server: &LocalTestServer, config: &str, value: &str) -> reqwest::StatusCode {
        let http_client = reqwest::Client::new();
        let url = format!(
            "{}/simulation/set_config?config={}&value={}",
            server.http_endpoint(),
            urlencoding::encode(config),
            urlencoding::encode(value),
        );
        http_client.post(&url).send().await.unwrap().status()
    }

    /// Exercises every `/simulation/set_config` option with valid and invalid
    /// values, missing parameters, and observable effects (shard expiry,
    /// disabled v2 endpoint, API delay). Resets options it changes where the
    /// checks rely on it.
    async fn check_simulation_set_config(server: &LocalTestServer) {
        let http_client = reqwest::Client::new();
        // Config names are accepted case-insensitively ("RANDOM_DELAY").
        assert_eq!(post_set_config(server, "random_delay", "(10ms,100ms)").await, reqwest::StatusCode::OK);
        assert_eq!(post_set_config(server, "RANDOM_DELAY", "(5ms,50ms)").await, reqwest::StatusCode::OK);
        assert_eq!(post_set_config(server, "random_delay", "invalid").await, reqwest::StatusCode::BAD_REQUEST);
        assert_eq!(post_set_config(server, "unknown_config", "test").await, reqwest::StatusCode::BAD_REQUEST);
        // Missing `config` parameter.
        let response = http_client
            .post(format!("{}/simulation/set_config?value=test", server.http_endpoint()))
            .send()
            .await
            .unwrap();
        assert_eq!(response.status(), reqwest::StatusCode::BAD_REQUEST);
        // Missing `value` parameter.
        let response = http_client
            .post(format!("{}/simulation/set_config?config=random_delay", server.http_endpoint()))
            .send()
            .await
            .unwrap();
        assert_eq!(response.status(), reqwest::StatusCode::BAD_REQUEST);
        assert_eq!(post_set_config(server, "random_delay", "(0ms,0ms)").await, reqwest::StatusCode::OK);
        // A 300 s shard expiration must show up in the served shard's key expiry
        // (checked with +/-5 s slack).
        assert_eq!(post_set_config(server, "global_dedup_shard_expiration", "300").await, reqwest::StatusCode::OK);
        {
            use std::io::Cursor;
            use xet_core_structures::metadata_shard::MDBShardInfo;
            use xet_core_structures::metadata_shard::streaming_shard::MDBMinimalShard;
            let file = server.client().upload_random_file(&[(1, (0, 5))], CHUNK_SIZE).await.unwrap();
            let first_chunk = file.terms[0].chunk_hashes[0];
            let shard_bytes = server
                .client()
                .query_for_global_dedup_shard("default", &first_chunk)
                .await
                .unwrap()
                .unwrap();
            let minimal_shard = MDBMinimalShard::from_reader(&mut Cursor::new(&shard_bytes), true, true).unwrap();
            assert_eq!(minimal_shard.num_files(), 0);
            assert_ne!(minimal_shard.num_xorb(), 0);
            let shard_info = MDBShardInfo::load_from_reader(&mut Cursor::new(&shard_bytes)).unwrap();
            let now_epoch = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs();
            assert_ne!(shard_info.metadata.shard_key_expiry, 0);
            assert!(shard_info.metadata.shard_key_expiry >= now_epoch + 295);
            assert!(shard_info.metadata.shard_key_expiry <= now_epoch + 305);
        }
        assert_eq!(post_set_config(server, "global_dedup_shard_expiration", "0").await, reqwest::StatusCode::OK);
        assert_eq!(
            post_set_config(server, "global_dedup_shard_expiration", "not_a_number").await,
            reqwest::StatusCode::BAD_REQUEST
        );
        assert_eq!(post_set_config(server, "max_ranges_per_fetch", "10").await, reqwest::StatusCode::OK);
        assert_eq!(post_set_config(server, "max_ranges_per_fetch", "abc").await, reqwest::StatusCode::BAD_REQUEST);
        assert_eq!(
            post_set_config(server, "max_ranges_per_fetch", &usize::MAX.to_string()).await,
            reqwest::StatusCode::OK
        );
        // Disabling v2 reconstruction makes the v2 endpoint answer with the configured status.
        assert_eq!(post_set_config(server, "disable_v2_reconstruction", "503").await, reqwest::StatusCode::OK);
        {
            let file = server.client().upload_random_file(&[(1, (0, 3))], CHUNK_SIZE).await.unwrap();
            let v2_url = format!("{}/v2/reconstructions/{:?}", server.http_endpoint(), file.file_hash);
            let resp = http_client.get(&v2_url).send().await.unwrap();
            assert_eq!(resp.status(), reqwest::StatusCode::SERVICE_UNAVAILABLE);
        }
        assert_eq!(post_set_config(server, "disable_v2_reconstruction", "0").await, reqwest::StatusCode::OK);
        assert_eq!(post_set_config(server, "disable_v2_reconstruction", "xyz").await, reqwest::StatusCode::BAD_REQUEST);
        // A fixed 50 ms API delay must be observable on a direct backend call
        // (asserted with 10 ms of slack).
        assert_eq!(post_set_config(server, "api_delay", "(50ms, 50ms)").await, reqwest::StatusCode::OK);
        {
            let file = server.client().upload_random_file(&[(1, (0, 3))], CHUNK_SIZE).await.unwrap();
            let first_chunk = file.terms[0].chunk_hashes[0];
            let start = std::time::Instant::now();
            let _ = server.client().query_for_global_dedup_shard("default", &first_chunk).await;
            assert!(
                start.elapsed() >= std::time::Duration::from_millis(40),
                "Expected delay of ~50ms, but got {:?}",
                start.elapsed()
            );
        }
        assert_eq!(post_set_config(server, "api_delay", "(0ms, 0ms)").await, reqwest::StatusCode::OK);
        assert_eq!(post_set_config(server, "api_delay", "invalid").await, reqwest::StatusCode::BAD_REQUEST);
        assert_eq!(post_set_config(server, "url_expiration", "5000").await, reqwest::StatusCode::OK);
        assert_eq!(post_set_config(server, "url_expiration", "not_a_number").await, reqwest::StatusCode::BAD_REQUEST);
        assert_eq!(post_set_config(server, "url_expiration", &u64::MAX.to_string()).await, reqwest::StatusCode::OK);
    }

    /// `/simulation/dummy_upload` accepts bodies (including an empty one) and
    /// honours the configured `random_delay`; the delay is reset at the end.
    async fn check_simulation_dummy_upload(server: &LocalTestServer) {
        let http_client = reqwest::Client::new();
        let test_data = vec![0u8; 1024];
        let start = std::time::Instant::now();
        let response = http_client
            .post(format!("{}/simulation/dummy_upload", server.http_endpoint()))
            .body(test_data.clone())
            .send()
            .await
            .unwrap();
        assert_eq!(response.status(), reqwest::StatusCode::OK);
        let no_delay_duration = start.elapsed();
        let response = http_client
            .post(format!("{}/simulation/set_config?config=random_delay&value=(50ms,50ms)", server.http_endpoint()))
            .send()
            .await
            .unwrap();
        assert_eq!(response.status(), reqwest::StatusCode::OK);
        let start = std::time::Instant::now();
        let response = http_client
            .post(format!("{}/simulation/dummy_upload", server.http_endpoint()))
            .body(test_data.clone())
            .send()
            .await
            .unwrap();
        assert_eq!(response.status(), reqwest::StatusCode::OK);
        let with_delay_duration = start.elapsed();
        assert!(
            with_delay_duration >= Duration::from_millis(40),
            "Expected delay of ~50ms, but got {:?}",
            with_delay_duration
        );
        assert!(
            with_delay_duration > no_delay_duration,
            "Upload with delay ({:?}) should be slower than without ({:?})",
            with_delay_duration,
            no_delay_duration
        );
        // Upload with no body at all.
        let response = http_client
            .post(format!("{}/simulation/dummy_upload", server.http_endpoint()))
            .send()
            .await
            .unwrap();
        assert_eq!(response.status(), reqwest::StatusCode::OK);
        let response = http_client
            .post(format!("{}/simulation/set_config?config=random_delay&value=(0ms,0ms)", server.http_endpoint()))
            .send()
            .await
            .unwrap();
        assert_eq!(response.status(), reqwest::StatusCode::OK);
    }

    /// Runs every check against `server` in a fixed order; later checks see the
    /// configuration left by earlier ones.
    async fn run_all_server_checks(server: &LocalTestServer) {
        check_basic_correctness(server).await;
        check_error_handling(server).await;
        check_url_transformation(server).await;
        check_reconstruction_term_hashes_match(server).await;
        check_downloaded_terms_match_expected_data(server).await;
        check_complete_file_reconstruction(server).await;
        check_chunk_hashes_correctness(server).await;
        check_simulation_set_config(server).await;
        check_simulation_dummy_upload(server).await;
    }

    /// Runs the full check suite against an in-memory and a disk-backed server,
    /// each starting from empty storage.
    #[tokio::test]
    async fn test_local_server() {
        {
            tracing::info!("Testing with in-memory storage");
            let server = LocalTestServerBuilder::new().start().await;
            assert!(server.client().list_xorbs().await.unwrap().is_empty());
            run_all_server_checks(&server).await;
        }
        {
            tracing::info!("Testing with disk-backed storage");
            let server = LocalTestServerBuilder::new().with_ephemeral_disk().start().await;
            assert!(server.client().list_xorbs().await.unwrap().is_empty());
            run_all_server_checks(&server).await;
        }
    }
}