//! xet_data/processing/file_cleaner.rs
//!
//! Single-file clean pipeline: chunking, deduplication, and optional SHA-256
//! computation for one file within an upload session.

1use std::future::{self, Future};
2use std::pin::Pin;
3use std::sync::Arc;
4
5use bytes::Bytes;
6use chrono::{DateTime, Utc};
7use tracing::{Instrument, debug_span, info, instrument};
8use xet_core_structures::metadata_shard::Sha256;
9use xet_core_structures::metadata_shard::file_structs::FileMetadataExt;
10use xet_runtime::core::{XetRuntime, xet_config};
11
12use super::XetFileInfo;
13use super::deduplication_interface::UploadSessionDataManager;
14use super::file_upload_session::FileUploadSession;
15use super::sha256::Sha256Generator;
16use crate::deduplication::{Chunk, Chunker, DeduplicationMetrics, FileDeduper};
17use crate::error::Result;
18use crate::progress_tracking::upload_tracking::CompletionTrackerFileId;
19
/// Controls how SHA-256 is handled during file cleaning.
#[derive(Clone, Copy)]
pub enum Sha256Policy {
    /// Compute SHA-256 incrementally from the file data as it is added.
    Compute,
    /// Use a pre-computed SHA-256 value instead of hashing the data.
    Provided(Sha256),
    /// Skip SHA-256 entirely; no metadata_ext is written to the shard.
    Skip,
}
30
31impl Sha256Policy {
32    /// Returns `Skip` when `true`, `Compute` when `false`.
33    pub fn from_skip(skip: bool) -> Self {
34        if skip { Self::Skip } else { Self::Compute }
35    }
36
37    /// Parses a hex-encoded SHA-256 string into a policy.
38    ///
39    /// Returns `Provided(hash)` if the hex is valid, `Compute` otherwise.
40    pub fn from_hex(hex: &str) -> Self {
41        Sha256::from_hex(hex).ok().into()
42    }
43}
44
45impl From<Option<Sha256>> for Sha256Policy {
46    fn from(sha256: Option<Sha256>) -> Self {
47        match sha256 {
48            Some(hash) => Self::Provided(hash),
49            None => Self::Compute,
50        }
51    }
52}
53
/// Encapsulates the clean-and-dedup task for a single file: chunking the
/// incoming bytes, deduplicating the chunks, and (optionally) computing the
/// file's SHA-256.
pub struct SingleFileCleaner {
    // File name, if known. Used only for tracing spans and telemetry.
    file_name: Option<Arc<str>>,

    // Completion id used to report byte counts to the session's completion tracker.
    file_id: CompletionTrackerFileId,

    // Common state shared with the enclosing upload session.
    session: Arc<FileUploadSession>,

    // The chunker. Temporarily moved onto a blocking thread while chunking a
    // block, then restored so its rolling state carries across calls.
    chunker: Chunker,

    // The deduplication interface.  Use a future that always returns the dedup manager
    // on await so that we can background this part.
    dedup_manager_fut: Pin<Box<dyn Future<Output = Result<FileDeduper<UploadSessionDataManager>>> + Send + 'static>>,

    // SHA-256 generator, present only when computing from file data.
    sha_generator: Option<Sha256Generator>,

    // Pre-computed or finalized SHA-256 value.
    provided_sha256: Option<Sha256>,

    // Start time, recorded at construction; reported in the "clean" telemetry event.
    start_time: DateTime<Utc>,
}
81
82impl SingleFileCleaner {
83    pub(crate) fn new(
84        file_name: Option<Arc<str>>,
85        file_id: CompletionTrackerFileId,
86        sha256: Sha256Policy,
87        session: Arc<FileUploadSession>,
88    ) -> Self {
89        let deduper = FileDeduper::new(UploadSessionDataManager::new(session.clone()), file_id);
90
91        let (sha_generator, provided_sha256) = match sha256 {
92            Sha256Policy::Compute => (Some(Sha256Generator::default()), None),
93            Sha256Policy::Provided(hash) => (None, Some(hash)),
94            Sha256Policy::Skip => (None, None),
95        };
96
97        Self {
98            file_name,
99            file_id,
100            dedup_manager_fut: Box::pin(async move { Ok(deduper) }),
101            session,
102            chunker: crate::deduplication::Chunker::default(),
103            sha_generator,
104            provided_sha256,
105            start_time: Utc::now(),
106        }
107    }
108
    /// Hands a batch of chunks to the dedup manager: first awaits the
    /// previously spawned background task to get the manager back, then
    /// spawns a new background task for this batch and stores its join
    /// handle as the next `dedup_manager_fut`.
    ///
    /// Batches are therefore processed strictly in submission order, while
    /// the caller only ever blocks on the *previous* batch.
    async fn deduper_process_chunks(&mut self, chunks: Arc<[Chunk]>) -> Result<()> {
        // Handle the move out by replacing it with a dummy future discarded below.
        // NOTE(review): if the `await?` below fails, the `pending()` placeholder is
        // left in `self.dedup_manager_fut`, so a later call (or `finish`) would
        // await forever. Presumably callers abandon the cleaner on error — confirm.
        let mut deduper = std::mem::replace(&mut self.dedup_manager_fut, Box::pin(future::pending())).await?;

        let num_chunks = chunks.len();

        // Process this batch off the caller's task; the span records the batch size.
        let dedup_background = tokio::spawn(
            async move {
                deduper.process_chunks(&chunks).await?;
                Ok(deduper)
            }
            .instrument(debug_span!("deduper::process_chunks_task", num_chunks).or_current()),
        );

        // The next caller retrieves the manager by awaiting this join handle;
        // the `?` also surfaces task panics/cancellation as errors.
        self.dedup_manager_fut = Box::pin(async move { dedup_background.await? });

        Ok(())
    }
130
131    pub async fn add_data(&mut self, data: &[u8]) -> Result<()> {
132        self.add_data_from_bytes(Bytes::copy_from_slice(data)).await
133    }
134
135    pub async fn add_data_from_bytes(&mut self, data: Bytes) -> Result<()> {
136        let block_size = *xet_config().data.ingestion_block_size as usize;
137        if data.len() > block_size {
138            let mut pos = 0;
139            while pos < data.len() {
140                let next_pos = usize::min(pos + block_size, data.len());
141                self.add_data_chunk_impl(data.slice(pos..next_pos)).await?;
142                pos = next_pos;
143            }
144        } else {
145            self.add_data_chunk_impl(data).await?;
146        }
147
148        Ok(())
149    }
150
    /// Ingests one block of data: records its size with the completion
    /// tracker, chunks it on a blocking thread while the SHA-256 update runs,
    /// then submits the resulting chunks for deduplication.
    #[instrument(skip_all, level="debug", name = "FileCleaner::add_data", fields(file_name=self.file_name.as_ref().map(|s|s.to_string()), len=data.len()))]
    async fn add_data_chunk_impl(&mut self, data: Bytes) -> Result<()> {
        // If the file size was not specified at the beginning, then incrementally update the total size with
        // how much data we know about.
        self.session
            .completion_tracker
            .increment_file_size(self.file_id, data.len() as u64);

        // Put the chunking on a compute thread so it doesn't tie up the async schedulers.
        // The chunker is moved out (mem::take) and handed back by the closure so its
        // rolling state survives into the next call.
        let chunk_data_jh = {
            let mut chunker = std::mem::take(&mut self.chunker);
            let data = data.clone();
            let rt = XetRuntime::current();

            rt.spawn_blocking(move || {
                let chunks: Arc<[Chunk]> = Arc::from(chunker.next_block_bytes(&data, false));
                (chunks, chunker)
            })
        };

        // Update the sha256 hasher, which hands this off to be done in the background.
        // (`Bytes::clone` is a cheap reference-count bump, not a data copy.)
        if let Some(ref mut generator) = self.sha_generator {
            generator.update(data.clone()).await?;
        }

        // Get the chunk data and start processing it.
        let (chunks, chunker) = chunk_data_jh.await?;

        // Restore the chunker state for the next block.
        self.chunker = chunker;

        // It's possible this didn't actually add any data in — presumably the
        // chunker buffered it without reaching a chunk boundary.
        if chunks.is_empty() {
            return Ok(());
        }

        // Run the deduplication interface here.
        self.deduper_process_chunks(chunks).await?;

        Ok(())
    }
192
193    /// Ensures all current background work is completed.
194    pub async fn checkpoint(&mut self) -> Result<()> {
195        // Flush the background process by sending it a dummy bit of data.
196        self.deduper_process_chunks(Arc::new([])).await
197    }
198
    /// Return the representation of the file after clean as a pointer file instance.
    ///
    /// Flushes the chunker's tail, resolves the SHA-256 (computed, provided,
    /// or skipped), finalizes deduplication, registers the completed file
    /// with the session, and emits a "clean" telemetry record.
    #[instrument(skip_all, name = "FileCleaner::finish", fields(file_name=self.file_name.as_ref().map(|s|s.to_string())))]
    pub async fn finish(mut self) -> Result<(XetFileInfo, DeduplicationMetrics)> {
        // Chunk the rest of the data.
        if let Some(chunk) = self.chunker.finish() {
            let data = Arc::new([chunk]);
            self.deduper_process_chunks(data).await?;
        }

        // Resolve the SHA-256: computed, provided, or skipped.
        let sha256 = if let Some(generator) = self.sha_generator.take() {
            Some(generator.finalize().await?)
        } else {
            self.provided_sha256
        };
        let metadata_ext = sha256.map(FileMetadataExt::new);

        // Await the last background dedup task, then finalize the file entry.
        let (file_hash, remaining_file_data, deduplication_metrics) =
            self.dedup_manager_fut.await?.finalize(metadata_ext);

        let file_info = XetFileInfo {
            hash: file_hash.hex(),
            file_size: Some(deduplication_metrics.total_bytes),
            sha256: sha256.map(|s| s.hex()),
        };

        // Let's check some things that should be invariants
        #[cfg(debug_assertions)]
        {
            // There should be exactly one file referenced in the remaining file data.
            debug_assert_eq!(remaining_file_data.pending_file_info.len(), 1);

            // The size should be total bytes
            debug_assert_eq!(remaining_file_data.pending_file_info[0].0.file_size(), deduplication_metrics.total_bytes)
        }

        // Now, return all this information to the session so it can complete the upload.
        self.session
            .register_single_file_clean_completion(remaining_file_data, &deduplication_metrics)
            .await?;

        // NB: xorb upload is happening in the background, this number is optimistic since it does
        // not count transfer time of the uploaded xorbs, which is why the field below is named
        // `end_processing_ts` rather than an end-of-upload timestamp.

        info!(
            target: "client_telemetry",
            action = "clean",
            file_name = self.file_name.unwrap_or_default().to_string(),
            file_size_count = deduplication_metrics.total_bytes,
            new_bytes_count = deduplication_metrics.new_bytes,
            start_ts = self.start_time.to_rfc3339(),
            end_processing_ts = Utc::now().to_rfc3339(),
        );

        Ok((file_info, deduplication_metrics))
    }
255}