use std::future::{self, Future};
use std::pin::Pin;
use std::sync::Arc;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use tracing::{Instrument, debug_span, info, instrument};
use xet_core_structures::metadata_shard::Sha256;
use xet_core_structures::metadata_shard::file_structs::FileMetadataExt;
use xet_runtime::core::{XetRuntime, xet_config};
use super::XetFileInfo;
use super::deduplication_interface::UploadSessionDataManager;
use super::file_upload_session::FileUploadSession;
use super::sha256::Sha256Generator;
use crate::deduplication::{Chunk, Chunker, DeduplicationMetrics, FileDeduper};
use crate::error::Result;
use crate::progress_tracking::upload_tracking::CompletionTrackerFileId;
/// Selects how a [`SingleFileCleaner`] obtains the SHA-256 digest reported for a file.
#[derive(Copy, Clone)]
pub enum Sha256Policy {
    /// Hash the file's bytes as they are fed to the cleaner.
    Compute,
    /// Use this caller-supplied digest; the cleaner does no hashing of its own.
    Provided(Sha256),
    /// Report no SHA-256 for the file at all.
    Skip,
}
impl Sha256Policy {
    /// Converts a "skip SHA-256" flag into a policy.
    ///
    /// `true` yields [`Sha256Policy::Skip`]; `false` yields [`Sha256Policy::Compute`].
    pub fn from_skip(skip: bool) -> Self {
        match skip {
            true => Self::Skip,
            false => Self::Compute,
        }
    }

    /// Builds a policy from a hex-encoded SHA-256 digest.
    ///
    /// A digest that parses becomes [`Sha256Policy::Provided`]. A digest that fails to parse
    /// is discarded, and the policy falls back to [`Sha256Policy::Compute`] rather than
    /// reporting an error.
    pub fn from_hex(hex: &str) -> Self {
        match Sha256::from_hex(hex) {
            Ok(digest) => Self::Provided(digest),
            Err(_) => Self::Compute,
        }
    }
}
impl From<Option<Sha256>> for Sha256Policy {
    /// A known digest becomes [`Sha256Policy::Provided`]; an absent one means the digest
    /// has to be computed ([`Sha256Policy::Compute`]).
    fn from(sha256: Option<Sha256>) -> Self {
        sha256.map_or(Self::Compute, Self::Provided)
    }
}
/// Chunks, deduplicates and (depending on the [`Sha256Policy`]) SHA-256 hashes the data of a
/// single file, then registers the result with a [`FileUploadSession`].
pub struct SingleFileCleaner {
    /// Optional file name; used only in tracing spans and the "clean" telemetry event.
    file_name: Option<Arc<str>>,
    /// Id under which this file's size is reported to the session's completion tracker.
    file_id: CompletionTrackerFileId,
    /// Session whose completion tracker is updated while data is added, and with which the
    /// cleaned file is registered in `finish`.
    session: Arc<FileUploadSession>,
    /// Chunker whose state carries across successive blocks: it is moved into a blocking
    /// task for each block and moved back afterwards.
    chunker: Chunker,
    /// Resolves to the deduper once the most recently queued batch of chunks (processed on
    /// a background task) has completed.
    dedup_manager_fut: Pin<Box<dyn Future<Output = Result<FileDeduper<UploadSessionDataManager>>> + Send + 'static>>,
    /// Present only under `Sha256Policy::Compute`.
    sha_generator: Option<Sha256Generator>,
    /// Present only under `Sha256Policy::Provided`.
    provided_sha256: Option<Sha256>,
    /// Creation time of the cleaner; reported as `start_ts` in the telemetry event.
    start_time: DateTime<Utc>,
}
impl SingleFileCleaner {
    /// Creates a cleaner for one file belonging to `session`.
    ///
    /// * `file_name` – optional name, used only for tracing and telemetry.
    /// * `file_id` – id under which the file's size is reported to the completion tracker.
    /// * `sha256` – how the file's SHA-256 is obtained (computed, provided, or skipped).
    /// * `session` – the upload session that receives the deduplicated data.
    pub(crate) fn new(
        file_name: Option<Arc<str>>,
        file_id: CompletionTrackerFileId,
        sha256: Sha256Policy,
        session: Arc<FileUploadSession>,
    ) -> Self {
        let deduper = FileDeduper::new(UploadSessionDataManager::new(session.clone()), file_id);
        let (sha_generator, provided_sha256) = match sha256 {
            Sha256Policy::Compute => (Some(Sha256Generator::default()), None),
            Sha256Policy::Provided(hash) => (None, Some(hash)),
            Sha256Policy::Skip => (None, None),
        };
        Self {
            file_name,
            file_id,
            // The deduper lives behind a future so chunk batches can be processed on a
            // background task; initially the future resolves immediately.
            dedup_manager_fut: Box::pin(async move { Ok(deduper) }),
            session,
            chunker: crate::deduplication::Chunker::default(),
            sha_generator,
            provided_sha256,
            start_time: Utc::now(),
        }
    }

    /// Queues `chunks` for deduplication on a background task.
    ///
    /// This first waits for the previously queued batch to finish, so at most one batch is
    /// in flight at a time. The outcome of this batch is observed by the next call, or by
    /// [`Self::finish`].
    ///
    /// # Errors
    /// Returns any error produced while processing the *previous* batch, including a
    /// failure to join its background task.
    ///
    /// NOTE(review): when an error is returned here, `dedup_manager_fut` has already been
    /// replaced with `future::pending()`. Any later call on this cleaner that awaits it
    /// (`checkpoint`, `add_data*`, `finish`) will therefore never complete. Drop the cleaner
    /// after an error instead of reusing it.
    async fn deduper_process_chunks(&mut self, chunks: Arc<[Chunk]>) -> Result<()> {
        let mut deduper = std::mem::replace(&mut self.dedup_manager_fut, Box::pin(future::pending())).await?;
        let num_chunks = chunks.len();
        let dedup_background = tokio::spawn(
            async move {
                deduper.process_chunks(&chunks).await?;
                Ok(deduper)
            }
            .instrument(debug_span!("deduper::process_chunks_task", num_chunks).or_current()),
        );
        self.dedup_manager_fut = Box::pin(async move { dedup_background.await? });
        Ok(())
    }

    /// Adds a borrowed slice of file data; it is copied into a [`Bytes`] buffer first.
    pub async fn add_data(&mut self, data: &[u8]) -> Result<()> {
        self.add_data_from_bytes(Bytes::copy_from_slice(data)).await
    }

    /// Adds file data.
    ///
    /// Data longer than the configured `ingestion_block_size` is fed in consecutive slices
    /// of at most that size. The slices are zero-copy views of `data`.
    ///
    /// A block size of 0 is treated as "do not split". Without this guard the splitting
    /// loop would never advance and would spin forever.
    pub async fn add_data_from_bytes(&mut self, data: Bytes) -> Result<()> {
        let block_size = *xet_config().data.ingestion_block_size as usize;

        if block_size == 0 || data.len() <= block_size {
            return self.add_data_chunk_impl(data).await;
        }

        // `step_by` requires a non-zero step, which the guard above guarantees.
        for start in (0..data.len()).step_by(block_size) {
            let end = usize::min(start + block_size, data.len());
            self.add_data_chunk_impl(data.slice(start..end)).await?;
        }
        Ok(())
    }

    /// Processes one block of data: records its size with the completion tracker, chunks
    /// it, feeds it to the SHA-256 generator (if any), and queues the resulting chunks for
    /// deduplication.
    ///
    /// Chunking runs on a blocking thread while the SHA-256 update proceeds on this task.
    ///
    /// NOTE(review): if the SHA-256 update fails, the chunker has already been moved into
    /// the blocking task. `self.chunker` is then left as `Chunker::default()`, so the
    /// cleaner should not be reused after an error.
    #[instrument(skip_all, level="debug", name = "FileCleaner::add_data", fields(file_name=self.file_name.as_ref().map(|s|s.to_string()), len=data.len()))]
    async fn add_data_chunk_impl(&mut self, data: Bytes) -> Result<()> {
        self.session
            .completion_tracker
            .increment_file_size(self.file_id, data.len() as u64);

        // The chunker is moved into the blocking task and handed back with the chunks, so
        // its state carries over to the next block.
        let chunk_data_jh = {
            let mut chunker = std::mem::take(&mut self.chunker);
            let data = data.clone();
            let rt = XetRuntime::current();
            rt.spawn_blocking(move || {
                let chunks: Arc<[Chunk]> = Arc::from(chunker.next_block_bytes(&data, false));
                (chunks, chunker)
            })
        };

        if let Some(ref mut generator) = self.sha_generator {
            generator.update(data.clone()).await?;
        }

        let (chunks, chunker) = chunk_data_jh.await?;
        self.chunker = chunker;

        if chunks.is_empty() {
            return Ok(());
        }

        self.deduper_process_chunks(chunks).await?;
        Ok(())
    }

    /// Queues an empty batch. This waits for previously queued chunk processing to finish,
    /// so any error it produced surfaces now.
    pub async fn checkpoint(&mut self) -> Result<()> {
        self.deduper_process_chunks(Arc::new([])).await
    }

    /// Completes cleaning of the file.
    ///
    /// Steps:
    /// 1. Flushes any final chunk still held by the chunker.
    /// 2. Determines the SHA-256: computed, provided, or none, depending on the policy.
    /// 3. Finalizes the deduper.
    /// 4. Registers the result with the session.
    /// 5. Emits a `client_telemetry` "clean" event.
    ///
    /// Returns the file's [`XetFileInfo`] together with its [`DeduplicationMetrics`].
    #[instrument(skip_all, name = "FileCleaner::finish", fields(file_name=self.file_name.as_ref().map(|s|s.to_string())))]
    pub async fn finish(mut self) -> Result<(XetFileInfo, DeduplicationMetrics)> {
        if let Some(chunk) = self.chunker.finish() {
            let data = Arc::new([chunk]);
            self.deduper_process_chunks(data).await?;
        }

        let sha256 = if let Some(generator) = self.sha_generator.take() {
            Some(generator.finalize().await?)
        } else {
            self.provided_sha256
        };

        let metadata_ext = sha256.map(FileMetadataExt::new);
        let (file_hash, remaining_file_data, deduplication_metrics) =
            self.dedup_manager_fut.await?.finalize(metadata_ext);

        let file_info = XetFileInfo {
            hash: file_hash.hex(),
            file_size: Some(deduplication_metrics.total_bytes),
            sha256: sha256.map(|s| s.hex()),
        };

        // Sanity checks (debug builds only): exactly one pending file entry, and its size
        // matches the byte count reported by the deduper.
        #[cfg(debug_assertions)]
        {
            debug_assert_eq!(remaining_file_data.pending_file_info.len(), 1);
            debug_assert_eq!(remaining_file_data.pending_file_info[0].0.file_size(), deduplication_metrics.total_bytes)
        }

        self.session
            .register_single_file_clean_completion(remaining_file_data, &deduplication_metrics)
            .await?;

        info!(
            target: "client_telemetry",
            action = "clean",
            file_name = self.file_name.unwrap_or_default().to_string(),
            file_size_count = deduplication_metrics.total_bytes,
            new_bytes_count = deduplication_metrics.new_bytes,
            start_ts = self.start_time.to_rfc3339(),
            end_processing_ts = Utc::now().to_rfc3339(),
        );

        Ok((file_info, deduplication_metrics))
    }
}