use std::collections::HashMap;
use std::fs::{File, metadata};
use std::io::{BufReader, Cursor, Seek, SeekFrom, Write};
use std::mem::size_of;
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU16, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock, Mutex, Weak};
use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
use rand::RngExt;
use redb::{ReadableDatabase, ReadableTable, TableDefinition};
use tempfile::TempDir;
use tokio::time::{Duration, Instant};
use tracing::{error, info, warn};
use xet_core_structures::merklehash::MerkleHash;
use xet_core_structures::metadata_shard::file_structs::MDBFileInfo;
use xet_core_structures::metadata_shard::shard_file_reconstructor::FileReconstructor;
use xet_core_structures::metadata_shard::shard_in_memory::MDBInMemoryShard;
use xet_core_structures::metadata_shard::streaming_shard::MDBMinimalShard;
use xet_core_structures::metadata_shard::utils::{parse_shard_filename, shard_file_name};
use xet_core_structures::metadata_shard::xorb_structs::MDBXorbInfo;
use xet_core_structures::metadata_shard::{MDBShardFile, ShardFileManager};
use xet_core_structures::serialization_utils::read_u32;
use xet_core_structures::xorb_object::{SerializedXorbObject, XorbObject};
#[cfg(feature = "fd-track")]
use xet_runtime::fd_diagnostics::{report_fd_count, track_fd_scope};
use xet_runtime::file_utils::SafeFileCreator;
use super::direct_access_client::DirectAccessClient;
use super::xorb_utils::{self, REFERENCE_INSTANT, duration_to_expiration_secs_ceil};
use crate::cas_client::Client;
use crate::cas_client::adaptive_concurrency::AdaptiveConcurrencyController;
use crate::cas_client::progress_tracked_streams::ProgressCallback;
use crate::cas_types::{
BatchQueryReconstructionResponse, FileRange, HexMerkleHash, HttpRange, QueryReconstructionResponse,
QueryReconstructionResponseV2, XorbMultiRangeFetch, XorbRangeDescriptor, XorbReconstructionFetchInfo,
};
use crate::error::{ClientError, Result};
/// Newtype around [`MerkleHash`] for which this file implements `redb::Value` and `redb::Key`,
/// so hashes can be used directly as keys and values of redb tables.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct RedbHash(MerkleHash);
impl From<MerkleHash> for RedbHash {
fn from(h: MerkleHash) -> Self {
RedbHash(h)
}
}
impl From<RedbHash> for MerkleHash {
fn from(h: RedbHash) -> Self {
h.0
}
}
/// Fixed-width (32 byte) redb encoding of a hash: its four `u64` words, each written in
/// little-endian byte order, in word order.
impl redb::Value for RedbHash {
    type SelfType<'a> = RedbHash;
    type AsBytes<'a> = [u8; 32];

    fn fixed_width() -> Option<usize> {
        Some(32)
    }

    /// Decodes a hash from consecutive 8-byte little-endian words of `data`.
    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        let mut decoded = MerkleHash::default();
        let words: &mut [u64; 4] = &mut decoded;
        data.chunks_exact(8).enumerate().for_each(|(idx, raw)| {
            // `chunks_exact(8)` only yields 8-byte slices, so the copy length always matches.
            let mut word = [0u8; 8];
            word.copy_from_slice(raw);
            words[idx] = u64::from_le_bytes(word);
        });
        RedbHash(decoded)
    }

    /// Encodes the hash as 32 bytes, one little-endian `u64` per 8-byte slot.
    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'a + 'b,
    {
        let words: &[u64; 4] = &value.0;
        let mut encoded = [0u8; 32];
        for (slot, word) in encoded.chunks_exact_mut(8).zip(words.iter()) {
            slot.copy_from_slice(&word.to_le_bytes());
        }
        encoded
    }

    fn type_name() -> redb::TypeName {
        redb::TypeName::new("MerkleHash")
    }
}
/// Keys are ordered by lexicographic comparison of their serialized bytes.
impl redb::Key for RedbHash {
    fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
        <[u8] as Ord>::cmp(data1, data2)
    }
}
/// redb table keyed by chunk hash; the value is the hash of the shard recorded for that chunk.
const GLOBAL_DEDUP_TABLE: TableDefinition<RedbHash, RedbHash> = TableDefinition::new("global_dedup");
/// redb table keyed by file hash; `true` marks the file as deleted, `false` as present.
const FILE_STATUS_TABLE: TableDefinition<RedbHash, bool> = TableDefinition::new("file_status");
/// Non-owning handle to an open database, as stored in `DB_CACHE`.
type CachedDbWeak = Weak<redb::Database>;
/// Process-wide map from database path to a weak handle of the database opened at that path.
/// Entries never keep a database alive by themselves.
static DB_CACHE: LazyLock<Mutex<HashMap<PathBuf, CachedDbWeak>>> = LazyLock::new(|| Mutex::new(HashMap::new()));
/// Returns a shared handle to the redb database at `db_path`.
///
/// A live handle cached in `DB_CACHE` is reused when available; otherwise the database is
/// opened (or created) and a weak reference to it is cached. Dead cache entries are pruned on
/// every cache miss.
///
/// # Panics
/// Panics if the `DB_CACHE` mutex is poisoned.
fn get_or_open_db(db_path: &Path) -> std::result::Result<Arc<redb::Database>, redb::DatabaseError> {
    #[cfg(feature = "fd-track")]
    let _fd_scope = track_fd_scope(format!("LocalClient::get_or_open_db({})", db_path.display()));

    // The guard is held across lookup and open so the two steps act as one unit.
    let mut cache = DB_CACHE.lock().unwrap();

    let cached = cache.get(db_path).and_then(|weak| weak.upgrade());
    if let Some(db) = cached {
        tracing::trace!(target: "xet_client::local_cas_redb", path = %db_path.display(), "DB_CACHE hit");
        #[cfg(feature = "fd-track")]
        report_fd_count("LocalClient::get_or_open_db cache hit");
        return Ok(db);
    }

    // Drop entries whose database has no remaining strong references.
    cache.retain(|_, weak| weak.strong_count() > 0);
    tracing::trace!(target: "xet_client::local_cas_redb", path = %db_path.display(), "DB_CACHE miss");

    let opened = redb::Database::create(db_path)?;
    let db = Arc::new(opened);
    cache.insert(db_path.to_owned(), Arc::downgrade(&db));
    #[cfg(feature = "fd-track")]
    report_fd_count("LocalClient::get_or_open_db opened new DB");
    Ok(db)
}
/// Client that stores xorbs and shards as files under a base directory and keeps
/// global-dedup and file-status information in a redb database.
pub struct LocalClient {
/// redb database holding `GLOBAL_DEDUP_TABLE` and `FILE_STATUS_TABLE`.
db: Arc<redb::Database>,
/// Shard manager used to register shards and look up file reconstruction info.
shard_manager: Arc<ShardFileManager>,
/// Directory containing the xorb files (`<base>/xorbs`).
xorb_dir: PathBuf,
/// Directory containing the shard files (`<base>/shards`).
shard_dir: PathBuf,
/// Source of connection permits; this file uses it for both upload and download permits.
upload_concurrency_controller: Arc<AdaptiveConcurrencyController>,
/// Maximum age, in milliseconds, of a fetch URL before `fetch_term_data` rejects it.
/// Initialized to `u64::MAX`.
url_expiration_ms: AtomicU64,
/// Expiration, in seconds, applied to shards returned by global-dedup queries.
/// `0` means the shard file is returned as stored.
global_dedup_expiration_secs: AtomicU64,
/// `(min, max)` bounds in milliseconds of the artificial API delay; both `0` disables it.
random_ms_delay_window: (AtomicU64, AtomicU64),
/// Maximum number of ranges grouped under one URL in V2 reconstruction responses.
max_ranges_per_fetch: AtomicUsize,
/// Status code stored by `disable_v2_reconstruction`; initialized to `0`.
// NOTE(review): presumably `0` means V2 is enabled — confirm against the trait's callers.
v2_disabled_status: AtomicU16,
/// Temporary directory handle for clients created via `temporary()`; `None` otherwise.
// NOTE(review): presumably held only so the directory outlives the client — confirm.
_tmp_dir: Option<TempDir>,
}
impl LocalClient {
/// Creates a client backed by a freshly created temporary directory.
///
/// The [`TempDir`] handle is stored inside the returned client.
///
/// # Errors
/// Returns an error if the temporary directory cannot be created or if client
/// initialization fails.
pub async fn temporary() -> Result<Arc<Self>> {
    // Report directory-creation failures to the caller instead of panicking; the function
    // already returns `Result` and I/O errors convert into the client error type.
    let tmp_dir = TempDir::new()?;
    let path = tmp_dir.path().to_owned();
    let client = Self::new_internal(path, Some(tmp_dir)).await?;
    Ok(Arc::new(client))
}
/// Creates a client rooted at `path`.
///
/// # Errors
/// Returns an error if client initialization fails.
pub async fn new(path: impl AsRef<Path>) -> Result<Arc<Self>> {
    let base = path.as_ref().to_owned();
    let client = Self::new_internal(base, None).await?;
    Ok(Arc::new(client))
}
/// Shared constructor behind `new` and `temporary`.
///
/// Resolves `path` to an absolute (and, when possible, canonical) base directory, creates the
/// base directory and its `shards` and `xorbs` subdirectories if missing, opens the redb
/// database `global_dedup_lookup.redb` through `get_or_open_db`, opens both tables inside a
/// committed write transaction, and creates the shard manager over the shard directory.
/// `tmp_dir` is stored in the client unchanged.
///
/// # Errors
/// Returns an error if any directory, database, or shard-manager step fails.
async fn new_internal(path: impl AsRef<Path>, tmp_dir: Option<TempDir>) -> Result<Self> {
let base_dir = std::path::absolute(path)?;
if !base_dir.exists() {
std::fs::create_dir_all(&base_dir)?;
}
// Prefer the canonical path; keep the absolute path if canonicalization fails.
// NOTE(review): presumably so equivalent paths share one `DB_CACHE` entry — confirm.
let base_dir = std::fs::canonicalize(&base_dir).unwrap_or(base_dir);
#[cfg(feature = "fd-track")]
let _fd_scope = track_fd_scope(format!("LocalClient::new_internal({})", base_dir.display()));
#[cfg(feature = "fd-track")]
report_fd_count("LocalClient::new_internal start");
let shard_dir = base_dir.join("shards");
if !shard_dir.exists() {
std::fs::create_dir_all(&shard_dir)?;
}
let xorb_dir = base_dir.join("xorbs");
if !xorb_dir.exists() {
std::fs::create_dir_all(&xorb_dir)?;
}
let db_path = base_dir.join("global_dedup_lookup.redb");
let db =
get_or_open_db(&db_path).map_err(|e| ClientError::Other(format!("Error opening redb database: {e}")))?;
#[cfg(feature = "fd-track")]
report_fd_count("LocalClient::new_internal after DB open");
// Open both tables in a write transaction and commit.
// NOTE(review): presumably this creates the tables on first use so later read
// transactions can open them — confirm against the redb documentation.
{
let write_txn = db.begin_write().map_err(map_redb_db_error)?;
let _ = write_txn.open_table(GLOBAL_DEDUP_TABLE).map_err(map_redb_db_error)?;
let _ = write_txn.open_table(FILE_STATUS_TABLE).map_err(map_redb_db_error)?;
write_txn.commit().map_err(map_redb_db_error)?;
}
let shard_manager = ShardFileManager::new_in_session_directory(shard_dir.clone(), true).await?;
#[cfg(feature = "fd-track")]
report_fd_count("LocalClient::new_internal after shard manager init");
// Defaults: URLs never expire (u64::MAX ms), no dedup expiration, no API delay,
// unlimited ranges per fetch, and a V2-disabled status of 0.
Ok(Self {
db,
shard_manager,
xorb_dir,
shard_dir,
upload_concurrency_controller: AdaptiveConcurrencyController::new_upload("local_uploads"),
url_expiration_ms: AtomicU64::new(u64::MAX),
global_dedup_expiration_secs: AtomicU64::new(0),
random_ms_delay_window: (AtomicU64::new(0), AtomicU64::new(0)),
max_ranges_per_fetch: AtomicUsize::new(usize::MAX),
v2_disabled_status: AtomicU16::new(0),
_tmp_dir: tmp_dir,
})
}
/// Returns the on-disk path of the xorb with the given hash: `default.<hash>` inside the
/// xorb directory, where `<hash>` is the hash's `Debug` rendering.
fn get_path_for_entry(&self, hash: &MerkleHash) -> PathBuf {
    let file_name = format!("default.{hash:?}");
    self.xorb_dir.join(file_name)
}
/// Reports whether `file_hash` is marked as deleted in `FILE_STATUS_TABLE`.
///
/// Best effort: any database error, or a missing entry, is treated as "not deleted".
fn is_file_deleted(&self, file_hash: &MerkleHash) -> bool {
    let lookup = || -> Option<bool> {
        let read_txn = self.db.begin_read().ok()?;
        let table = read_txn.open_table(FILE_STATUS_TABLE).ok()?;
        let status = table.get(&RedbHash::from(*file_hash)).ok()??;
        Some(status.value())
    };
    lookup().unwrap_or(false)
}
/// Lists every regular file in the shard directory whose name parses as a shard filename,
/// paired with the hash parsed from that name.
///
/// # Errors
/// Returns an internal error if the directory or one of its entries cannot be read.
fn shard_file_paths(&self) -> Result<Vec<(MerkleHash, PathBuf)>> {
    let dir_entries = std::fs::read_dir(&self.shard_dir).map_err(ClientError::internal)?;
    let mut shards = Vec::new();
    for dir_entry in dir_entries {
        let candidate = dir_entry.map_err(ClientError::internal)?.path();
        // Name check first, then the file-type check, as before.
        let Some(hash) = parse_shard_filename(&candidate) else {
            continue;
        };
        if candidate.is_file() {
            shards.push((hash, candidate));
        }
    }
    Ok(shards)
}
/// Returns the path of the shard file for `hash`.
///
/// # Errors
/// Returns `ClientError::Other` if no such file exists in the shard directory.
fn shard_path_for_hash(&self, hash: &MerkleHash) -> Result<PathBuf> {
    let candidate = self.shard_dir.join(shard_file_name(hash));
    if !candidate.exists() {
        return Err(ClientError::Other(format!("Shard file not found for hash {}", hash.hex())));
    }
    Ok(candidate)
}
/// Test helper: loads every shard file in the shard directory and merges their file and xorb
/// entries into a single in-memory shard.
#[cfg(test)]
fn load_all_shard_data(&self) -> Result<MDBInMemoryShard> {
    let mut combined = MDBInMemoryShard::default();
    for (_, shard_path) in self.shard_file_paths()? {
        let raw = std::fs::read(&shard_path)?;
        let parsed = MDBMinimalShard::from_reader(&mut Cursor::new(&raw), true, true)?;

        for file_idx in 0..parsed.num_files() {
            let file_info = MDBFileInfo::from(parsed.file(file_idx).unwrap());
            combined.add_file_reconstruction_info(file_info)?;
        }
        for xorb_idx in 0..parsed.num_xorb() {
            let xorb_info = MDBXorbInfo::from(parsed.xorb(xorb_idx).unwrap());
            combined.add_xorb_block(xorb_info)?;
        }
    }
    Ok(combined)
}
/// Test helper: deletes every existing shard file, then (unless `in_memory` is empty) writes
/// `in_memory` to the shard directory and registers the resulting shard with the manager.
#[cfg(test)]
async fn write_shard_data_and_register(&self, in_memory: &MDBInMemoryShard) -> Result<()> {
    for (_, stale_path) in self.shard_file_paths()? {
        std::fs::remove_file(&stale_path)?;
    }

    if in_memory.is_empty() {
        return Ok(());
    }

    let written_path = in_memory.write_to_directory(&self.shard_dir, None)?;
    let shard = MDBShardFile::load_from_file(&written_path)?;
    self.shard_manager.register_shards(&[shard]).await?;
    Ok(())
}
}
/// Drop hook that only does work when the `fd-track` feature is enabled: it reports file
/// descriptor counts and prunes `DB_CACHE` entries whose database has no strong references.
impl Drop for LocalClient {
fn drop(&mut self) {
#[cfg(feature = "fd-track")]
let _fd_scope = track_fd_scope(format!("LocalClient::drop({})", self.xorb_dir.display()));
#[cfg(feature = "fd-track")]
{
report_fd_count("LocalClient::drop start");
// A poisoned cache lock is ignored here rather than panicking inside `drop`.
if let Ok(mut map) = DB_CACHE.lock() {
map.retain(|_, weak| weak.strong_count() > 0);
}
report_fd_count("LocalClient::drop end");
}
}
}
#[async_trait]
impl DirectAccessClient for LocalClient {
/// Sets the maximum age of a fetch URL before `fetch_term_data` rejects it as expired.
///
/// The duration is stored in whole milliseconds. Durations whose millisecond count does not
/// fit in a `u64` saturate to `u64::MAX` instead of being silently truncated.
fn set_fetch_term_url_expiration(&self, expiration: Duration) {
    let expiration_ms = u64::try_from(expiration.as_millis()).unwrap_or(u64::MAX);
    self.url_expiration_ms.store(expiration_ms, Ordering::Relaxed);
}
/// Stores the expiration applied to shards returned by global-dedup queries, converted to
/// seconds by `duration_to_expiration_secs_ceil`.
fn set_global_dedup_shard_expiration(&self, expiration: Option<Duration>) {
    let expiration_secs = duration_to_expiration_secs_ceil(expiration);
    self.global_dedup_expiration_secs.store(expiration_secs, Ordering::Relaxed);
}
/// Sets how many ranges may be grouped under one URL in V2 reconstruction responses.
fn set_max_ranges_per_fetch(&self, max_ranges: usize) {
    let limit = &self.max_ranges_per_fetch;
    limit.store(max_ranges, Ordering::Relaxed);
}
/// Records `status_code` as the V2-disabled status, readable via `v2_disabled_status_code`.
fn disable_v2_reconstruction(&self, status_code: u16) {
    let status = &self.v2_disabled_status;
    status.store(status_code, Ordering::Relaxed);
}
/// Returns the status code last stored by `disable_v2_reconstruction` (initially `0`).
fn v2_disabled_status_code(&self) -> u16 {
    let status = &self.v2_disabled_status;
    status.load(Ordering::Relaxed)
}
/// Trait entry point for V1 reconstruction queries; forwards to the method of the same name
/// called through the `LocalClient::` path.
async fn get_reconstruction_v1(
    &self,
    file_id: &MerkleHash,
    bytes_range: Option<FileRange>,
) -> Result<Option<QueryReconstructionResponse>> {
    let response = LocalClient::get_reconstruction_v1(self, file_id, bytes_range).await?;
    Ok(response)
}
/// Trait entry point for V2 reconstruction queries; forwards to the method of the same name
/// called through the `LocalClient::` path.
async fn get_reconstruction_v2(
    &self,
    file_id: &MerkleHash,
    bytes_range: Option<FileRange>,
) -> Result<Option<QueryReconstructionResponseV2>> {
    let response = LocalClient::get_reconstruction_v2(self, file_id, bytes_range).await?;
    Ok(response)
}
/// Configures the artificial delay window used by `apply_api_delay`.
///
/// `Some(range)` stores `range.start` and `range.end` in whole milliseconds; `None` stores
/// `0` for both bounds, which disables the delay. Bounds whose millisecond count does not
/// fit in a `u64` saturate to `u64::MAX` instead of being silently truncated.
fn set_api_delay_range(&self, delay_range: Option<Range<Duration>>) {
    let to_ms = |d: Duration| u64::try_from(d.as_millis()).unwrap_or(u64::MAX);
    let (min_ms, max_ms) = match delay_range {
        Some(range) => (to_ms(range.start), to_ms(range.end)),
        None => (0, 0),
    };
    self.random_ms_delay_window.0.store(min_ms, Ordering::Relaxed);
    self.random_ms_delay_window.1.store(max_ms, Ordering::Relaxed);
}
/// Sleeps for a delay drawn from the configured window, if one is set.
///
/// Does nothing when both bounds are `0`. Otherwise the delay is drawn uniformly from
/// `min..max` milliseconds. When the window is empty or inverted (`min >= max`) the delay is
/// exactly `min` milliseconds.
async fn apply_api_delay(&self) {
    let min_ms = self.random_ms_delay_window.0.load(Ordering::Relaxed);
    let max_ms = self.random_ms_delay_window.1.load(Ordering::Relaxed);
    if min_ms == 0 && max_ms == 0 {
        return;
    }

    // `random_range` panics on an empty range, so only sample when `min < max`.
    let delay_ms = if min_ms >= max_ms {
        min_ms
    } else {
        rand::rng().random_range(min_ms..max_ms)
    };
    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
}
/// Lists the hashes of all xorb files in the xorb directory.
///
/// A hash is taken from the text after the last `.` of each file name. Unreadable entries,
/// non-UTF-8 names, names without a `.`, and suffixes that are not valid hex hashes are
/// skipped.
///
/// # Errors
/// Returns an internal error if the directory cannot be read.
async fn list_xorbs(&self) -> Result<Vec<MerkleHash>> {
    let dir_entries = self.xorb_dir.read_dir().map_err(ClientError::internal)?;
    let mut hashes = Vec::new();
    for dir_entry in dir_entries.filter_map(|e| e.ok()) {
        let Ok(name) = dir_entry.file_name().into_string() else {
            continue;
        };
        let Some((_, suffix)) = name.rsplit_once('.') else {
            continue;
        };
        if let Ok(hash) = MerkleHash::from_hex(suffix) {
            hashes.push(hash);
        }
    }
    Ok(hashes)
}
/// Deletes the xorb file for `hash`, ignoring any failure (including a missing file).
async fn delete_xorb(&self, hash: &MerkleHash) {
let file_path = self.get_path_for_entry(hash);
// On Windows, clear the read-only flag before attempting removal.
// NOTE(review): presumably because read-only files cannot be removed there — confirm.
#[cfg(windows)]
{
if let Ok(metadata) = std::fs::metadata(&file_path) {
let mut permissions = metadata.permissions();
#[allow(clippy::permissions_set_readonly_false)]
permissions.set_readonly(false);
let _ = std::fs::set_permissions(&file_path, permissions);
}
}
// Best effort: the result of the removal is intentionally discarded.
let _ = std::fs::remove_file(file_path);
}
/// Reads the xorb for `hash` and returns all of its bytes as produced by
/// `XorbObject::get_all_bytes`.
///
/// # Errors
/// Returns `XORBNotFound` if the file cannot be opened, or the underlying error if it cannot
/// be deserialized or read.
async fn get_full_xorb(&self, hash: &MerkleHash) -> Result<Bytes> {
    let file_path = self.get_path_for_entry(hash);
    let file = match File::open(&file_path) {
        Ok(file) => file,
        Err(_) => {
            error!("Unable to find file in local CAS {:?}", file_path);
            return Err(ClientError::XORBNotFound(*hash));
        },
    };

    let mut reader = BufReader::new(file);
    let xorb_obj = XorbObject::deserialize(&mut reader)?;
    let payload = xorb_obj.get_all_bytes(&mut reader)?;
    Ok(Bytes::from(payload))
}
/// Reads the given chunk ranges from the xorb for `hash`, returning one buffer per range in
/// the same order.
///
/// An empty `chunk_ranges` yields a single empty buffer. A range whose start is not below
/// its end yields an empty buffer without touching the file contents.
///
/// # Errors
/// Returns `XORBNotFound` if the file cannot be opened, or the underlying error if the xorb
/// cannot be deserialized or a range cannot be read.
async fn get_xorb_ranges(&self, hash: &MerkleHash, chunk_ranges: Vec<(u32, u32)>) -> Result<Vec<Bytes>> {
    if chunk_ranges.is_empty() {
        return Ok(vec![Bytes::new()]);
    }

    let file_path = self.get_path_for_entry(hash);
    let file = match File::open(&file_path) {
        Ok(file) => file,
        Err(_) => {
            error!("Unable to find file in local CAS {:?}", file_path);
            return Err(ClientError::XORBNotFound(*hash));
        },
    };
    let mut reader = BufReader::new(file);
    let xorb_obj = XorbObject::deserialize(&mut reader)?;

    let mut pieces: Vec<Bytes> = Vec::new();
    for (first_chunk, end_chunk) in chunk_ranges {
        let piece = if first_chunk < end_chunk {
            Bytes::from(xorb_obj.get_bytes_by_chunk_range(&mut reader, first_chunk, end_chunk)?)
        } else {
            Bytes::new()
        };
        pieces.push(piece);
    }
    Ok(pieces)
}
/// Returns the length of the bytes produced by `XorbObject::get_all_bytes` for the xorb with
/// the given hash, as a `u32`.
///
/// # Errors
/// Returns `XORBNotFound` if the file cannot be opened, or the underlying error if it cannot
/// be deserialized or read.
async fn xorb_length(&self, hash: &MerkleHash) -> Result<u32> {
    let Ok(file) = File::open(self.get_path_for_entry(hash)) else {
        return Err(ClientError::XORBNotFound(*hash));
    };
    let mut reader = BufReader::new(file);
    let xorb_obj = XorbObject::deserialize(&mut reader)?;
    let payload = xorb_obj.get_all_bytes(&mut reader)?;
    Ok(payload.len() as u32)
}
/// Checks whether a readable, deserializable xorb file exists for `hash`.
///
/// Returns `Ok(false)` when no metadata can be obtained for the path.
///
/// # Errors
/// Returns an internal error if the path exists but is not a regular file, `XORBNotFound` if
/// the file cannot be opened, or the deserialization error if the xorb is malformed.
async fn xorb_exists(&self, hash: &MerkleHash) -> Result<bool> {
    let file_path = self.get_path_for_entry(hash);
    match metadata(&file_path) {
        Err(_) => return Ok(false),
        Ok(md) if md.is_file() => {},
        Ok(_) => {
            return Err(ClientError::InternalError(anyhow!(
                "Attempting to write to {file_path:?}, but it is not a file"
            )));
        },
    }

    let file = File::open(&file_path).map_err(|_| ClientError::XORBNotFound(*hash))?;
    let mut reader = BufReader::new(file);
    // The parsed object is discarded; only successful deserialization matters.
    XorbObject::deserialize(&mut reader)?;
    Ok(true)
}
/// Deserializes and returns the `XorbObject` stored in the xorb file for `hash`.
///
/// # Errors
/// Returns `XORBNotFound` if the file cannot be opened, an I/O error if either seek or the
/// length read fails, or the deserialization error.
async fn xorb_footer(&self, hash: &MerkleHash) -> Result<XorbObject> {
let file_path = self.get_path_for_entry(hash);
let mut file = File::open(&file_path).map_err(|_| {
error!("Unable to find xorb in local CAS {:?}", file_path);
ClientError::XORBNotFound(*hash)
})?;
// Read the trailing u32 (the last four bytes of the file) as `info_length`.
file.seek(SeekFrom::End(-(size_of::<u32>() as i64)))?;
let info_length = read_u32(&mut file)?;
// Reposition `info_length` bytes before the end of the file.
// NOTE(review): other callers in this file hand `XorbObject::deserialize` a reader at
// offset 0, so it presumably positions itself; these seeks may be redundant, and this
// offset does not account for the 4-byte length field — confirm against `deserialize`.
file.seek(SeekFrom::End(-(info_length as i64)))?;
let mut reader = BufReader::new(file);
let xorb_obj = XorbObject::deserialize(&mut reader)?;
Ok(xorb_obj)
}
/// Returns the size of the file identified by `hash`, as recorded in its reconstruction info.
///
/// # Errors
/// Returns `FileNotFound` if the file is marked deleted or has no reconstruction info.
async fn get_file_size(&self, hash: &MerkleHash) -> Result<u64> {
    if self.is_file_deleted(hash) {
        return Err(ClientError::FileNotFound(*hash));
    }
    match self.shard_manager.get_file_reconstruction_info(hash).await? {
        Some((file_info, _)) => Ok(file_info.file_size()),
        None => Err(ClientError::FileNotFound(*hash)),
    }
}
/// Reassembles the file identified by `hash` from its xorb segments and returns the requested
/// bytes.
///
/// With `byte_range == None` the whole file is returned. Otherwise the range end is clamped
/// to the file size.
///
/// # Errors
/// * `FileNotFound` if the file is marked deleted or has no reconstruction info.
/// * `InvalidRange` if a range is given whose start is at or beyond the end of the file, or
///   whose end lies before its start (previously this case panicked while slicing).
/// * Any error from reading the underlying xorb ranges.
async fn get_file_data(&self, hash: &MerkleHash, byte_range: Option<FileRange>) -> Result<Bytes> {
    if self.is_file_deleted(hash) {
        return Err(ClientError::FileNotFound(*hash));
    }
    let Some((file_info, _)) = self
        .shard_manager
        .get_file_reconstruction_info(hash)
        .await
        .map_err(ClientError::internal)?
    else {
        return Err(ClientError::FileNotFound(*hash));
    };

    // Concatenate the bytes of every segment in order.
    let mut file_vec = Vec::new();
    for entry in &file_info.segments {
        let entry_bytes = self
            .get_xorb_ranges(&entry.xorb_hash, vec![(entry.chunk_index_start, entry.chunk_index_end)])
            .await?
            .pop()
            // `get_xorb_ranges` returns one buffer per requested range, and one was requested.
            .unwrap();
        file_vec.extend_from_slice(&entry_bytes);
    }

    let file_size = file_vec.len();
    let start = byte_range.as_ref().map(|range| range.start as usize).unwrap_or(0);
    if byte_range.is_some() && start >= file_size {
        return Err(ClientError::InvalidRange);
    }
    let end = byte_range
        .as_ref()
        .map(|range| range.end as usize)
        .unwrap_or(file_size)
        .min(file_size);
    // Reject inverted ranges explicitly; slicing with `start > end` would panic.
    if start > end {
        return Err(ClientError::InvalidRange);
    }
    Ok(Bytes::from(file_vec[start..end].to_vec()))
}
/// Returns raw bytes of the serialized xorb file for `hash`, optionally restricted to
/// `byte_range` (whose end is clamped to the file length).
///
/// # Errors
/// * `XORBNotFound` if the file cannot be read.
/// * `InvalidRange` if the start offset is at or beyond the end of the file, or if the range
///   end lies before its start (previously this case panicked while slicing).
async fn get_xorb_raw_bytes(&self, hash: &MerkleHash, byte_range: Option<FileRange>) -> Result<Bytes> {
    let file_path = self.get_path_for_entry(hash);
    let data = std::fs::read(&file_path).map_err(|_| ClientError::XORBNotFound(*hash))?;

    let start = byte_range.as_ref().map(|r| r.start as usize).unwrap_or(0);
    let end = byte_range
        .as_ref()
        .map(|r| r.end as usize)
        .unwrap_or(data.len())
        .min(data.len());

    // `start > end` covers inverted ranges, which would otherwise panic in the slice below.
    if start >= data.len() || start > end {
        return Err(ClientError::InvalidRange);
    }
    Ok(Bytes::from(data[start..end].to_vec()))
}
/// Returns the on-disk size in bytes of the xorb file for `hash`.
///
/// # Errors
/// Returns `XORBNotFound` if the file's metadata cannot be read.
async fn xorb_raw_length(&self, hash: &MerkleHash) -> Result<u64> {
    let file_path = self.get_path_for_entry(hash);
    match std::fs::metadata(&file_path) {
        Ok(md) => Ok(md.len()),
        Err(_) => Err(ClientError::XORBNotFound(*hash)),
    }
}
/// Fetches the data for one fetch term and returns it together with the cumulative
/// uncompressed byte offsets of its chunks (starting at `0`, one extra entry per chunk).
///
/// # Errors
/// * Any error from parsing the fetch URL.
/// * `PresignedUrlExpirationError` if the URL is older than the configured expiration.
/// * `InvalidArguments` if the byte range encoded in the URL differs from the term's range.
/// * `XORBNotFound` if the xorb file cannot be opened.
/// * Deserialization, read, or chunk-length lookup errors.
async fn fetch_term_data(
    &self,
    hash: MerkleHash,
    fetch_term: XorbReconstructionFetchInfo,
) -> Result<(Bytes, Vec<u32>)> {
    self.apply_api_delay().await;

    let (file_path, url_byte_range, url_timestamp) = parse_fetch_url(&fetch_term.url)?;

    // Reject URLs older than the configured expiration.
    let max_age_ms = self.url_expiration_ms.load(Ordering::Relaxed);
    let age_ms = Instant::now().saturating_duration_since(url_timestamp).as_millis() as u64;
    if age_ms > max_age_ms {
        return Err(ClientError::PresignedUrlExpirationError);
    }

    // The range embedded in the URL must agree with the range carried by the term.
    let requested = FileRange::from(fetch_term.url_range);
    let ranges_agree = url_byte_range.start == requested.start && url_byte_range.end == requested.end;
    if !ranges_agree {
        return Err(ClientError::InvalidArguments);
    }

    let file = match File::open(&file_path) {
        Ok(file) => file,
        Err(_) => {
            error!("Unable to find xorb in local CAS {:?}", file_path);
            return Err(ClientError::XORBNotFound(hash));
        },
    };
    let mut reader = BufReader::new(file);
    let xorb_obj = XorbObject::deserialize(&mut reader)?;
    let data = xorb_obj.get_bytes_by_chunk_range(&mut reader, fetch_term.range.start, fetch_term.range.end)?;

    let mut chunk_byte_indices = vec![0u32];
    let mut running_total = 0u32;
    for chunk_idx in fetch_term.range.start..fetch_term.range.end {
        let chunk_len = xorb_obj
            .uncompressed_chunk_length(chunk_idx)
            .map_err(|e| ClientError::Other(format!("Failed to get chunk length: {e}")))?;
        running_total += chunk_len;
        chunk_byte_indices.push(running_total);
    }

    Ok((data.into(), chunk_byte_indices))
}
}
#[async_trait]
impl super::DeletionControlableClient for LocalClient {
/// Returns the hashes of all shard files currently in the shard directory.
async fn list_shard_entries(&self) -> Result<Vec<MerkleHash>> {
    let mut hashes = Vec::new();
    for (hash, _) in self.shard_file_paths()? {
        hashes.push(hash);
    }
    Ok(hashes)
}
/// Reads and returns the full contents of the shard file for `hash`.
///
/// # Errors
/// Returns an error if the shard file does not exist or cannot be read.
async fn get_shard_bytes(&self, hash: &MerkleHash) -> Result<Bytes> {
    let shard_path = self.shard_path_for_hash(hash)?;
    let contents = std::fs::read(&shard_path)?;
    Ok(contents.into())
}
/// Removes the shard file for `hash` from the shard directory.
///
/// # Errors
/// Returns an error if the shard file does not exist or cannot be removed.
async fn delete_shard_entry(&self, hash: &MerkleHash) -> Result<()> {
    let shard_path = self.shard_path_for_hash(hash)?;
    std::fs::remove_file(&shard_path)?;
    Ok(())
}
/// Lists `(file_hash, shard_hash)` pairs for every file entry of every shard on disk,
/// skipping files that are marked deleted.
///
/// # Errors
/// Returns an error if the shard directory or a shard file cannot be read or parsed.
async fn list_file_shard_entries(&self) -> Result<Vec<(MerkleHash, MerkleHash)>> {
    let mut entries = Vec::new();
    for (shard_hash, shard_path) in self.shard_file_paths()? {
        let raw = std::fs::read(&shard_path)?;
        let shard = MDBMinimalShard::from_reader(&mut Cursor::new(&raw), true, false)?;

        let live_files = (0..shard.num_files())
            .map(|idx| shard.file(idx).unwrap().file_hash())
            .filter(|file_hash| !self.is_file_deleted(file_hash))
            .map(|file_hash| (file_hash, shard_hash));
        entries.extend(live_files);
    }
    Ok(entries)
}
/// Marks `file_hash` as deleted by writing `true` for it into `FILE_STATUS_TABLE`.
///
/// # Errors
/// Returns an error if the transaction cannot be started, written, or committed.
async fn delete_file_entry(&self, file_hash: &MerkleHash) -> Result<()> {
    let txn = self.db.begin_write().map_err(map_redb_db_error)?;
    {
        // The table handle must be dropped before the transaction can be committed.
        let mut status_table = txn.open_table(FILE_STATUS_TABLE).map_err(map_redb_db_error)?;
        let key = RedbHash::from(*file_hash);
        status_table.insert(&key, &true).map_err(map_redb_db_error)?;
    }
    txn.commit().map_err(map_redb_db_error)?;
    Ok(())
}
/// Removes every `GLOBAL_DEDUP_TABLE` entry whose value is `shard_hash`.
///
/// Runs up to four scan-then-delete passes: each pass collects the matching keys in a read
/// transaction and removes them in a separate write transaction. It returns as soon as a
/// scan finds nothing to delete. After the fourth pass a final scan checks for leftovers.
///
/// # Errors
/// Returns a database error if a transaction fails, or `ClientError::Other` if matching
/// entries are still present after all passes.
async fn remove_shard_dedup_entries(&self, shard_hash: &MerkleHash) -> Result<()> {
let shard_redb = RedbHash::from(*shard_hash);
for _ in 0..4 {
// Scan in a read transaction; it ends with this block, before the write begins.
let to_delete: Vec<RedbHash> = {
let read_txn = self.db.begin_read().map_err(map_redb_db_error)?;
let table = read_txn.open_table(GLOBAL_DEDUP_TABLE).map_err(map_redb_db_error)?;
table
.iter()
.map_err(map_redb_db_error)?
// NOTE(review): rows that fail to read are skipped silently here, so a storage
// error during iteration can look like "nothing to delete" — confirm this is intended.
.filter_map(|entry| entry.ok())
.filter(|(_, v)| v.value() == shard_redb)
.map(|(k, _)| k.value())
.collect()
};
if to_delete.is_empty() {
return Ok(());
}
let write_txn = self.db.begin_write().map_err(map_redb_db_error)?;
{
let mut table = write_txn.open_table(GLOBAL_DEDUP_TABLE).map_err(map_redb_db_error)?;
for chunk_hash in &to_delete {
table.remove(chunk_hash).map_err(map_redb_db_error)?;
}
}
write_txn.commit().map_err(map_redb_db_error)?;
}
// All passes found work to do; check once more whether anything is left.
let still_present = {
let read_txn = self.db.begin_read().map_err(map_redb_db_error)?;
let table = read_txn.open_table(GLOBAL_DEDUP_TABLE).map_err(map_redb_db_error)?;
table
.iter()
.map_err(map_redb_db_error)?
.filter_map(|entry| entry.ok())
.any(|(_, v)| v.value() == shard_redb)
};
if still_present {
return Err(ClientError::Other(format!(
"Unable to fully remove dedup entries for shard {} due to concurrent updates",
shard_hash.hex()
)));
}
Ok(())
}
/// Checks that nothing on disk is unreachable, collecting every violation before reporting.
///
/// Two conditions are reported:
/// * a shard that has no non-deleted file entry and none of whose xorb entries is referenced
///   by a non-deleted file in any shard;
/// * a xorb file on disk that is referenced neither by any shard's xorb entries nor by any
///   non-deleted file entry.
///
/// # Errors
/// Returns `ClientError::Other` with all violations joined by newlines, or an I/O / parse
/// error if a shard cannot be read.
async fn verify_all_reachable(&self) -> Result<()> {
let shard_files = self.shard_file_paths()?;
// Every xorb hash listed in any shard's xorb section.
let mut xorbs_in_shard_entries: std::collections::HashSet<MerkleHash> = std::collections::HashSet::new();
// Every xorb hash referenced by a segment of a non-deleted file.
let mut xorbs_in_active_file_entries: std::collections::HashSet<MerkleHash> = std::collections::HashSet::new();
// Shards containing at least one non-deleted file.
let mut shards_with_active_files: std::collections::HashSet<MerkleHash> = std::collections::HashSet::new();
// Xorb hashes listed per shard.
let mut shard_xorbs: std::collections::HashMap<MerkleHash, Vec<MerkleHash>> = std::collections::HashMap::new();
for (shard_hash, path) in &shard_files {
let shard_bytes = std::fs::read(path)?;
let minimal_shard = MDBMinimalShard::from_reader(&mut Cursor::new(&shard_bytes), true, true)?;
for i in 0..minimal_shard.num_xorb() {
let xorb_hash = minimal_shard.xorb(i).unwrap().xorb_hash();
xorbs_in_shard_entries.insert(xorb_hash);
shard_xorbs.entry(*shard_hash).or_default().push(xorb_hash);
}
let mut has_active_file = false;
for i in 0..minimal_shard.num_files() {
let file_view = minimal_shard.file(i).unwrap();
if !self.is_file_deleted(&file_view.file_hash()) {
has_active_file = true;
for seg_idx in 0..file_view.num_entries() {
xorbs_in_active_file_entries.insert(file_view.entry(seg_idx).xorb_hash);
}
}
}
if has_active_file {
shards_with_active_files.insert(*shard_hash);
}
}
let mut errors: Vec<String> = Vec::new();
// Shard check: runs only after all shards were scanned, so references from files in
// other shards are taken into account.
for (shard_hash, _) in &shard_files {
if !shards_with_active_files.contains(shard_hash) {
let has_file_referenced_xorb = shard_xorbs
.get(shard_hash)
.is_some_and(|xorbs| xorbs.iter().any(|x| xorbs_in_active_file_entries.contains(x)));
if !has_file_referenced_xorb {
errors.push(format!(
"Reachability error: shard {} has no active file entries and no \
xorbs referenced by any active file (GC should have deleted it)",
shard_hash.hex()
));
}
}
}
// Xorb check: every xorb file on disk must be referenced from somewhere.
for xorb_hash in self.list_xorbs().await? {
if !xorbs_in_shard_entries.contains(&xorb_hash) && !xorbs_in_active_file_entries.contains(&xorb_hash) {
errors.push(format!(
"Reachability error: xorb {} exists on disk but is not referenced by \
any shard xorb entry or active file entry (GC should have deleted it)",
xorb_hash.hex()
));
}
}
if errors.is_empty() {
Ok(())
} else {
Err(ClientError::Other(errors.join("\n")))
}
}
/// Verifies that shard contents are consistent with the xorb files on disk, failing on the
/// first violation found.
///
/// Pass 1: every xorb listed in a shard must exist on disk; the chunk count of the first
/// listing seen for each xorb is remembered.
/// Pass 2: for every non-deleted file, each segment must either stay within the remembered
/// chunk count of its xorb or, if the xorb is not listed in any shard, the xorb file must
/// exist on disk.
///
/// # Errors
/// Returns `ClientError::Other` describing the first violation, or an I/O / parse error if a
/// shard cannot be read.
async fn verify_integrity(&self) -> Result<()> {
    let shard_files = self.shard_file_paths()?;

    // Pass 1: xorb entries.
    let mut chunk_counts_by_xorb: HashMap<MerkleHash, usize> = HashMap::new();
    for (shard_hash, shard_path) in &shard_files {
        let raw = std::fs::read(shard_path)?;
        let shard = MDBMinimalShard::from_reader(&mut Cursor::new(&raw), true, true)?;
        for xorb_idx in 0..shard.num_xorb() {
            let xorb_view = shard.xorb(xorb_idx).unwrap();
            let xorb_hash = xorb_view.xorb_hash();
            if !self.get_path_for_entry(&xorb_hash).exists() {
                return Err(ClientError::Other(format!(
                    "Integrity error: shard {} references non-existent XORB {}",
                    shard_hash.hex(),
                    xorb_hash.hex()
                )));
            }
            chunk_counts_by_xorb.entry(xorb_hash).or_insert(xorb_view.num_entries());
        }
    }

    // Pass 2: file segments of non-deleted files.
    for (shard_hash, shard_path) in &shard_files {
        let raw = std::fs::read(shard_path)?;
        let shard = MDBMinimalShard::from_reader(&mut Cursor::new(&raw), true, true)?;
        for file_idx in 0..shard.num_files() {
            let file_view = shard.file(file_idx).unwrap();
            let fh = file_view.file_hash();
            if self.is_file_deleted(&fh) {
                continue;
            }

            for seg_idx in 0..file_view.num_entries() {
                let segment = file_view.entry(seg_idx);
                match chunk_counts_by_xorb.get(&segment.xorb_hash).copied() {
                    Some(chunk_count) => {
                        if segment.chunk_index_end as usize > chunk_count {
                            return Err(ClientError::Other(format!(
                                "Integrity error: file {} references chunk range {}..{} \
                                 but XORB block {} only has {} chunks",
                                fh.hex(),
                                segment.chunk_index_start,
                                segment.chunk_index_end,
                                segment.xorb_hash.hex(),
                                chunk_count
                            )));
                        }
                    },
                    None => {
                        // Not indexed by any shard: acceptable only if the file is on disk.
                        if !self.get_path_for_entry(&segment.xorb_hash).exists() {
                            return Err(ClientError::Other(format!(
                                "Integrity error: file {} in shard {} references XORB {} \
                                 that has no shard index entry and no XORB file on disk",
                                fh.hex(),
                                shard_hash.hex(),
                                segment.xorb_hash.hex()
                            )));
                        }
                    },
                }
            }
        }
    }
    Ok(())
}
}
impl LocalClient {
/// Computes the reconstruction ranges for `file_id`, optionally limited to `bytes_range`.
///
/// Yields `Ok(None)` when the file is marked deleted or has no reconstruction info;
/// otherwise delegates to `xorb_utils::compute_reconstruction_ranges`, supplying xorb
/// footers read from disk via `xorb_footer_sync`.
async fn compute_reconstruction_ranges(
    &self,
    file_id: &MerkleHash,
    bytes_range: Option<FileRange>,
) -> Result<xorb_utils::ReconstructionRangesResult> {
    if self.is_file_deleted(file_id) {
        return Ok(None);
    }

    let lookup = self.shard_manager.get_file_reconstruction_info(file_id).await?;
    let file_info = match lookup {
        Some((file_info, _)) => file_info,
        None => return Ok(None),
    };

    xorb_utils::compute_reconstruction_ranges(&file_info, bytes_range, &mut |hash| self.xorb_footer_sync(hash))
}
/// Synchronously opens the xorb file for `hash` and deserializes its `XorbObject`.
///
/// # Errors
/// Returns `XORBNotFound` if the file cannot be opened, or the converted deserialization
/// error.
fn xorb_footer_sync(&self, hash: &MerkleHash) -> Result<XorbObject> {
    let file_path = self.get_path_for_entry(hash);
    let mut file = match File::open(&file_path) {
        Ok(file) => file,
        Err(_) => {
            error!("Unable to find file in local CAS {:?}", file_path);
            return Err(ClientError::XORBNotFound(*hash));
        },
    };
    let xorb_obj = XorbObject::deserialize(&mut file)?;
    Ok(xorb_obj)
}
/// Builds the V1 reconstruction response for `file_id`, optionally limited to `bytes_range`.
///
/// Returns `Ok(None)` when no reconstruction ranges are available for the file. When the
/// term list is empty the response carries an empty `fetch_info` map. Otherwise one fetch
/// entry (with its own URL) is produced per merged range of each xorb, all sharing a single
/// timestamp.
pub async fn get_reconstruction_v1(
    &self,
    file_id: &MerkleHash,
    bytes_range: Option<FileRange>,
) -> Result<Option<QueryReconstructionResponse>> {
    self.apply_api_delay().await;

    let Some((offset_into_first_range, terms, merged_ranges)) =
        self.compute_reconstruction_ranges(file_id, bytes_range).await?
    else {
        return Ok(None);
    };

    let mut fetch_info: HashMap<HexMerkleHash, Vec<XorbReconstructionFetchInfo>> = HashMap::new();
    if !terms.is_empty() {
        let timestamp = Instant::now();
        for (hash, ranges) in merged_ranges {
            let file_path = self.get_path_for_entry(&hash);
            let mut entries = Vec::new();
            for r in ranges {
                entries.push(XorbReconstructionFetchInfo {
                    range: r.chunk_range,
                    url: generate_fetch_url(&file_path, &r.byte_range, timestamp),
                    url_range: HttpRange::from(r.byte_range),
                });
            }
            fetch_info.insert(hash.into(), entries);
        }
    }

    Ok(Some(QueryReconstructionResponse {
        offset_into_first_range,
        terms,
        fetch_info,
    }))
}
/// Builds the V2 reconstruction response for `file_id`, optionally limited to `bytes_range`.
///
/// Returns `Ok(None)` when no reconstruction ranges are available for the file. When the
/// term list is empty the response carries an empty `xorbs` map. Otherwise the merged ranges
/// of each xorb are split into groups of at most `max_ranges_per_fetch` ranges, and each
/// group becomes one multi-range fetch entry with its own URL; all URLs share one timestamp.
///
/// A configured limit of `0` is treated as `1`, because `slice::chunks` panics on a zero
/// chunk size.
pub async fn get_reconstruction_v2(
    &self,
    file_id: &MerkleHash,
    bytes_range: Option<FileRange>,
) -> Result<Option<QueryReconstructionResponseV2>> {
    self.apply_api_delay().await;
    let result = self.compute_reconstruction_ranges(file_id, bytes_range).await?;
    let Some((offset_into_first_range, terms, merged_ranges)) = result else {
        return Ok(None);
    };
    if terms.is_empty() {
        return Ok(Some(QueryReconstructionResponseV2 {
            offset_into_first_range,
            terms,
            xorbs: HashMap::new(),
        }));
    }

    let timestamp = Instant::now();
    // Clamp to at least one range per group so `chunks` never receives 0.
    let max_ranges = self.max_ranges_per_fetch.load(Ordering::Relaxed).max(1);

    let mut xorbs: HashMap<HexMerkleHash, Vec<XorbMultiRangeFetch>> = HashMap::new();
    for (hash, ranges) in merged_ranges {
        let mut fetch_entries = Vec::new();
        for chunk in ranges.chunks(max_ranges) {
            let range_descriptors: Vec<XorbRangeDescriptor> = chunk
                .iter()
                .map(|r| XorbRangeDescriptor {
                    chunks: r.chunk_range,
                    bytes: HttpRange::from(r.byte_range),
                })
                .collect();
            let url = generate_v2_fetch_url(&hash, &range_descriptors, timestamp);
            fetch_entries.push(XorbMultiRangeFetch {
                url,
                ranges: range_descriptors,
            });
        }
        xorbs.insert(hash.into(), fetch_entries);
    }

    Ok(Some(QueryReconstructionResponseV2 {
        offset_into_first_range,
        terms,
        xorbs,
    }))
}
}
#[async_trait]
impl Client for LocalClient {
/// Looks up the reconstruction info for `file_hash` through the shard manager.
///
/// Returns `Ok(None)` for files marked deleted, without consulting the shard manager.
async fn get_file_reconstruction_info(
    &self,
    file_hash: &MerkleHash,
) -> Result<Option<(MDBFileInfo, Option<MerkleHash>)>> {
    self.apply_api_delay().await;
    if self.is_file_deleted(file_hash) {
        return Ok(None);
    }
    let info = self.shard_manager.get_file_reconstruction_info(file_hash).await?;
    Ok(info)
}
/// Returns the bytes of the shard recorded for `chunk_hash` in `GLOBAL_DEDUP_TABLE`, or
/// `Ok(None)` if the chunk has no entry. `_prefix` is ignored.
///
/// With a configured expiration of `0` seconds the shard file is returned as stored.
/// Otherwise the shard is re-serialized through `serialize_xorb_subset_with_expiry` with an
/// expiry of now plus the configured number of seconds.
///
/// # Errors
/// Returns a database error, or an I/O / parse error if the shard file cannot be read.
async fn query_for_global_dedup_shard(&self, _prefix: &str, chunk_hash: &MerkleHash) -> Result<Option<Bytes>> {
    self.apply_api_delay().await;

    let read_txn = self.db.begin_read().map_err(map_redb_db_error)?;
    let table = read_txn.open_table(GLOBAL_DEDUP_TABLE).map_err(map_redb_db_error)?;
    let key = RedbHash::from(*chunk_hash);
    let Some(shard) = table.get(&key).map_err(map_redb_db_error)? else {
        return Ok(None);
    };

    let shard_hash: MerkleHash = shard.value().into();
    let filename = self.shard_dir.join(shard_file_name(&shard_hash));

    let expiration_secs = self.global_dedup_expiration_secs.load(Ordering::Relaxed);
    if expiration_secs == 0 {
        let raw = std::fs::read(filename)?;
        return Ok(Some(raw.into()));
    }

    let expiry = std::time::SystemTime::now() + Duration::from_secs(expiration_secs);
    let shard_bytes = std::fs::read(filename)?;
    let minimal_shard = MDBMinimalShard::from_reader(&mut Cursor::new(&shard_bytes), true, true)?;
    let mut out = Vec::new();
    minimal_shard.serialize_xorb_subset_with_expiry(&mut out, Some(expiry), |_| true)?;
    Ok(Some(out.into()))
}
/// Applies the configured API delay, then acquires a connection permit from the upload
/// concurrency controller.
async fn acquire_upload_permit(&self) -> Result<super::super::adaptive_concurrency::ConnectionPermit> {
    self.apply_api_delay().await;
    let controller = &self.upload_concurrency_controller;
    controller.acquire_connection_permit().await
}
/// Stores an uploaded shard and records its dedup and file-status information.
///
/// The shard bytes are parsed, rebuilt as an in-memory shard, written to the shard
/// directory, and registered with the shard manager. Then, in a single write transaction,
/// every global-dedup-eligible chunk is mapped to the shard's hash and every file in the
/// shard is written to `FILE_STATUS_TABLE` with `false` (not deleted).
///
/// Returns `Ok(true)` on success; `_permit` is unused.
///
/// # Errors
/// Returns an error if the shard cannot be parsed, written, or registered, or if the
/// database transaction fails.
async fn upload_shard(
&self,
shard_data: Bytes,
_permit: super::super::adaptive_concurrency::ConnectionPermit,
) -> Result<bool> {
self.apply_api_delay().await;
let mut reader = Cursor::new(&shard_data);
let minimal_shard = MDBMinimalShard::from_reader(&mut reader, true, true)?;
// Copy all file and xorb entries into an in-memory shard that can be written to disk.
let mut in_memory_shard = MDBInMemoryShard::default();
for i in 0..minimal_shard.num_files() {
let file_view = minimal_shard.file(i).unwrap();
in_memory_shard.add_file_reconstruction_info(MDBFileInfo::from(file_view))?;
}
for i in 0..minimal_shard.num_xorb() {
let xorb_view = minimal_shard.xorb(i).unwrap();
in_memory_shard.add_xorb_block(MDBXorbInfo::from(xorb_view))?;
}
let shard_path = in_memory_shard.write_to_directory(&self.shard_dir, None)?;
let shard = MDBShardFile::load_from_file(&shard_path)?;
let shard_hash = shard.shard_hash;
self.shard_manager.register_shards(&[shard]).await?;
let chunk_hashes = minimal_shard.global_dedup_eligible_chunks();
let shard_hash_redb = RedbHash::from(shard_hash);
let write_txn = self.db.begin_write().map_err(map_redb_db_error)?;
{
let mut dedup_table = write_txn.open_table(GLOBAL_DEDUP_TABLE).map_err(map_redb_db_error)?;
for chunk in chunk_hashes {
dedup_table
.insert(&RedbHash::from(chunk), &shard_hash_redb)
.map_err(map_redb_db_error)?;
}
// Writing `false` also overwrites an earlier deletion mark for the same file hash.
let mut status_table = write_txn.open_table(FILE_STATUS_TABLE).map_err(map_redb_db_error)?;
for i in 0..minimal_shard.num_files() {
let file_hash = minimal_shard.file(i).unwrap().file_hash();
status_table
.insert(&RedbHash::from(file_hash), &false)
.map_err(map_redb_db_error)?;
}
}
write_txn.commit().map_err(map_redb_db_error)?;
Ok(true)
}
/// Writes a serialized xorb to the xorb directory.
///
/// Returns `Ok(0)` without writing if a xorb with the same hash already exists; otherwise
/// returns the number of bytes written. `_prefix` and `_permit` are unused.
///
/// When `footer_start` is `None`, the data is rebuilt with a footer via
/// `reconstruct_xorb_with_footer` and the computed hash must equal the declared hash. When it
/// is `Some`, the serialized data is written as given.
///
/// The data is written in ten slices; after each slice the progress callback, if any, is
/// called with `(bytes in this slice, bytes written so far, total bytes)`.
///
/// # Errors
/// Returns an error if the existence check fails, the hash does not match, or the file cannot
/// be created, written, or closed.
async fn upload_xorb(
&self,
_prefix: &str,
serialized_xorb_object: SerializedXorbObject,
progress_callback: Option<ProgressCallback>,
_permit: super::super::adaptive_concurrency::ConnectionPermit,
) -> Result<u64> {
self.apply_api_delay().await;
let hash = serialized_xorb_object.hash;
let footer_start = serialized_xorb_object.footer_start;
let serialized_data = serialized_xorb_object.serialized_data;
if self.xorb_exists(&hash).await? {
info!("object {hash:?} already exists in Local CAS; returning.");
return Ok(0);
}
let data_to_write = if footer_start.is_some() {
serialized_data
} else {
let mut data_with_footer = Vec::new();
let (_, computed_hash) = xet_core_structures::xorb_object::reconstruct_xorb_with_footer(
&mut data_with_footer,
&serialized_data,
)?;
if computed_hash != hash {
return Err(ClientError::Other(format!(
"XORB hash mismatch: expected {}, got {}",
hash.hex(),
computed_hash.hex(),
)));
}
data_with_footer
};
let file_path = self.get_path_for_entry(&hash);
info!("Writing XORB {hash:?} to local path {file_path:?}");
let total = data_to_write.len() as u64;
let mut file = SafeFileCreator::new(&file_path)?;
// Slice boundaries are i/10 and (i+1)/10 of the length, so the ten slices cover the data
// exactly once; for short inputs some slices are empty.
for i in 0..10 {
let start = (i * data_to_write.len()) / 10;
let end = ((i + 1) * data_to_write.len()) / 10;
let chunk_len = end - start;
file.write_all(&data_to_write[start..end])?;
if let Some(ref cb) = progress_callback {
let completed = end as u64;
let delta = chunk_len as u64;
cb(delta, completed, total);
}
}
let bytes_written = data_to_write.len();
file.close()?;
// On Unix, mark the stored xorb read-only; failures here are ignored.
#[cfg(unix)]
if let Ok(metadata) = metadata(&file_path) {
let mut permissions = metadata.permissions();
permissions.set_readonly(true);
let _ = std::fs::set_permissions(&file_path, permissions);
}
info!("{file_path:?} successfully written with {bytes_written} bytes.");
Ok(bytes_written as u64)
}
/// Returns the reconstruction information for `file_id`, optionally limited to `bytes_range`.
///
/// Pure delegation: the call is forwarded to `get_reconstruction_v2` and its result is
/// returned unchanged.
async fn get_reconstruction(
&self,
file_id: &MerkleHash,
bytes_range: Option<FileRange>,
) -> Result<Option<QueryReconstructionResponseV2>> {
self.get_reconstruction_v2(file_id, bytes_range).await
}
/// Resolves reconstruction data for several files in a single call.
///
/// Every id is looked up through `get_reconstruction_v1` without a byte range. Ids for which
/// that lookup yields `None` are simply absent from the response. Fetch infos of all files are
/// merged into one map, appending to the list already stored under the same hash key.
///
/// The first lookup error aborts the whole batch and is returned to the caller.
async fn batch_get_reconstruction(&self, file_ids: &[MerkleHash]) -> Result<BatchQueryReconstructionResponse> {
    self.apply_api_delay().await;

    let mut terms_by_file = HashMap::new();
    let mut merged_fetch_info: HashMap<HexMerkleHash, Vec<XorbReconstructionFetchInfo>> = HashMap::new();

    for file_id in file_ids {
        let Some(reconstruction) = self.get_reconstruction_v1(file_id, None).await? else {
            continue;
        };

        let file_key: HexMerkleHash = (*file_id).into();
        terms_by_file.insert(file_key, reconstruction.terms);

        for (hash_key, infos) in reconstruction.fetch_info {
            merged_fetch_info.entry(hash_key).or_default().extend(infos);
        }
    }

    Ok(BatchQueryReconstructionResponse {
        files: terms_by_file,
        fetch_info: merged_fetch_info,
    })
}
/// Waits for `apply_api_delay` and then acquires a connection permit to be used for a download.
async fn acquire_download_permit(&self) -> Result<super::super::adaptive_concurrency::ConnectionPermit> {
self.apply_api_delay().await;
// NOTE(review): the permit is taken from `upload_concurrency_controller`, so downloads appear
// to share the upload concurrency budget — confirm this is intentional.
self.upload_concurrency_controller.acquire_connection_permit().await
}
/// Reads the byte ranges named by a fetch URL from a local file and decompresses them.
///
/// The URL and its ranges come from `url_info.retrieve_url()`. Two URL formats are accepted:
/// the path-based one understood by `parse_fetch_url`, and, as a fallback, the hash-based one
/// understood by `xorb_utils::parse_v2_fetch_url` (the path is then derived with
/// `get_path_for_entry`).
///
/// If the timestamp embedded in the URL is older than `url_expiration_ms`, the URL is refreshed
/// once via `url_info.refresh_url()` and the lookup is retried; a second expiry fails with
/// `ClientError::PresignedUrlExpirationError`.
///
/// For every range the bytes are read from the file, run through `deserialize_chunks`, and
/// accumulated with `append_chunk_segment`. The progress callback, if any, is invoked once at
/// the end with the total number of bytes read.
///
/// # Returns
/// The accumulated decompressed bytes and the accumulated chunk index vector.
///
/// # Errors
/// * `ClientError::XORBNotFound` when the file cannot be opened. The error carries the hash
///   parsed from a hash-based URL; for path-based URLs no hash is available and the default
///   hash is reported.
/// * `ClientError::PresignedUrlExpirationError` when the URL is still expired after a refresh.
/// * Errors from URL retrieval/parsing, seeking, reading and chunk deserialization are propagated.
async fn get_file_term_data(
    &self,
    url_info: Box<dyn super::super::interface::URLProvider>,
    _download_permit: super::super::adaptive_concurrency::ConnectionPermit,
    progress_callback: Option<ProgressCallback>,
    uncompressed_size_if_known: Option<usize>,
) -> Result<(Bytes, Vec<u32>)> {
    // At most two passes: the first one may find the URL expired and refresh it.
    for attempt in 0..2 {
        self.apply_api_delay().await;
        let (url, http_ranges) = url_info.retrieve_url().await?;

        // Keep the hash when the URL provides one so a missing file is reported against the
        // real hash instead of an all-default placeholder.
        let (file_path, url_timestamp, xorb_hash) = match parse_fetch_url(&url) {
            Ok((path, _, ts)) => (path, ts, None),
            Err(_) => {
                let (hash, ts, _) = xorb_utils::parse_v2_fetch_url(&url)?;
                (self.get_path_for_entry(&hash), ts, Some(hash))
            },
        };

        let expiration_ms = self.url_expiration_ms.load(Ordering::Relaxed);
        let elapsed_ms = Instant::now().saturating_duration_since(url_timestamp).as_millis() as u64;
        if elapsed_ms > expiration_ms {
            if attempt == 0 {
                url_info.refresh_url().await?;
                continue;
            }
            return Err(ClientError::PresignedUrlExpirationError);
        }

        let mut file =
            File::open(&file_path).map_err(|_| ClientError::XORBNotFound(xorb_hash.unwrap_or_default()))?;

        let mut all_decompressed = Vec::new();
        let mut all_chunk_indices = Vec::<u32>::new();
        let mut total_transfer = 0u64;

        for http_range in &http_ranges {
            let len = http_range.length() as usize;
            total_transfer += http_range.length();

            // Read exactly the bytes of this range; a short file surfaces as an I/O error.
            file.seek(SeekFrom::Start(http_range.start))?;
            let mut data = vec![0u8; len];
            std::io::Read::read_exact(&mut file, &mut data)?;

            let (decompressed, chunk_indices) =
                xet_core_structures::xorb_object::deserialize_chunks(&mut Cursor::new(&data))?;
            xet_core_structures::xorb_object::append_chunk_segment(
                &mut all_decompressed,
                &mut all_chunk_indices,
                &decompressed,
                &chunk_indices,
            );
        }

        // Size check is active in debug builds only.
        if let Some(expected) = uncompressed_size_if_known {
            debug_assert_eq!(
                all_decompressed.len(),
                expected,
                "get_file_term_data: expected {} bytes, got {}",
                expected,
                all_decompressed.len()
            );
        }

        if let Some(ref cb) = progress_callback {
            cb(total_transfer, total_transfer, total_transfer);
        }
        return Ok((Bytes::from(all_decompressed), all_chunk_indices));
    }

    // Not reached: every pass above either returns or (only on the first pass) continues.
    // Kept so the function has a value after the loop.
    Err(ClientError::PresignedUrlExpirationError)
}
}
/// Turns any `Debug`-printable database error into `ClientError::Other`.
///
/// The formatted description is emitted as a warning before being wrapped in the error.
fn map_redb_db_error(e: impl std::fmt::Debug) -> ClientError {
    let description = format!("Global shard dedup database error: {e:?}");
    warn!("{}", description);
    ClientError::Other(description)
}
/// Builds a path-based fetch URL of the form `<path>:<start>:<end>:<timestamp_ms>`.
///
/// `timestamp_ms` is the number of milliseconds between `REFERENCE_INSTANT` and `timestamp`
/// (saturating at zero when `timestamp` is earlier than the reference).
fn generate_fetch_url(file_path: &Path, byte_range: &FileRange, timestamp: Instant) -> String {
    let since_reference = timestamp.saturating_duration_since(*REFERENCE_INSTANT);
    let timestamp_ms = since_reference.as_millis() as u64;
    format!(
        "{path}:{start}:{end}:{timestamp_ms}",
        path = file_path.display(),
        start = byte_range.start,
        end = byte_range.end,
    )
}
/// Parses a path-based fetch URL of the form `<path>:<start>:<end>:<timestamp_ms>`.
///
/// The string is split from the right, so colons inside the path component are preserved.
/// Returns the path, the byte range, and the instant obtained by adding `timestamp_ms`
/// milliseconds to `REFERENCE_INSTANT`.
///
/// # Errors
/// `ClientError::InvalidArguments` when fewer than four fields are present or when any of the
/// three numeric fields is not a valid `u64`.
fn parse_fetch_url(url: &str) -> Result<(PathBuf, FileRange, Instant)> {
    // `rsplitn` yields the fields right-to-left: timestamp, end, start, then the remaining path.
    let mut fields = url.rsplitn(4, ':');
    let (Some(timestamp_field), Some(end_field), Some(start_field), Some(path_field)) =
        (fields.next(), fields.next(), fields.next(), fields.next())
    else {
        return Err(ClientError::InvalidArguments);
    };

    let parse_number = |field: &str| field.parse::<u64>().map_err(|_| ClientError::InvalidArguments);
    let start_pos = parse_number(start_field)?;
    let end_pos = parse_number(end_field)?;
    let timestamp_ms = parse_number(timestamp_field)?;

    let file_path = PathBuf::from(path_field);
    let byte_range = FileRange::new(start_pos, end_pos);
    let timestamp = *REFERENCE_INSTANT + Duration::from_millis(timestamp_ms);
    Ok((file_path, byte_range, timestamp))
}
/// Builds a V2 fetch URL for `hash`, `ranges` and `timestamp`.
///
/// Module-local shorthand that forwards unchanged to `xorb_utils::generate_v2_fetch_url`.
fn generate_v2_fetch_url(hash: &MerkleHash, ranges: &[XorbRangeDescriptor], timestamp: Instant) -> String {
xorb_utils::generate_v2_fetch_url(hash, ranges, timestamp)
}
// Unit tests for `LocalClient`: shared client suites plus local-store specific checks
// (fetch URL validation, integrity verification, soft deletion and persistence).
#[cfg(test)]
mod tests {
use xet_core_structures::xorb_object::CompressionScheme;
use xet_core_structures::xorb_object::xorb_format_test_utils::{
ChunkSize, build_and_verify_xorb_object, build_raw_xorb,
};
use super::*;
use crate::cas_client::simulation::DeletionControlableClient;
use crate::cas_client::simulation::client_testing_utils::ClientTestingUtils;
use crate::cas_types::{ChunkRange, XorbReconstructionFetchInfo};
// Runs the shared client functionality suite against a temporary `LocalClient`.
#[tokio::test]
async fn test_common_client_suite() {
crate::cas_client::simulation::client_unit_testing::test_client_functionality(|| async {
LocalClient::temporary().await.unwrap()
as std::sync::Arc<dyn crate::cas_client::simulation::DirectAccessClient>
})
.await;
}
// A client opened through a symlink and one opened through the real directory must end up
// with the very same `db` allocation (`Arc::ptr_eq`).
#[cfg(unix)]
#[tokio::test]
async fn db_cache_unifies_symlink_equivalent_paths() {
let tmp = tempfile::tempdir().unwrap();
let real = tmp.path().join("real");
std::fs::create_dir_all(&real).unwrap();
let link = tmp.path().join("link");
std::os::unix::fs::symlink(&real, &link).unwrap();
let c1 = LocalClient::new(&link).await.unwrap();
let c2 = LocalClient::new(&real).await.unwrap();
assert!(Arc::ptr_eq(&c1.db, &c2.db));
}
// Uploads one XORB, checks that a well-formed fetch term succeeds, and then feeds
// `fetch_term_data` a series of malformed or mismatching URLs, each of which must fail.
#[tokio::test]
async fn test_download_fetch_term_data_validation() {
// Arrange: build and upload a XORB, then read it back to learn the byte offsets that
// `get_byte_offset(0, 1)` reports.
let xorb = build_raw_xorb(3, ChunkSize::Fixed(2048));
let xorb_obj = build_and_verify_xorb_object(xorb, CompressionScheme::Auto);
let hash = xorb_obj.hash;
let client = LocalClient::temporary().await.unwrap();
let permit = client.acquire_upload_permit().await.unwrap();
client.upload_xorb("default", xorb_obj, None, permit).await.unwrap();
let file_path = client.get_path_for_entry(&hash);
let file = File::open(&file_path).unwrap();
let mut reader = BufReader::new(file);
let xorb_obj = XorbObject::deserialize(&mut reader).unwrap();
let (fetch_byte_start, fetch_byte_end) = xorb_obj.get_byte_offset(0, 1).unwrap();
let timestamp = Instant::now();
let byte_range = FileRange::new(fetch_byte_start as u64, fetch_byte_end as u64);
// Baseline: URL and `url_range` agree, so the fetch must succeed.
let valid_url = generate_fetch_url(&file_path, &byte_range, timestamp);
let valid_url_range = HttpRange::from(byte_range);
let valid_fetch_term = XorbReconstructionFetchInfo {
range: ChunkRange::new(0, 1),
url: valid_url.clone(),
url_range: valid_url_range,
};
let result = client.fetch_term_data(hash, valid_fetch_term).await;
assert!(result.is_ok(), "Valid fetch_term should succeed");
// Only three colon-separated fields instead of four.
let too_few_parts = "filename:123:456";
let invalid_fetch_term = XorbReconstructionFetchInfo {
range: ChunkRange::new(0, 1),
url: too_few_parts.to_string(),
url_range: valid_url_range,
};
let result = client.fetch_term_data(hash, invalid_fetch_term).await;
assert!(result.is_err(), "URL with too few parts should fail");
assert!(matches!(result.unwrap_err(), ClientError::InvalidArguments));
// Start offset in the URL differs from `url_range` by one byte.
let wrong_byte_range = FileRange::new(fetch_byte_start as u64 + 1, fetch_byte_end as u64);
let wrong_start_pos = generate_fetch_url(&file_path, &wrong_byte_range, timestamp);
let invalid_fetch_term = XorbReconstructionFetchInfo {
range: ChunkRange::new(0, 1),
url: wrong_start_pos,
url_range: valid_url_range,
};
let result = client.fetch_term_data(hash, invalid_fetch_term).await;
assert!(result.is_err(), "Wrong start_pos should fail");
assert!(matches!(result.unwrap_err(), ClientError::InvalidArguments));
// End offset in the URL differs from `url_range` by one byte.
let wrong_byte_range = FileRange::new(fetch_byte_start as u64, fetch_byte_end as u64 + 1);
let wrong_end_pos = generate_fetch_url(&file_path, &wrong_byte_range, timestamp);
let invalid_fetch_term = XorbReconstructionFetchInfo {
range: ChunkRange::new(0, 1),
url: wrong_end_pos,
url_range: valid_url_range,
};
let result = client.fetch_term_data(hash, invalid_fetch_term).await;
assert!(result.is_err(), "Wrong end_pos should fail");
assert!(matches!(result.unwrap_err(), ClientError::InvalidArguments));
// Hand-built URLs with a non-numeric start, end, or timestamp field, plus the empty URL.
let timestamp_ms = timestamp.saturating_duration_since(*REFERENCE_INSTANT).as_millis() as u64;
let non_numeric_start = format!("{}:not_a_number:{}:{}", file_path.display(), fetch_byte_end, timestamp_ms);
let invalid_fetch_term = XorbReconstructionFetchInfo {
range: ChunkRange::new(0, 1),
url: non_numeric_start,
url_range: valid_url_range,
};
let result = client.fetch_term_data(hash, invalid_fetch_term).await;
assert!(result.is_err(), "Non-numeric start_pos should fail");
assert!(matches!(result.unwrap_err(), ClientError::InvalidArguments));
let non_numeric_end = format!("{}:{}:not_a_number:{}", file_path.display(), fetch_byte_start, timestamp_ms);
let invalid_fetch_term = XorbReconstructionFetchInfo {
range: ChunkRange::new(0, 1),
url: non_numeric_end,
url_range: valid_url_range,
};
let result = client.fetch_term_data(hash, invalid_fetch_term).await;
assert!(result.is_err(), "Non-numeric end_pos should fail");
assert!(matches!(result.unwrap_err(), ClientError::InvalidArguments));
let invalid_fetch_term = XorbReconstructionFetchInfo {
range: ChunkRange::new(0, 1),
url: String::new(),
url_range: valid_url_range,
};
let result = client.fetch_term_data(hash, invalid_fetch_term).await;
assert!(result.is_err(), "Empty URL should fail");
assert!(matches!(result.unwrap_err(), ClientError::InvalidArguments));
let non_numeric_timestamp =
format!("{}:{}:{}:not_a_number", file_path.display(), fetch_byte_start, fetch_byte_end);
let invalid_fetch_term = XorbReconstructionFetchInfo {
range: ChunkRange::new(0, 1),
url: non_numeric_timestamp,
url_range: valid_url_range,
};
let result = client.fetch_term_data(hash, invalid_fetch_term).await;
assert!(result.is_err(), "Non-numeric timestamp should fail");
assert!(matches!(result.unwrap_err(), ClientError::InvalidArguments));
// Well-formed URL pointing at a file that does not exist; only failure is asserted here,
// not the specific error variant.
let non_existent_path = PathBuf::from("/nonexistent/path/file.xorb");
let non_existent_url = generate_fetch_url(&non_existent_path, &byte_range, timestamp);
let invalid_fetch_term = XorbReconstructionFetchInfo {
range: ChunkRange::new(0, 1),
url: non_existent_url,
url_range: valid_url_range,
};
let result = client.fetch_term_data(hash, invalid_fetch_term).await;
assert!(result.is_err(), "Non-existent file should fail");
}
// Shared URL-expiration suite; the tokio clock starts paused (`start_paused = true`).
#[tokio::test(start_paused = true)]
async fn test_url_expiration() {
super::super::client_unit_testing::test_url_expiration_functionality(|| async {
LocalClient::temporary().await.unwrap()
as std::sync::Arc<dyn crate::cas_client::simulation::DirectAccessClient>
})
.await;
}
// Shared API-delay suite; the tokio clock starts paused.
#[tokio::test(start_paused = true)]
async fn test_api_delay() {
super::super::client_unit_testing::test_api_delay_functionality(|| async {
LocalClient::temporary().await.unwrap()
as std::sync::Arc<dyn crate::cas_client::simulation::DirectAccessClient>
})
.await;
}
// Shared global-dedup shard expiration suite; the tokio clock starts paused.
#[tokio::test(start_paused = true)]
async fn test_global_dedup_shard_expiration() {
super::super::client_unit_testing::test_global_dedup_shard_expiration_functionality(|| async {
LocalClient::temporary().await.unwrap()
as std::sync::Arc<dyn crate::cas_client::simulation::DirectAccessClient>
})
.await;
}
// Stress variant of the shard expiration suite; unlike the tests above it does not request
// a paused clock.
#[tokio::test]
async fn test_global_dedup_shard_expiration_stress() {
super::super::client_unit_testing::test_global_dedup_shard_expiration_stress(|| async {
LocalClient::temporary().await.unwrap()
as std::sync::Arc<dyn crate::cas_client::simulation::DirectAccessClient>
})
.await;
}
// Runs the shared deletion suite against a temporary `LocalClient`.
#[tokio::test]
async fn test_deletion_suite() {
super::super::deletion_unit_testing::test_deletion_functionality(|| async {
LocalClient::temporary().await.unwrap()
})
.await;
}
// Integrity must fail once one XORB is removed both from the shard data (`xorb_content`)
// and from the store (`delete_xorb`) while the file entries are left in place.
#[tokio::test]
async fn test_verify_integrity_detects_missing_cas_block_reference() {
let client = LocalClient::temporary().await.unwrap();
client.upload_random_file(&[(3, (0, 3)), (4, (0, 2))], 2048).await.unwrap();
// Sanity check: the untouched store is consistent.
client.verify_integrity().await.unwrap();
let mut in_memory = client.load_all_shard_data().unwrap();
let removed_hash = *in_memory.xorb_content.keys().next().unwrap();
in_memory.xorb_content.remove(&removed_hash);
client.delete_xorb(&removed_hash).await;
client.write_shard_data_and_register(&in_memory).await.unwrap();
assert!(client.verify_integrity().await.is_err());
}
// Integrity must fail when a file segment's `chunk_index_end` is set past the number of
// entries recorded for the XORB it refers to.
#[tokio::test]
async fn test_verify_integrity_detects_invalid_chunk_range() {
let client = LocalClient::temporary().await.unwrap();
client.upload_random_file(&[(5, (0, 3))], 2048).await.unwrap();
client.verify_integrity().await.unwrap();
let mut in_memory = client.load_all_shard_data().unwrap();
let file_info = in_memory.file_content.values_mut().next().unwrap();
let segment = file_info.segments.first_mut().unwrap();
let xorb_entry_count = in_memory.xorb_content.get(&segment.xorb_hash).unwrap().metadata.num_entries;
// One past the recorded entry count: deliberately out of range.
segment.chunk_index_end = xorb_entry_count + 1;
client.write_shard_data_and_register(&in_memory).await.unwrap();
assert!(client.verify_integrity().await.is_err());
}
// Deleting a file entry must leave the set of shard file hashes exactly as it was.
#[tokio::test]
async fn test_delete_file_entry_does_not_rewrite_shards() {
let client = LocalClient::temporary().await.unwrap();
client.upload_random_file(&[(1, (0, 3))], 2048).await.unwrap();
let shard_hashes_before: Vec<_> = client.shard_file_paths().unwrap().into_iter().map(|(h, _)| h).collect();
assert!(!shard_hashes_before.is_empty());
client
.delete_file_entry(&client.list_file_shard_entries().await.unwrap()[0].0)
.await
.unwrap();
let shard_hashes_after: Vec<_> = client.shard_file_paths().unwrap().into_iter().map(|(h, _)| h).collect();
assert_eq!(shard_hashes_before, shard_hashes_after, "Shard file hashes must not change after soft-delete");
}
// A deletion recorded by one client instance must still be visible to a second instance
// opened later on the same directory.
#[tokio::test]
async fn test_deletion_status_persists_across_restart() {
let tmp_dir = TempDir::new().unwrap();
let path = tmp_dir.path().to_owned();
let file_hash;
// First instance: upload, delete, and confirm the entry is hidden. The client is dropped
// at the end of this scope.
{
let client = LocalClient::new(&path).await.unwrap();
let file = client.upload_random_file(&[(1, (0, 3)), (2, (0, 2))], 2048).await.unwrap();
file_hash = file.file_hash;
assert!(!client.list_file_shard_entries().await.unwrap().is_empty());
client.delete_file_entry(&file_hash).await.unwrap();
assert!(client.list_file_shard_entries().await.unwrap().is_empty());
}
// Second instance on the same path: the deletion must still be in effect.
{
let client = LocalClient::new(&path).await.unwrap();
assert!(client.is_file_deleted(&file_hash), "Deletion status should persist across restart");
assert!(
client.list_file_shard_entries().await.unwrap().is_empty(),
"Deleted files should remain hidden after restart"
);
}
}
// Clearing `xorb_content` from the shard data (without deleting any XORB from the store)
// must not make the integrity check fail.
#[tokio::test]
async fn test_verify_integrity_cross_shard_dedup_ok() {
let client = LocalClient::temporary().await.unwrap();
client.upload_random_file(&[(1, (0, 3))], 2048).await.unwrap();
client.verify_integrity().await.unwrap();
let mut in_memory = client.load_all_shard_data().unwrap();
in_memory.xorb_content.clear();
client.write_shard_data_and_register(&in_memory).await.unwrap();
client
.verify_integrity()
.await
.expect("Integrity should pass: XORB files exist on disk even though no shard indexes them");
}
// XORBs referenced only by a deleted file entry may be removed without breaking integrity,
// and the remaining live file must still read back byte-for-byte.
#[tokio::test]
async fn test_verify_integrity_skips_soft_deleted_files() {
let client = LocalClient::temporary().await.unwrap();
let deleted_file = client.upload_random_file(&[(1, (0, 3))], 2048).await.unwrap();
let live_file = client.upload_random_file(&[(2, (0, 2))], 2048).await.unwrap();
client.verify_integrity().await.unwrap();
client.delete_file_entry(&deleted_file.file_hash).await.unwrap();
// Remove the deleted file's XORBs from the store ...
for t in &deleted_file.terms {
client.delete_xorb(&t.xorb_hash).await;
}
// ... and from the shard data, then re-register.
let mut in_memory = client.load_all_shard_data().unwrap();
for t in &deleted_file.terms {
in_memory.xorb_content.remove(&t.xorb_hash);
}
client.write_shard_data_and_register(&in_memory).await.unwrap();
client
.verify_integrity()
.await
.expect("Integrity should pass: missing XORBs are only referenced by a soft-deleted file");
let live_data = client.get_file_data(&live_file.file_hash, None).await.unwrap();
assert_eq!(live_data, live_file.data);
}
}