use std::fs::File;
use std::io::Read;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::SystemTime;
use bytes::Bytes;
use tempfile::TempDir;
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tracing::{Instrument, debug, info, info_span};
use xet_client::cas_client::Client;
use xet_core_structures::merklehash::MerkleHash;
use xet_core_structures::metadata_shard::file_structs::{FileDataSequenceEntry, MDBFileInfo};
use xet_core_structures::metadata_shard::session_directory::{
ShardMergeResult, consolidate_shards_in_directory, merge_shards_background,
};
use xet_core_structures::metadata_shard::shard_in_memory::MDBInMemoryShard;
use xet_core_structures::metadata_shard::xorb_structs::MDBXorbInfo;
use xet_core_structures::metadata_shard::{
MDB_SHARD_LOCAL_CACHE_EXPIRATION, MDBShardFile, MDBShardFileHeader, ShardFileManager,
};
use xet_runtime::core::xet_config;
use xet_runtime::error_printer::ErrorPrinter;
use super::configurations::TranslatorConfig;
use crate::error::Result;
/// Shard metadata bookkeeping for a single upload session.
///
/// Combines the shards produced by this session, shards recovered by merging previously
/// staged xorb metadata, and the local shard cache, and uploads this session's shards
/// through `client`.
pub struct SessionShardInterface {
    /// Shards created by this session; stored in the temporary directory owned by
    /// `_shard_session_dir`.
    session_shard_manager: Arc<ShardFileManager>,
    /// Local shard cache in `shard_cache_directory`; global dedup shards are imported here and
    /// successfully uploaded session shards are registered here.
    cache_shard_manager: Arc<ShardFileManager>,
    /// Client used for global dedup queries and shard uploads.
    client: Arc<dyn Client + Send + Sync>,
    /// When true, no background merge is started, uploaded-xorb metadata is not staged, and
    /// shards are not actually uploaded.
    dry_run: bool,
    /// `shard_session_directory/xorb_metadata`: staged metadata for uploaded xorbs is written
    /// here and merged into a new session's directory when a session is created.
    xorb_metadata_staging_dir: PathBuf,
    /// Manager for the shards produced by that startup merge; `None` when the merge produced no
    /// shards (including in dry-run mode, where no merge runs).
    resumed_session_shard_manager: Option<Arc<ShardFileManager>>,
    /// Staged shard files made obsolete by the startup merge; deleted (best-effort) after all
    /// session shards have been uploaded successfully.
    staged_shards_to_remove_on_success: Vec<PathBuf>,
    /// Timestamp used to schedule time-based flushes (initialized at construction), paired with
    /// the in-memory shard of uploaded-xorb metadata not yet written to
    /// `xorb_metadata_staging_dir`.
    xorb_metadata_staging: Mutex<(SystemTime, MDBInMemoryShard)>,
    /// Owns the session's temporary shard directory, which is deleted when this value drops.
    /// Struct fields drop in declaration order, so keeping this field last ensures the shard
    /// managers above are dropped before the directory is removed.
    _shard_session_dir: TempDir,
}
impl SessionShardInterface {
    /// Creates the shard interface for a new upload session.
    ///
    /// Directory setup:
    /// - `config.shard_session_directory` is created if missing. A fresh temporary subdirectory
    ///   inside it holds this session's shards and is removed when `Self` is dropped.
    /// - `config.shard_cache_directory` is created if missing and backs the cache shard manager.
    /// - `config.shard_session_directory/xorb_metadata` is created if missing. It sits outside the
    ///   temporary subdirectory, so metadata staged there is not removed along with it.
    ///
    /// Unless `dry_run` is set, shards already staged in `xorb_metadata` are merged in the
    /// background into the new session directory. If the merge yields shards, they are registered
    /// with a separate "resumed" session shard manager. The staged shards the merge made obsolete
    /// are recorded so [`Self::upload_and_register_session_shards`] can delete them after a
    /// successful upload.
    ///
    /// # Errors
    ///
    /// Returns an error if a directory cannot be created, a shard manager fails to initialize,
    /// the background merge task cannot be joined, or the merge itself fails.
    pub async fn new(
        config: Arc<TranslatorConfig>,
        client: Arc<dyn Client + Send + Sync>,
        dry_run: bool,
    ) -> Result<Self> {
        std::fs::create_dir_all(&config.shard_session_directory)?;

        // This session's shards live in a temporary subdirectory that is deleted when the
        // `TempDir` (kept in `_shard_session_dir`) is dropped.
        let shard_session_tempdir = TempDir::new_in(&config.shard_session_directory)?;
        let session_dir = shard_session_tempdir.path().to_owned();

        let cache_dir = &config.shard_cache_directory;
        std::fs::create_dir_all(cache_dir)?;

        // Staged metadata for uploaded xorbs lives directly under the session directory rather
        // than in the temporary subdirectory, so it is not deleted with it.
        let xorb_metadata_staging_dir = config.shard_session_directory.join("xorb_metadata");
        std::fs::create_dir_all(&xorb_metadata_staging_dir)?;

        // Start the merge of previously staged metadata now so that it overlaps with the shard
        // manager initialization below.
        let shard_merge_jh = if dry_run {
            None
        } else {
            Some(merge_shards_background(
                &xorb_metadata_staging_dir,
                &session_dir,
                xet_config().shard.max_target_size,
                true,
            ))
        };

        let cache_shard_manager = ShardFileManager::new_in_cache_directory(cache_dir).await?;
        let session_shard_manager = ShardFileManager::new_in_session_directory(&session_dir, false).await?;

        // The first `?` handles the result of joining the task; the second, the merge result.
        let shard_merge_result = match shard_merge_jh {
            Some(jh) => jh.await??,
            None => ShardMergeResult::default(),
        };

        // Merged shards get their own manager; `chunk_hash_dedup_query` consults it before this
        // session's shards.
        let resumed_session_shard_manager = if shard_merge_result.merged_shards.is_empty() {
            None
        } else {
            let resumed_session_shard_manager =
                ShardFileManager::new_in_session_directory(&session_dir, false).await?;
            resumed_session_shard_manager
                .register_shards(&shard_merge_result.merged_shards)
                .await?;
            Some(resumed_session_shard_manager)
        };

        // Obsolete staged shards are deleted only once this session's shards have been uploaded.
        let staged_shards_to_remove_on_success = shard_merge_result
            .obsolete_shards
            .iter()
            .map(|sfi| sfi.path.clone())
            .collect();

        Ok(Self {
            session_shard_manager,
            cache_shard_manager,
            xorb_metadata_staging_dir,
            staged_shards_to_remove_on_success,
            xorb_metadata_staging: Mutex::new((SystemTime::now(), MDBInMemoryShard::default())),
            resumed_session_shard_manager,
            dry_run,
            _shard_session_dir: shard_session_tempdir,
            client,
        })
    }

    /// Queries the client for a global dedup shard containing `chunk_hash`, using the configured
    /// default prefix.
    ///
    /// If a shard is returned, it is imported into the cache shard manager and `Ok(true)` is
    /// returned. If no shard is returned, `Ok(false)` is returned. If the query fails, the error
    /// is reported through `info_error` rather than propagated, and `Ok(false)` is returned.
    ///
    /// # Errors
    ///
    /// Returns an error only if importing the returned shard into the cache fails.
    pub async fn query_dedup_shard_by_chunk(&self, chunk_hash: &MerkleHash) -> Result<bool> {
        let Ok(Some(new_shard)) = self
            .client
            .query_for_global_dedup_shard(&xet_config().data.default_prefix, chunk_hash)
            .await
            .info_error("Error attempting to query global dedup lookup.")
        else {
            return Ok(false);
        };

        self.cache_shard_manager.import_shard_from_bytes(&new_shard).await?;
        Ok(true)
    }

    /// Looks up a dedup match for `query_hashes` across the session's shard managers.
    ///
    /// Managers are consulted in this order, and the first hit wins:
    /// 1. the resumed session shards, if any;
    /// 2. this session's shards;
    /// 3. the local shard cache.
    ///
    /// A hit returns the number of entries matched, the matching file data sequence entry, and a
    /// flag. The flag is `false` only when the match came from this session's own shards.
    /// Presumably it means the referenced xorb is already known to be uploaded (TODO: confirm
    /// against callers).
    pub async fn chunk_hash_dedup_query(
        &self,
        query_hashes: &[MerkleHash],
    ) -> Result<Option<(usize, FileDataSequenceEntry, bool)>> {
        if let Some(resumed_session_sfm) = &self.resumed_session_shard_manager
            && let Some((n_entries, fse)) = resumed_session_sfm.chunk_hash_dedup_query(query_hashes).await?
        {
            return Ok(Some((n_entries, fse, true)));
        }

        if let Some((n_entries, fse)) = self.session_shard_manager.chunk_hash_dedup_query(query_hashes).await? {
            return Ok(Some((n_entries, fse, false)));
        }

        Ok(self
            .cache_shard_manager
            .chunk_hash_dedup_query(query_hashes)
            .await?
            .map(|(n_entries, fse)| (n_entries, fse, true)))
    }

    /// Adds xorb metadata to this session's shard manager.
    pub async fn add_xorb_block(&self, xorb_block_contents: Arc<MDBXorbInfo>) -> Result<()> {
        self.session_shard_manager.add_xorb_block(xorb_block_contents).await?;
        Ok(())
    }

    /// Stages metadata for an uploaded xorb in the `xorb_metadata` staging directory.
    ///
    /// [`Self::new`] merges these staged shards back in for a later session.
    ///
    /// Entries accumulate in an in-memory shard. That shard is written to the staging directory,
    /// with `MDB_SHARD_LOCAL_CACHE_EXPIRATION`, and then reset in either of two cases:
    /// - more than `session_xorb_metadata_flush_interval` has passed since the last flush (or
    ///   since construction);
    /// - the number of staged xorb entries reaches `session_xorb_metadata_flush_max_count`.
    ///
    /// This is a no-op in dry-run mode.
    ///
    /// # Errors
    ///
    /// Returns an error if the entry cannot be added or the staged shard cannot be written.
    pub async fn add_uploaded_xorb_block(&self, xorb_block_contents: Arc<MDBXorbInfo>) -> Result<()> {
        if self.dry_run {
            return Ok(());
        }

        let mut lg = self.xorb_metadata_staging.lock().await;
        let (last_flush, xorb_shard) = &mut *lg;

        xorb_shard.add_xorb_block(xorb_block_contents)?;

        let time_now = SystemTime::now();
        let flush_interval = xet_config().data.session_xorb_metadata_flush_interval;

        if *last_flush + flush_interval < time_now
            || xorb_shard.num_xorb_entries() >= xet_config().data.session_xorb_metadata_flush_max_count
        {
            xorb_shard.write_to_directory(&self.xorb_metadata_staging_dir, Some(*MDB_SHARD_LOCAL_CACHE_EXPIRATION))?;
            // Record the actual flush time. Storing `time_now + flush_interval` here would push
            // every subsequent time-based flush out to twice the configured interval.
            *last_flush = time_now;
            *xorb_shard = MDBInMemoryShard::default();
        }

        Ok(())
    }

    /// Records file reconstruction info in this session's shard manager.
    pub async fn add_file_reconstruction_info(&self, file_info: MDBFileInfo) -> Result<()> {
        self.session_shard_manager.add_file_reconstruction_info(file_info).await?;
        Ok(())
    }

    /// Returns all file info entries held by this session's shard manager.
    pub async fn session_file_info_list(&self) -> Result<Vec<MDBFileInfo>> {
        Ok(self.session_shard_manager.all_file_info().await?)
    }

    /// Uploads this session's shards and registers them in the local shard cache.
    ///
    /// Steps:
    /// 1. Flush the session shard manager.
    /// 2. Consolidate the shards in its directory, up to `shard.max_target_size`.
    /// 3. Spawn one upload task per resulting shard; an upload permit is acquired before each
    ///    spawn. Each task sends the shard with its footer stripped, then exports a copy with
    ///    `MDB_SHARD_LOCAL_CACHE_EXPIRATION` into the cache directory and registers it there.
    /// 4. After every task succeeds, delete the staged shards made obsolete by the startup merge.
    ///    This deletion is best-effort: failures are only logged.
    ///
    /// In dry-run mode, shard sizes are still counted but nothing is uploaded or cached.
    ///
    /// Returns the total number of shard bytes (footer excluded) that were, or in dry-run mode
    /// would have been, uploaded.
    ///
    /// # Errors
    ///
    /// Returns the first error from flushing, consolidation, permit acquisition, or any upload
    /// task. In that case the obsolete staged shards are left in place, and the remaining tasks
    /// are aborted when the `JoinSet` is dropped.
    pub async fn upload_and_register_session_shards(&self) -> Result<u64> {
        self.session_shard_manager.flush().await?;

        let shard_list = consolidate_shards_in_directory(
            self.session_shard_manager.shard_directory(),
            xet_config().shard.max_target_size,
            false,
        )?;

        let mut shard_uploads = JoinSet::<Result<()>>::new();
        let shard_bytes_uploaded = Arc::new(AtomicU64::new(0));

        for si in shard_list {
            let shard_client = self.client.clone();
            let cache_shard_manager = self.cache_shard_manager.clone();
            let shard_bytes_uploaded = shard_bytes_uploaded.clone();
            let dry_run = self.dry_run;
            let upload_permit = shard_client.acquire_upload_permit().await?;

            shard_uploads.spawn(
                async move {
                    debug!("Uploading shard {:?} from staging area to CAS.", &si.shard_hash);
                    let data: Bytes = read_shard_to_bytes_remove_footer(&si)?;
                    shard_bytes_uploaded.fetch_add(data.len() as u64, Ordering::Relaxed);

                    if dry_run {
                        return Ok(());
                    }

                    shard_client.upload_shard(data, upload_permit).await?;
                    info!("Shard {:?} upload + sync completed successfully.", &si.shard_hash);

                    let new_shard_path = si.export_with_expiration(
                        cache_shard_manager.shard_directory(),
                        *MDB_SHARD_LOCAL_CACHE_EXPIRATION,
                    )?;
                    cache_shard_manager.register_shards(&[new_shard_path]).await?;
                    Ok(())
                }
                .instrument(info_span!("shard_session::upload_shard_task")),
            );
        }

        while let Some(jh) = shard_uploads.join_next().await {
            jh??;
        }

        // Best-effort cleanup: a leftover staged shard is harmless, so failures are only logged.
        for obsolete_shard in self.staged_shards_to_remove_on_success.iter() {
            if let Err(e) = std::fs::remove_file(obsolete_shard) {
                debug!("Failed to remove obsolete staged shard {obsolete_shard:?}: {e}");
            }
        }

        // All tasks have been joined, so a relaxed load observes every increment.
        Ok(shard_bytes_uploaded.load(Ordering::Relaxed))
    }
}
/// Reads the shard file backing `si` and returns its contents with the footer stripped.
///
/// Only the bytes before `si.shard.metadata.file_lookup_offset` are kept. The header at the
/// start of the returned buffer is rewritten from a copy of `si.shard.header`, with
/// `footer_size` set to 0, so the buffer no longer advertises the dropped footer.
///
/// # Errors
///
/// Returns an error in any of these cases:
/// - the file cannot be opened, or holds fewer than `file_lookup_offset` bytes;
/// - that offset does not fit in `usize`;
/// - that offset is smaller than the header size, which indicates corrupt metadata;
/// - the header cannot be re-serialized.
fn read_shard_to_bytes_remove_footer(si: &Arc<MDBShardFile>) -> Result<Bytes> {
    let header_len = size_of::<MDBShardFileHeader>();

    // Checked conversion instead of `as usize`, which could silently truncate on narrow targets.
    let split_off_index = usize::try_from(si.shard.metadata.file_lookup_offset)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;

    // The header is overwritten in place below. An offset that does not even cover the header
    // would make that slice panic, so report it as corrupt metadata instead.
    if split_off_index < header_len {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!(
                "shard {:?}: file lookup offset {} is smaller than the header size {}",
                si.path, split_off_index, header_len
            ),
        )
        .into());
    }

    let mut file = File::open(&si.path)?;
    let mut buf = vec![0u8; split_off_index];
    file.read_exact(&mut buf)?;

    let mut header = si.shard.header.clone();
    header.footer_size = 0;
    header.serialize(&mut (&mut buf[..header_len]))?;

    #[cfg(debug_assertions)]
    {
        let new_header = MDBShardFileHeader::deserialize(&mut std::io::Cursor::new(&buf[..header_len]))?;
        debug_assert_eq!(new_header.footer_size, 0);
    }

    Ok(Bytes::from(buf))
}
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    /// Writes an empty shard to disk and checks `read_shard_to_bytes_remove_footer` on it:
    /// - the returned buffer is cut at the file lookup offset;
    /// - the rewritten header reports no footer;
    /// - every byte after the header is copied unchanged from the file.
    #[test]
    fn test_read_shard_to_bytes_remove_footer() -> Result<()> {
        let tmp_dir = TempDir::with_prefix("test_read_shard_to_bytes_remove_footer")?;
        let tmp_dir_path = tmp_dir.path();

        let mdb_in_mem = MDBInMemoryShard::default();
        let temp_shard_file_path = mdb_in_mem.write_to_directory(tmp_dir_path, None)?;
        let shard_file = MDBShardFile::load_from_file(&temp_shard_file_path)?;

        // The on-disk shard starts out with a footer.
        assert_eq!(
            shard_file.shard.header.footer_size,
            size_of::<xet_core_structures::metadata_shard::MDBShardFileFooter>() as u64
        );

        let no_footer_shard_buf = read_shard_to_bytes_remove_footer(&shard_file)?;

        // The buffer ends exactly where the file lookup section begins.
        assert_eq!(no_footer_shard_buf.len(), shard_file.shard.metadata.file_lookup_offset as usize);

        let header_len = size_of::<MDBShardFileHeader>();
        let buf_shard_header = MDBShardFileHeader::deserialize(&mut Cursor::new(&no_footer_shard_buf[..header_len]))?;
        assert_eq!(buf_shard_header.footer_size, 0);

        // Only the header is rewritten; the remaining bytes match the file on disk.
        let raw_file_bytes = std::fs::read(&shard_file.path)?;
        assert_eq!(
            &no_footer_shard_buf[header_len..],
            &raw_file_bytes[header_len..no_footer_shard_buf.len()]
        );

        Ok(())
    }
}