use std::sync::Arc;
use async_trait::async_trait;
use tokio::task::JoinSet;
use tracing::Instrument;
use xet_core_structures::merklehash::MerkleHash;
use xet_core_structures::metadata_shard::file_structs::FileDataSequenceEntry;
use super::file_upload_session::FileUploadSession;
use crate::deduplication::{DeduplicationDataInterface, RawXorbData};
use crate::error::{DataError, Result};
use crate::progress_tracking::upload_tracking::FileXorbDependency;
/// Bridges the deduplication code to a shared [`FileUploadSession`] by
/// implementing [`DeduplicationDataInterface`] on top of it.
///
/// Besides the session handle, it owns the set of background global dedup
/// query tasks that have been spawned through it and not yet joined.
pub struct UploadSessionDataManager {
/// Shared handle to the upload session that every operation is forwarded to.
session: Arc<FileUploadSession>,
/// Background global dedup query tasks spawned by `register_global_dedup_query`
/// and drained by `complete_global_dedup_queries`; each task yields a `Result<bool>`.
active_global_dedup_queries: JoinSet<Result<bool>>,
}
impl UploadSessionDataManager {
    /// Builds a manager bound to `session`, starting with no pending
    /// background global dedup queries.
    pub fn new(session: Arc<FileUploadSession>) -> Self {
        let active_global_dedup_queries = Default::default();

        Self {
            session,
            active_global_dedup_queries,
        }
    }

    /// Reads the `deduplication.global_dedup_query_enabled` flag from the
    /// runtime configuration returned by `xet_runtime::core::xet_config()`.
    fn global_dedup_queries_enabled(&self) -> bool {
        let config = xet_runtime::core::xet_config();
        config.deduplication.global_dedup_query_enabled
    }
}
// `DeduplicationDataInterface` implementation that forwards each operation to
// the wrapped `FileUploadSession`; global dedup queries are run as background
// tasks tracked in `active_global_dedup_queries`.
#[async_trait]
impl DeduplicationDataInterface for UploadSessionDataManager {
    type ErrorType = DataError;

    /// Forwards a chunk-hash dedup query to the session's shard interface.
    ///
    /// The returned value is passed through unchanged from
    /// `shard_interface.chunk_hash_dedup_query`; any error it reports is
    /// converted into this module's `Result` error type.
    async fn chunk_hash_dedup_query(
        &self,
        query_hashes: &[MerkleHash],
    ) -> Result<Option<(usize, FileDataSequenceEntry, bool)>> {
        Ok(self.session.shard_interface.chunk_hash_dedup_query(query_hashes).await?)
    }

    /// Starts a background global dedup query for `chunk_hash`.
    ///
    /// Does nothing (and returns `Ok(())`) when global dedup queries are
    /// disabled in the configuration. Otherwise a task is spawned onto the
    /// internal `JoinSet`; its outcome is only observed later through
    /// `complete_global_dedup_queries`.
    async fn register_global_dedup_query(&mut self, chunk_hash: MerkleHash) -> Result<()> {
        if !self.global_dedup_queries_enabled() {
            return Ok(());
        }

        // The spawned task needs its own handle to the session.
        let session: Arc<FileUploadSession> = self.session.clone();

        self.active_global_dedup_queries.spawn(
            async move {
                // Report what the query actually found. Returning a constant
                // `true` here would make `complete_global_dedup_queries` claim
                // new dedup information for every registered query, even when
                // none of them turned anything up.
                let found_new_dedup_info = session.shard_interface.query_dedup_shard_by_chunk(&chunk_hash).await?;
                Ok(found_new_dedup_info)
            }
            .instrument(tracing::info_span!("UploadSessionDataManager::dedup_task")),
        );

        Ok(())
    }

    /// Waits for every outstanding background query and reports whether any
    /// of them returned `true`.
    ///
    /// Returns `Ok(false)` immediately when global dedup queries are disabled.
    ///
    /// # Errors
    ///
    /// Returns the first join failure or task error encountered; tasks that
    /// have not been joined at that point remain in the set.
    async fn complete_global_dedup_queries(&mut self) -> Result<bool> {
        if !self.global_dedup_queries_enabled() {
            return Ok(false);
        }

        let mut any_result = false;
        while let Some(result) = self.active_global_dedup_queries.join_next().await {
            // Outer `?` handles the join result, inner `?` the task's own result.
            any_result |= result??;
        }

        Ok(any_result)
    }

    /// Hands a new xorb to the session, passing an empty slice as the second
    /// argument of `FileUploadSession::register_new_xorb`.
    async fn register_new_xorb(&mut self, xorb: RawXorbData) -> Result<()> {
        self.session.register_new_xorb(xorb, &[]).await?;
        Ok(())
    }

    /// Forwards the given xorb dependencies to the session.
    async fn register_xorb_dependencies(&mut self, dependencies: &[FileXorbDependency]) {
        // NOTE(review): the call's return value is discarded without `.await`.
        // If `FileUploadSession::register_xorb_dependencies` is an async fn,
        // this future is dropped unpolled and nothing is registered — confirm.
        self.session.register_xorb_dependencies(dependencies);
    }
}