//! xet_data/processing/file_upload_session.rs
1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::fs::File;
4use std::io::Read;
5use std::mem::{swap, take};
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9
10use bytes::Bytes;
11use more_asserts::*;
12use tokio::sync::Mutex;
13use tokio::task::{JoinHandle, JoinSet};
14use tracing::{Instrument, Span, info_span, instrument};
15use xet_client::cas_client::{Client, ProgressCallback};
16use xet_core_structures::metadata_shard::file_structs::MDBFileInfo;
17use xet_core_structures::xorb_object::SerializedXorbObject;
18use xet_runtime::core::{XetRuntime, xet_config};
19
20use super::XetFileInfo;
21use super::configurations::TranslatorConfig;
22use super::file_cleaner::{Sha256Policy, SingleFileCleaner};
23use super::remote_client_interface::create_remote_client;
24use super::shard_interface::SessionShardInterface;
25use crate::deduplication::constants::{
26    MAX_XORB_BYTES, MAX_XORB_CHUNKS, XORB_CUT_THRESHOLD_BYTES, XORB_CUT_THRESHOLD_CHUNKS,
27};
28use crate::deduplication::{DataAggregator, DeduplicationMetrics, RawXorbData};
29use crate::error::{DataError, Result};
30use crate::progress_tracking::upload_tracking::{CompletionTracker, FileXorbDependency};
31use crate::progress_tracking::{GroupProgress, GroupProgressReport, ItemProgressReport, UniqueID};
32
/// Manages the translation of files between the
/// MerkleDB / pointer file format and the materialized version.
///
/// This class handles the clean operations.  It's meant to be a single atomic session
/// that succeeds or fails as a unit;  i.e. all files get uploaded on finalization, and all shards
/// and xorbs needed to reconstruct those files are properly uploaded and registered.
pub struct FileUploadSession {
    /// Client used to acquire upload permits and upload xorbs.
    pub(crate) client: Arc<dyn Client + Send + Sync>,

    /// Interface for recording xorb blocks and file reconstruction info,
    /// and for uploading/registering this session's shards.
    pub(crate) shard_interface: SessionShardInterface,

    /// Tracking upload completion between xorbs and files.
    pub(crate) completion_tracker: Arc<CompletionTracker>,

    /// Aggregate progress across all files in this upload session.
    progress: Arc<GroupProgress>,

    /// Deduplicated data shared across files.
    current_session_data: Mutex<DataAggregator>,

    /// Metrics for deduplication
    deduplication_metrics: Mutex<DeduplicationMetrics>,

    /// Internal worker: background tasks uploading xorbs for this session.
    xorb_upload_tasks: Mutex<JoinSet<Result<()>>>,

    /// Set to true after finalize() has been called.
    finalized: AtomicBool,
}
61
62// Constructors
63impl FileUploadSession {
64    pub async fn new(config: Arc<TranslatorConfig>) -> Result<Arc<FileUploadSession>> {
65        FileUploadSession::new_impl(config, false).await
66    }
67
68    pub async fn dry_run(config: Arc<TranslatorConfig>) -> Result<Arc<FileUploadSession>> {
69        FileUploadSession::new_impl(config, true).await
70    }
71
72    async fn new_impl(config: Arc<TranslatorConfig>, dry_run: bool) -> Result<Arc<FileUploadSession>> {
73        let session_id = config
74            .session
75            .session_id
76            .as_ref()
77            .map(Cow::Borrowed)
78            .unwrap_or_else(|| Cow::Owned(UniqueID::new().to_string()));
79
80        let progress = GroupProgress::with_speed_config(
81            xet_config().data.progress_update_speed_sampling_window,
82            xet_config().data.progress_update_speed_min_observations,
83        );
84        let completion_tracker = Arc::new(CompletionTracker::new(progress.clone()));
85
86        let client = create_remote_client(&config, &session_id, dry_run).await?;
87
88        let shard_interface = SessionShardInterface::new(config.clone(), client.clone(), dry_run).await?;
89
90        Ok(Arc::new(Self {
91            shard_interface,
92            client,
93            completion_tracker,
94            progress,
95            current_session_data: Mutex::new(DataAggregator::default()),
96            deduplication_metrics: Mutex::new(DeduplicationMetrics::default()),
97            xorb_upload_tasks: Mutex::new(JoinSet::new()),
98            finalized: AtomicBool::new(false),
99        }))
100    }
101
102    pub async fn upload_files(
103        self: &Arc<Self>,
104        files_and_sha256: impl IntoIterator<Item = (impl AsRef<Path>, Sha256Policy)> + Send,
105    ) -> Result<Vec<XetFileInfo>> {
106        self.check_not_finalized()?;
107        let mut cleaning_tasks: Vec<JoinHandle<_>> = vec![];
108
109        for (f, sha256) in files_and_sha256.into_iter() {
110            let file_path = f.as_ref().to_owned();
111            let file_name: Arc<str> = Arc::from(file_path.to_string_lossy());
112
113            let file_size = std::fs::metadata(&file_path)?.len();
114
115            let updater = self.progress.new_item(UniqueID::new(), file_name.clone());
116            let file_id = self.completion_tracker.register_new_file(updater, Some(file_size));
117
118            let ingestion_concurrency_limiter = XetRuntime::current().common().file_ingestion_semaphore.clone();
119            let session = self.clone();
120
121            cleaning_tasks.push(tokio::spawn(async move {
122                // Enable tracing to record this file's ingestion speed.
123                let span = info_span!(
124                    "clean_file_task",
125                    "file.name" = file_name.to_string(),
126                    "file.len" = file_size,
127                    "file.new_bytes" = tracing::field::Empty,
128                    "file.deduped_bytes" = tracing::field::Empty,
129                    "file.defrag_prevented_dedup_bytes" = tracing::field::Empty,
130                    "file.new_chunks" = tracing::field::Empty,
131                    "file.deduped_chunks" = tracing::field::Empty,
132                    "file.defrag_prevented_dedup_chunks" = tracing::field::Empty,
133                );
134                // First, get a permit to process this file.
135                let _processing_permit = ingestion_concurrency_limiter.acquire().await?;
136
137                async move {
138                    let mut reader = File::open(&file_path)?;
139
140                    // Start the clean process for each file.
141                    let mut cleaner = SingleFileCleaner::new(Some(file_name), file_id, sha256, session);
142                    let mut bytes_read = 0;
143
144                    while bytes_read < file_size {
145                        // Allocate a block of bytes, read into it.
146                        let bytes_left = file_size - bytes_read;
147                        let n_bytes_read = (*xet_config().data.ingestion_block_size).min(bytes_left) as usize;
148
149                        // Read in the data here; we are assuming the file doesn't change size
150                        // on the disk while we are reading it.
151
152                        // We allocate the buffer anew on each loop as it's converted without copying
153                        // to a Bytes object, and thus we avoid further copies downstream.  We also
154                        // guarantee that the buffer is filled completely with the read_exact. Therefore,
155                        // we can use an unsafe trick here to allocate the vector without initializing it
156                        // to a specific value and avoid that clearing.
157                        let mut buffer = Vec::with_capacity(n_bytes_read);
158                        #[allow(clippy::uninit_vec)]
159                        unsafe {
160                            buffer.set_len(n_bytes_read);
161                        }
162
163                        // Read it in.
164                        reader.read_exact(&mut buffer)?;
165
166                        bytes_read += buffer.len() as u64;
167
168                        cleaner.add_data_from_bytes(Bytes::from(buffer)).await?;
169                    }
170
171                    // Finish and return the result.
172                    let (xfi, metrics) = cleaner.finish().await?;
173
174                    // Record dedup information.
175                    let span = Span::current();
176                    span.record("file.new_bytes", metrics.new_bytes);
177                    span.record("file.deduped_bytes", metrics.deduped_bytes);
178                    span.record("file.defrag_prevented_dedup_bytes", metrics.defrag_prevented_dedup_bytes);
179                    span.record("file.new_chunks", metrics.new_chunks);
180                    span.record("file.deduped_chunks", metrics.deduped_chunks);
181                    span.record("file.defrag_prevented_dedup_chunks", metrics.defrag_prevented_dedup_chunks);
182
183                    Result::Ok(xfi)
184                }
185                .instrument(span)
186                .await
187            }));
188        }
189
190        // Join all the cleaning tasks.
191        let mut ret = Vec::with_capacity(cleaning_tasks.len());
192
193        for task in cleaning_tasks {
194            ret.push(task.await??);
195        }
196
197        Ok(ret)
198    }
199
    /// Start to clean one file. When cleaning multiple files, each file should
    /// be associated with one Cleaner. This allows multiple clean tasks to be
    /// launched simultaneously.
    ///
    /// `tracking_name` is used for progress reporting; `size`, when known, lets
    /// the completion tracker report accurate totals for the file.
    ///
    /// If a sha256 is provided via [`Sha256Policy::Provided`], the value will be directly
    /// used in shard upload to avoid redundant computation. [`Sha256Policy::Skip`] skips
    /// SHA-256 computation entirely and no metadata_ext is included in the shard.
    ///
    /// Returns the tracking ID (usable with [`FileUploadSession::item_report`])
    /// together with the cleaner.
    pub fn start_clean(
        self: &Arc<Self>,
        tracking_name: Option<Arc<str>>,
        size: Option<u64>,
        sha256: Sha256Policy,
    ) -> Result<(UniqueID, SingleFileCleaner)> {
        self.check_not_finalized()?;
        let id = UniqueID::new();
        let cleaner = self.start_clean_with_id(id, tracking_name, size, sha256);
        Ok((id, cleaner))
    }
221
222    fn start_clean_with_id(
223        self: &Arc<Self>,
224        id: UniqueID,
225        tracking_name: Option<Arc<str>>,
226        size: Option<u64>,
227        sha256: Sha256Policy,
228    ) -> SingleFileCleaner {
229        let updater = self.progress.new_item(id, tracking_name.clone().unwrap_or_default());
230        let file_id = self.completion_tracker.register_new_file(updater, size);
231        SingleFileCleaner::new(tracking_name, file_id, sha256, self.clone())
232    }
233
234    /// Spawns a task that reads `file_path` and uploads it.
235    ///
236    /// Returns the tracking ID and a join handle for the spawned task.
237    pub async fn spawn_upload_from_path(
238        self: &Arc<Self>,
239        file_path: PathBuf,
240        sha256: Sha256Policy,
241    ) -> Result<(UniqueID, JoinHandle<Result<(XetFileInfo, DeduplicationMetrics)>>)> {
242        self.check_not_finalized()?;
243        let file_size = std::fs::metadata(&file_path)?.len();
244        let tracking_name: Arc<str> = Arc::from(file_path.to_string_lossy().as_ref());
245        let (id, cleaner) = self.start_clean(Some(tracking_name), Some(file_size), sha256)?;
246
247        let rt = XetRuntime::current();
248        let semaphore = rt.common().file_ingestion_semaphore.clone();
249        let handle = rt.spawn(async move {
250            let _permit = semaphore.acquire().await?;
251            Self::feed_file_to_cleaner(cleaner, &file_path).await
252        });
253
254        Ok((id, handle))
255    }
256
257    /// Spawns a task that uploads `bytes` as a single file.
258    ///
259    /// Returns the tracking ID and a join handle for the spawned task.
260    pub async fn spawn_upload_bytes(
261        self: &Arc<Self>,
262        bytes: Vec<u8>,
263        sha256: Sha256Policy,
264        tracking_name: Option<Arc<str>>,
265    ) -> Result<(UniqueID, JoinHandle<Result<(XetFileInfo, DeduplicationMetrics)>>)> {
266        self.check_not_finalized()?;
267        let (id, mut cleaner) = self.start_clean(tracking_name, Some(bytes.len() as u64), sha256)?;
268
269        let rt = XetRuntime::current();
270        let semaphore = rt.common().file_ingestion_semaphore.clone();
271        let handle = rt.spawn(async move {
272            let _permit = semaphore.acquire().await?;
273            cleaner.add_data(&bytes).await?;
274            cleaner.finish().await
275        });
276
277        Ok((id, handle))
278    }
279
280    async fn feed_file_to_cleaner(
281        mut cleaner: SingleFileCleaner,
282        file_path: &Path,
283    ) -> Result<(XetFileInfo, DeduplicationMetrics)> {
284        let mut reader = File::open(file_path)?;
285        let filesize = reader.metadata()?.len();
286        let mut buffer = vec![0u8; u64::min(filesize, *xet_config().data.ingestion_block_size) as usize];
287
288        loop {
289            let n = reader.read(&mut buffer)?;
290            if n == 0 {
291                break;
292            }
293            cleaner.add_data(&buffer[..n]).await?;
294        }
295        cleaner.finish().await
296    }
297
    /// Registers a new xorb for upload, returning true if the xorb was added to the upload queue and false
    /// if it was already in the queue and didn't need to be uploaded again.
    #[instrument(skip_all, name="FileUploadSession::register_new_xorb_for_upload", fields(xorb_len = xorb.num_bytes()))]
    pub(crate) async fn register_new_xorb(
        self: &Arc<Self>,
        xorb: RawXorbData,
        file_dependencies: &[FileXorbDependency],
    ) -> Result<bool> {
        // First check the current xorb upload tasks to see if any can be cleaned up.
        // try_join_next is non-blocking, so this also surfaces any upload error early.
        {
            let mut upload_tasks = self.xorb_upload_tasks.lock().await;
            while let Some(result) = upload_tasks.try_join_next() {
                result??;
            }
        }

        let xorb_hash = xorb.hash();

        // Register that this xorb is part of this session and set up completion tracking.
        //
        // In some circumstances, we can cut two instances of the same xorb, namely when there are two files
        // with the same starting data that get processed simultaneously.  When this happens, we only upload
        // the first one, returning early here.
        let xorb_is_new = self.completion_tracker.register_new_xorb(xorb_hash, xorb.num_bytes() as u64);

        // Make sure we add in all the dependencies.  This should happen after the xorb is registered but before
        // we start the upload.
        self.completion_tracker.register_dependencies(file_dependencies);

        if !xorb_is_new {
            return Ok(false);
        }

        // No need to process an empty xorb.  But check this after the session_xorbs tracker
        // to make sure the reporting is correct.
        if xorb.num_bytes() == 0 {
            self.completion_tracker.register_xorb_upload_completion(xorb_hash);
            return Ok(true);
        }

        // This xorb is in the session upload queue, so other threads can go ahead and dedup against it.
        // No session shard data gets uploaded until all the xorbs have been successfully uploaded, so
        // this is safe.
        let xorb_info = Arc::new(xorb.xorb_info.clone());
        self.shard_interface.add_xorb_block(xorb_info.clone()).await?;

        // Serialize the object; this can be relatively expensive, so run it on a compute thread.
        // XORBs are sent without footer - the server/client reconstructs it from chunk data.
        let xorb_obj = XetRuntime::current()
            .spawn_blocking(move || SerializedXorbObject::from_xorb(xorb, false))
            .await??;

        let session = self.clone();
        // Acquire the upload permit here, before spawning, so overall upload concurrency
        // stays bounded even while the task sits in the JoinSet.
        let upload_permit = self.client.acquire_upload_permit().await?;
        let cas_prefix = xet_config().data.default_prefix.clone();
        let completion_tracker = self.completion_tracker.clone();
        let xorb_hash = xorb_obj.hash;
        let raw_num_bytes = xorb_obj.raw_num_bytes;
        // Progress deltas arrive in transmitted (serialized) bytes; rescale them to raw
        // xorb bytes so they line up with the size registered with the tracker above.
        let progress_callback: ProgressCallback = Arc::new(move |delta, _completed, total| {
            let raw_delta = (delta * raw_num_bytes).checked_div(total).unwrap_or(0);
            if raw_delta > 0 {
                completion_tracker
                    .clone()
                    .register_xorb_upload_progress_background(xorb_hash, raw_delta);
            }
        });

        self.xorb_upload_tasks.lock().await.spawn(
            async move {
                let n_bytes_transmitted = session
                    .client
                    .upload_xorb(&cas_prefix, xorb_obj, Some(progress_callback), upload_permit)
                    .await?;

                // Register that the xorb has been uploaded.
                session.completion_tracker.register_xorb_upload_completion(xorb_hash);

                // Record the number of bytes uploaded.
                session.deduplication_metrics.lock().await.xorb_bytes_uploaded += n_bytes_transmitted;

                // Add this as a completed cas block so that future sessions can resume quickly.
                session.shard_interface.add_uploaded_xorb_block(xorb_info).await?;

                Ok(())
            }
            .instrument(info_span!("FileUploadSession::upload_xorb_task", xorb.hash = xorb_hash.hex())),
        );

        Ok(true)
    }
388
    /// Meant to be called by the finalize() method of the SingleFileCleaner.
    ///
    /// Merges the file's leftover (non-xorb-cut) data into the session-wide
    /// aggregator, cutting and uploading a xorb when the combined data crosses
    /// the cut thresholds, then folds the file's dedup metrics into the session totals.
    #[instrument(skip_all, name="FileUploadSession::register_single_file_clean_completion", fields(num_bytes = file_data.num_bytes(), num_chunks = file_data.num_chunks()))]
    pub(crate) async fn register_single_file_clean_completion(
        self: &Arc<Self>,
        mut file_data: DataAggregator,
        dedup_metrics: &DeduplicationMetrics,
    ) -> Result<()> {
        // Merge in the remaining file data; uploading a new xorb if need be.
        {
            let mut current_session_data = self.current_session_data.lock().await;

            // Do we need to cut one of these to a xorb?
            if current_session_data.num_bytes() + file_data.num_bytes() > *XORB_CUT_THRESHOLD_BYTES
                || current_session_data.num_chunks() + file_data.num_chunks() > *XORB_CUT_THRESHOLD_CHUNKS
            {
                // Cut the larger one as a xorb, uploading it and registering the files.
                // Swap so that `file_data` always ends up holding the larger aggregator
                // and the session keeps the smaller remainder.
                if current_session_data.num_bytes() > file_data.num_bytes() {
                    swap(&mut *current_session_data, &mut file_data);
                }

                // Now file data is larger
                debug_assert_le!(current_session_data.num_bytes(), file_data.num_bytes());

                // Actually upload this outside the lock
                drop(current_session_data);

                self.process_aggregated_data_as_xorb(file_data).await?;
            } else {
                current_session_data.merge_in(file_data);
            }
        }

        // Debug-only invariant: the retained session data must always fit in one xorb.
        #[cfg(debug_assertions)]
        {
            let current_session_data = self.current_session_data.lock().await;
            debug_assert_le!(current_session_data.num_bytes(), *MAX_XORB_BYTES);
            debug_assert_le!(current_session_data.num_chunks(), *MAX_XORB_CHUNKS);
        }

        // Now, aggregate the new dedup metrics.
        self.deduplication_metrics.lock().await.merge_in(dedup_metrics);

        Ok(())
    }
433
434    /// Process the aggregated data, uploading the data as a xorb and registering the files
435    async fn process_aggregated_data_as_xorb(self: &Arc<Self>, data_agg: DataAggregator) -> Result<()> {
436        let (xorb, new_files) = data_agg.finalize();
437        let xorb_hash = xorb.hash();
438
439        debug_assert_le!(xorb.num_bytes(), *MAX_XORB_BYTES);
440        debug_assert_le!(xorb.data.len(), *MAX_XORB_CHUNKS);
441
442        // Now, we need to scan all the file dependencies for dependencies on this xorb, as
443        // these would not have been registered yet as we just got the xorb hash.
444        let mut new_dependencies = Vec::with_capacity(new_files.len());
445
446        {
447            for (file_id, fi, bytes_in_xorb) in new_files {
448                new_dependencies.push(FileXorbDependency {
449                    file_id,
450                    xorb_hash,
451                    n_bytes: bytes_in_xorb,
452                    is_external: false,
453                });
454
455                // Record the reconstruction.
456                self.shard_interface.add_file_reconstruction_info(fi).await?;
457            }
458        }
459
460        // Register the xorb and start the upload process.
461        self.register_new_xorb(xorb, &new_dependencies).await?;
462
463        Ok(())
464    }
465
    /// Register xorb dependencies that are given as part of the dedup process.
    pub(crate) fn register_xorb_dependencies(self: &Arc<Self>, xorb_dependencies: &[FileXorbDependency]) {
        self.completion_tracker.register_dependencies(xorb_dependencies);
    }
470
    /// Finalize everything.
    ///
    /// Cuts the remaining session data as a final xorb, waits for every xorb
    /// upload task, then uploads and registers the session shards.  Optionally
    /// returns the session's file reconstruction info when `return_files` is set.
    #[instrument(skip_all, name="FileUploadSession::finalize", fields(session.id))]
    async fn finalize_impl(
        self: Arc<Self>,
        return_files: bool,
    ) -> Result<(DeduplicationMetrics, Vec<MDBFileInfo>, GroupProgressReport)> {
        // swap(true) makes finalization a run-once operation even under concurrent calls.
        if self.finalized.swap(true, Ordering::AcqRel) {
            return Err(DataError::InvalidOperation("FileUploadSession already finalized".to_string()));
        }

        // Register the remaining xorbs for upload.
        let data_agg = take(&mut *self.current_session_data.lock().await);
        self.process_aggregated_data_as_xorb(data_agg).await?;

        // Now, make sure all the remaining xorbs are uploaded.
        let mut metrics = take(&mut *self.deduplication_metrics.lock().await);

        // Finalize the xorb uploads.
        let mut upload_tasks = take(&mut *self.xorb_upload_tasks.lock().await);

        while let Some(result) = upload_tasks.join_next().await {
            result??;
        }

        let all_file_info = if return_files {
            self.shard_interface.session_file_info_list().await?
        } else {
            Vec::new()
        };

        // Upload and register the current shards in the session, moving them
        // to the cache.  This happens only after all xorb uploads completed,
        // so shards never reference unuploaded xorbs.
        metrics.shard_bytes_uploaded = self.shard_interface.upload_and_register_session_shards().await?;
        metrics.total_bytes_uploaded = metrics.shard_bytes_uploaded + metrics.xorb_bytes_uploaded;

        #[cfg(debug_assertions)]
        {
            self.completion_tracker.assert_complete();
            self.progress.assert_complete();
        }

        let report = self.report();
        Ok((metrics, all_file_info, report))
    }
515
516    // Wait until everything currently in process is completed and uploaded, cutting a xorb for the remaining bit.
517    // However, does not clean up the session so add_data can be called again.  Finalize must be called later.
518    //
519    // Used for testing.  Should be called only after all add_data calls have completed.
520    pub async fn checkpoint(self: &Arc<Self>) -> Result<()> {
521        // Cut the current data present as a xorb, upload it.
522        let data_agg = take(&mut *self.current_session_data.lock().await);
523        self.process_aggregated_data_as_xorb(data_agg).await?;
524
525        // Wait for all inflight xorb uploads to complete.
526        {
527            let mut upload_tasks = self.xorb_upload_tasks.lock().await;
528
529            while let Some(result) = upload_tasks.join_next().await {
530                result??;
531            }
532        }
533
534        Ok(())
535    }
536
537    fn check_not_finalized(&self) -> Result<()> {
538        if self.finalized.load(Ordering::Acquire) {
539            return Err(DataError::InvalidOperation("FileUploadSession already finalized".to_string()));
540        }
541        Ok(())
542    }
543
    /// Access the session-wide progress tracker.
    pub fn progress(&self) -> &Arc<GroupProgress> {
        &self.progress
    }
547
    /// Snapshot of aggregate progress across the whole session.
    pub fn report(&self) -> GroupProgressReport {
        self.progress.report()
    }
551
    /// Progress snapshot for a single tracked item, or `None` if `id` is unknown.
    pub fn item_report(&self, id: UniqueID) -> Option<ItemProgressReport> {
        self.progress.item_report(id)
    }
555
    /// Progress snapshots for all tracked items, keyed by tracking ID.
    pub fn item_reports(&self) -> HashMap<UniqueID, ItemProgressReport> {
        self.progress.item_reports()
    }
559
560    pub async fn finalize(self: Arc<Self>) -> Result<DeduplicationMetrics> {
561        Ok(self.finalize_impl(false).await?.0)
562    }
563
564    pub async fn finalize_with_report(self: Arc<Self>) -> Result<(DeduplicationMetrics, GroupProgressReport)> {
565        let (metrics, _file_info, report) = self.finalize_impl(false).await?;
566        Ok((metrics, report))
567    }
568
569    pub async fn finalize_with_file_info(self: Arc<Self>) -> Result<(DeduplicationMetrics, Vec<MDBFileInfo>)> {
570        let (metrics, file_info, _report) = self.finalize_impl(true).await?;
571        Ok((metrics, file_info))
572    }
573}
574
#[cfg(test)]
mod tests {
    use std::fs::{File, OpenOptions};
    use std::io::{Read, Write};
    use std::path::Path;
    use std::sync::{Arc, OnceLock};

    use xet_runtime::core::XetRuntime;

    use crate::processing::{FileDownloadSession, FileUploadSession, XetFileInfo};

    /// Return a shared threadpool to be reused as needed.
    fn get_threadpool() -> Arc<XetRuntime> {
        static THREADPOOL: OnceLock<Arc<XetRuntime>> = OnceLock::new();
        THREADPOOL
            .get_or_init(|| XetRuntime::new().expect("Error starting multithreaded runtime."))
            .clone()
    }

    /// Cleans (converts) a regular file into a pointer file.
    ///
    /// * `cas_path`: directory backing the local CAS store
    /// * `input_path`: path to the original file
    /// * `output_path`: path to write the pointer file
    async fn test_clean_file(cas_path: &Path, input_path: &Path, output_path: &Path) {
        let read_data = read(input_path).unwrap().to_vec();

        let mut pf_out = Box::new(
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(output_path)
                .unwrap(),
        );

        let upload_session = FileUploadSession::new(TranslatorConfig::local_config(cas_path).unwrap().into())
            .await
            .unwrap();

        let (_id, mut cleaner) = upload_session
            .start_clean(Some("test".into()), Some(read_data.len() as u64), Sha256Policy::Compute)
            .unwrap();

        // Read blocks from the source file and hand them to the cleaning handle
        cleaner.add_data(&read_data[..]).await.unwrap();

        let (xet_file_info, _metrics) = cleaner.finish().await.unwrap();
        upload_session.finalize().await.unwrap();

        // The "pointer file" here is just the serialized XetFileInfo.
        pf_out
            .write_all(serde_json::to_string(&xet_file_info).unwrap().as_bytes())
            .unwrap();
    }

    /// Smudges (hydrates) a pointer file back into the original data.
    ///
    /// * `cas_path`: directory backing the local CAS store
    /// * `pointer_path`: path to the pointer file
    /// * `output_path`: path to write the hydrated/original file
    async fn test_smudge_file(cas_path: &Path, pointer_path: &Path, output_path: &Path) {
        let mut reader = File::open(pointer_path).unwrap();

        let mut input = String::new();
        reader.read_to_string(&mut input).unwrap();

        let xet_file = serde_json::from_str::<XetFileInfo>(&input).unwrap();

        let config = TranslatorConfig::local_config(cas_path).unwrap();
        let session = FileDownloadSession::new(config.into(), None).await.unwrap();

        let (_id, _n_bytes) = session.download_file(&xet_file, output_path).await.unwrap();
    }

    // Note: `use` declarations are module-wide, so `read`/`write` used above
    // resolve to these imports even though they appear here.
    use std::fs::{read, write};

    use tempfile::tempdir;

    use super::*;

    #[test]
    fn test_clean_smudge_round_trip() {
        let temp = tempdir().unwrap();
        let original_data = b"Hello, world!";

        let runtime = get_threadpool();

        runtime
            .clone()
            .bridge_sync(async move {
                let cas_path = temp.path().join("cas");

                // 1. Write an original file in the temp directory
                let original_path = temp.path().join("original.txt");
                write(&original_path, original_data).unwrap();

                // 2. Clean it (convert it to a pointer file)
                let pointer_path = temp.path().join("pointer.txt");
                test_clean_file(&cas_path, &original_path, &pointer_path).await;

                // 3. Smudge it (hydrate the pointer file) to a new file
                let hydrated_path = temp.path().join("hydrated.txt");
                test_smudge_file(&cas_path, &pointer_path, &hydrated_path).await;

                // 4. Verify that the round-tripped file matches the original
                let result_data = read(hydrated_path).unwrap();
                assert_eq!(original_data.to_vec(), result_data);
            })
            .unwrap();
    }

    #[test]
    fn test_clean_skip_sha256_no_metadata_ext() {
        let temp = tempdir().unwrap();
        let data = b"Hello, skip sha256!";

        let runtime = get_threadpool();

        runtime
            .clone()
            .bridge_sync(async move {
                let cas_path = temp.path().join("cas");

                let upload_session = FileUploadSession::new(TranslatorConfig::local_config(&cas_path).unwrap().into())
                    .await
                    .unwrap();

                let (_id, mut cleaner) = upload_session
                    .start_clean(Some("test".into()), Some(data.len() as u64), Sha256Policy::Skip)
                    .unwrap();
                cleaner.add_data(data).await.unwrap();
                cleaner.finish().await.unwrap();

                // Verify that the shard has no metadata_ext (no SHA-256).
                let (_metrics, file_infos) = upload_session.finalize_with_file_info().await.unwrap();
                assert_eq!(file_infos.len(), 1);
                assert!(file_infos[0].metadata_ext.is_none(), "Skip should produce no metadata_ext");
            })
            .unwrap();
    }
}