xet_data/processing/
file_cleaner.rs1use std::future::{self, Future};
2use std::pin::Pin;
3use std::sync::Arc;
4
5use bytes::Bytes;
6use chrono::{DateTime, Utc};
7use tracing::{Instrument, debug_span, info, instrument};
8use xet_core_structures::metadata_shard::Sha256;
9use xet_core_structures::metadata_shard::file_structs::FileMetadataExt;
10use xet_runtime::core::{XetRuntime, xet_config};
11
12use super::XetFileInfo;
13use super::deduplication_interface::UploadSessionDataManager;
14use super::file_upload_session::FileUploadSession;
15use super::sha256::Sha256Generator;
16use crate::deduplication::{Chunk, Chunker, DeduplicationMetrics, FileDeduper};
17use crate::error::Result;
18use crate::progress_tracking::upload_tracking::CompletionTrackerFileId;
19
/// Policy controlling how the SHA-256 of a file's contents is obtained while
/// the file is being cleaned.
#[derive(Clone, Copy)]
pub enum Sha256Policy {
    /// Compute the SHA-256 incrementally as data is ingested.
    Compute,
    /// Use this caller-supplied SHA-256 and skip computing one.
    Provided(Sha256),
    /// Neither compute nor record a SHA-256 for this file.
    Skip,
}
30
31impl Sha256Policy {
32 pub fn from_skip(skip: bool) -> Self {
34 if skip { Self::Skip } else { Self::Compute }
35 }
36
37 pub fn from_hex(hex: &str) -> Self {
41 Sha256::from_hex(hex).ok().into()
42 }
43}
44
45impl From<Option<Sha256>> for Sha256Policy {
46 fn from(sha256: Option<Sha256>) -> Self {
47 match sha256 {
48 Some(hash) => Self::Provided(hash),
49 None => Self::Compute,
50 }
51 }
52}
53
/// Cleans (chunks, deduplicates, and optionally hashes) a single file's byte
/// stream as part of an upload session.
pub struct SingleFileCleaner {
    // Name of the file being cleaned, if known; used in tracing spans and
    // the final telemetry record.
    file_name: Option<Arc<str>>,

    // Identifier under which this file's progress is reported to the
    // session's completion tracker.
    file_id: CompletionTrackerFileId,

    // The upload session this file belongs to.
    session: Arc<FileUploadSession>,

    // Content-defined chunker, fed incrementally with the file's bytes.
    chunker: Chunker,

    // Future that yields the deduper back. Chunk batches are processed on a
    // spawned task that owns the deduper; this future resolves (returning
    // ownership) when that task completes. See `deduper_process_chunks`.
    dedup_manager_fut: Pin<Box<dyn Future<Output = Result<FileDeduper<UploadSessionDataManager>>> + Send + 'static>>,

    // Incremental SHA-256 hasher; present only under `Sha256Policy::Compute`.
    sha_generator: Option<Sha256Generator>,

    // Caller-supplied hash under `Sha256Policy::Provided`; `None` otherwise.
    provided_sha256: Option<Sha256>,

    // When cleaning started; reported in telemetry at `finish`.
    start_time: DateTime<Utc>,
}
81
impl SingleFileCleaner {
    /// Creates a cleaner for one file within `session`.
    ///
    /// The deduper is constructed eagerly and wrapped in an immediately-ready
    /// future so that `dedup_manager_fut` always holds "the future that
    /// yields the deduper back" (see `deduper_process_chunks`).
    pub(crate) fn new(
        file_name: Option<Arc<str>>,
        file_id: CompletionTrackerFileId,
        sha256: Sha256Policy,
        session: Arc<FileUploadSession>,
    ) -> Self {
        let deduper = FileDeduper::new(UploadSessionDataManager::new(session.clone()), file_id);

        // Translate the policy into the two optional hash states; at most one
        // of (generator, provided hash) is Some.
        let (sha_generator, provided_sha256) = match sha256 {
            Sha256Policy::Compute => (Some(Sha256Generator::default()), None),
            Sha256Policy::Provided(hash) => (None, Some(hash)),
            Sha256Policy::Skip => (None, None),
        };

        Self {
            file_name,
            file_id,
            dedup_manager_fut: Box::pin(async move { Ok(deduper) }),
            session,
            chunker: crate::deduplication::Chunker::default(),
            sha_generator,
            provided_sha256,
            start_time: Utc::now(),
        }
    }

    /// Hands `chunks` to the deduper for processing on a background tokio task.
    ///
    /// Awaits the stored future to regain ownership of the deduper (i.e. waits
    /// for the previously spawned batch to finish), spawns a task to process
    /// this batch, and stores a new future that resolves once that task
    /// returns the deduper. Consecutive calls therefore serialize batches
    /// while letting processing overlap with the caller's other work.
    ///
    /// NOTE(review): if the first `await?` below fails, the field has already
    /// been replaced with `future::pending()`, so a subsequent call would hang
    /// rather than re-surface the error — presumably callers abort the session
    /// on the first error; confirm.
    async fn deduper_process_chunks(&mut self, chunks: Arc<[Chunk]>) -> Result<()> {
        // Swap in a pending placeholder so the real future can be moved out of
        // `self` and awaited to take ownership of the deduper.
        let mut deduper = std::mem::replace(&mut self.dedup_manager_fut, Box::pin(future::pending())).await?;

        let num_chunks = chunks.len();

        let dedup_background = tokio::spawn(
            async move {
                deduper.process_chunks(&chunks).await?;
                Ok(deduper)
            }
            .instrument(debug_span!("deduper::process_chunks_task", num_chunks).or_current()),
        );

        // The next call (or `finish`) awaits this to get the deduper back.
        self.dedup_manager_fut = Box::pin(async move { dedup_background.await? });

        Ok(())
    }

    /// Copies `data` into an owned `Bytes` buffer and ingests it.
    pub async fn add_data(&mut self, data: &[u8]) -> Result<()> {
        self.add_data_from_bytes(Bytes::copy_from_slice(data)).await
    }

    /// Ingests `data`, first splitting it into blocks of the configured
    /// ingestion block size so one oversized buffer is processed piecewise.
    pub async fn add_data_from_bytes(&mut self, data: Bytes) -> Result<()> {
        let block_size = *xet_config().data.ingestion_block_size as usize;
        if data.len() > block_size {
            let mut pos = 0;
            while pos < data.len() {
                let next_pos = usize::min(pos + block_size, data.len());
                // `Bytes::slice` is a cheap refcounted view; no copy here.
                self.add_data_chunk_impl(data.slice(pos..next_pos)).await?;
                pos = next_pos;
            }
        } else {
            self.add_data_chunk_impl(data).await?;
        }

        Ok(())
    }

    /// Processes one ingestion block: records its size with the completion
    /// tracker, chunks it on a blocking worker while (optionally) hashing the
    /// same bytes, then forwards any produced chunks to the deduper.
    #[instrument(skip_all, level="debug", name = "FileCleaner::add_data", fields(file_name=self.file_name.as_ref().map(|s|s.to_string()), len=data.len()))]
    async fn add_data_chunk_impl(&mut self, data: Bytes) -> Result<()> {
        self.session
            .completion_tracker
            .increment_file_size(self.file_id, data.len() as u64);

        // Chunking is CPU-bound, so run it on the blocking pool. The chunker
        // is moved into the closure (leaving a default in `self`) and is
        // handed back alongside the resulting chunks.
        let chunk_data_jh = {
            let mut chunker = std::mem::take(&mut self.chunker);
            let data = data.clone();
            let rt = XetRuntime::current();

            rt.spawn_blocking(move || {
                let chunks: Arc<[Chunk]> = Arc::from(chunker.next_block_bytes(&data, false));
                (chunks, chunker)
            })
        };

        // Hash the same bytes while chunking runs in the background.
        if let Some(ref mut generator) = self.sha_generator {
            generator.update(data.clone()).await?;
        }

        let (chunks, chunker) = chunk_data_jh.await?;

        // Restore the stateful chunker for the next block.
        self.chunker = chunker;

        if chunks.is_empty() {
            return Ok(());
        }

        self.deduper_process_chunks(chunks).await?;

        Ok(())
    }

    /// Checkpoints progress by pushing an empty chunk batch through the
    /// pipeline; this first awaits completion of the previously spawned
    /// processing task, surfacing any error it produced.
    pub async fn checkpoint(&mut self) -> Result<()> {
        self.deduper_process_chunks(Arc::new([])).await
    }

    /// Finalizes the file: flushes the trailing partial chunk, resolves the
    /// SHA-256 (computed, provided, or absent), finalizes the deduper, and
    /// registers completion with the session.
    ///
    /// Returns the file's `XetFileInfo` (hash, size, optional sha256)
    /// together with the deduplication metrics for the whole file.
    #[instrument(skip_all, name = "FileCleaner::finish", fields(file_name=self.file_name.as_ref().map(|s|s.to_string())))]
    pub async fn finish(mut self) -> Result<(XetFileInfo, DeduplicationMetrics)> {
        // The chunker may hold a final partial chunk that never reached a
        // chunk boundary.
        if let Some(chunk) = self.chunker.finish() {
            let data = Arc::new([chunk]);
            self.deduper_process_chunks(data).await?;
        }

        // Prefer a computed hash; otherwise fall back to the provided one
        // (both are None under `Sha256Policy::Skip`).
        let sha256 = if let Some(generator) = self.sha_generator.take() {
            Some(generator.finalize().await?)
        } else {
            self.provided_sha256
        };
        let metadata_ext = sha256.map(FileMetadataExt::new);

        let (file_hash, remaining_file_data, deduplication_metrics) =
            self.dedup_manager_fut.await?.finalize(metadata_ext);

        let file_info = XetFileInfo {
            hash: file_hash.hex(),
            file_size: Some(deduplication_metrics.total_bytes),
            sha256: sha256.map(|s| s.hex()),
        };

        // Sanity-check: the deduper should report exactly one pending file
        // whose size matches the byte count accumulated during ingestion.
        #[cfg(debug_assertions)]
        {
            debug_assert_eq!(remaining_file_data.pending_file_info.len(), 1);

            debug_assert_eq!(remaining_file_data.pending_file_info[0].0.file_size(), deduplication_metrics.total_bytes)
        }

        self.session
            .register_single_file_clean_completion(remaining_file_data, &deduplication_metrics)
            .await?;

        info!(
            target: "client_telemetry",
            action = "clean",
            file_name = self.file_name.unwrap_or_default().to_string(),
            file_size_count = deduplication_metrics.total_bytes,
            new_bytes_count = deduplication_metrics.new_bytes,
            start_ts = self.start_time.to_rfc3339(),
            end_processing_ts = Utc::now().to_rfc3339(),
        );

        Ok((file_info, deduplication_metrics))
    }
}