1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::fs::File;
4use std::io::Read;
5use std::mem::{swap, take};
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9
10use bytes::Bytes;
11use more_asserts::*;
12use tokio::sync::Mutex;
13use tokio::task::{JoinHandle, JoinSet};
14use tracing::{Instrument, Span, info_span, instrument};
15use xet_client::cas_client::{Client, ProgressCallback};
16use xet_core_structures::metadata_shard::file_structs::MDBFileInfo;
17use xet_core_structures::xorb_object::SerializedXorbObject;
18use xet_runtime::core::{XetRuntime, xet_config};
19
20use super::XetFileInfo;
21use super::configurations::TranslatorConfig;
22use super::file_cleaner::{Sha256Policy, SingleFileCleaner};
23use super::remote_client_interface::create_remote_client;
24use super::shard_interface::SessionShardInterface;
25use crate::deduplication::constants::{
26 MAX_XORB_BYTES, MAX_XORB_CHUNKS, XORB_CUT_THRESHOLD_BYTES, XORB_CUT_THRESHOLD_CHUNKS,
27};
28use crate::deduplication::{DataAggregator, DeduplicationMetrics, RawXorbData};
29use crate::error::{DataError, Result};
30use crate::progress_tracking::upload_tracking::{CompletionTracker, FileXorbDependency};
31use crate::progress_tracking::{GroupProgress, GroupProgressReport, ItemProgressReport, UniqueID};
32
/// An in-progress upload session: deduplicates cleaned file data into xorbs,
/// uploads them via background tasks, and tracks per-file progress and
/// completion until one of the `finalize*` methods is called.
pub struct FileUploadSession {
    // Remote CAS client used for xorb uploads and upload-permit gating.
    pub(crate) client: Arc<dyn Client + Send + Sync>,
    // Session-scoped shard bookkeeping (xorb blocks, file reconstruction info).
    pub(crate) shard_interface: SessionShardInterface,

    // Records file/xorb registrations, dependencies, and upload completion.
    pub(crate) completion_tracker: Arc<CompletionTracker>,

    // Aggregated progress reporting across all items in the session.
    progress: Arc<GroupProgress>,

    // Data accumulated across files that has not yet been cut into a xorb.
    current_session_data: Mutex<DataAggregator>,

    // Session-wide deduplication/upload metrics, merged in per file.
    deduplication_metrics: Mutex<DeduplicationMetrics>,

    // In-flight background xorb upload tasks.
    xorb_upload_tasks: Mutex<JoinSet<Result<()>>>,

    // Set once finalization begins; further session operations are rejected.
    finalized: AtomicBool,
}
61
62impl FileUploadSession {
64 pub async fn new(config: Arc<TranslatorConfig>) -> Result<Arc<FileUploadSession>> {
65 FileUploadSession::new_impl(config, false).await
66 }
67
68 pub async fn dry_run(config: Arc<TranslatorConfig>) -> Result<Arc<FileUploadSession>> {
69 FileUploadSession::new_impl(config, true).await
70 }
71
72 async fn new_impl(config: Arc<TranslatorConfig>, dry_run: bool) -> Result<Arc<FileUploadSession>> {
73 let session_id = config
74 .session
75 .session_id
76 .as_ref()
77 .map(Cow::Borrowed)
78 .unwrap_or_else(|| Cow::Owned(UniqueID::new().to_string()));
79
80 let progress = GroupProgress::with_speed_config(
81 xet_config().data.progress_update_speed_sampling_window,
82 xet_config().data.progress_update_speed_min_observations,
83 );
84 let completion_tracker = Arc::new(CompletionTracker::new(progress.clone()));
85
86 let client = create_remote_client(&config, &session_id, dry_run).await?;
87
88 let shard_interface = SessionShardInterface::new(config.clone(), client.clone(), dry_run).await?;
89
90 Ok(Arc::new(Self {
91 shard_interface,
92 client,
93 completion_tracker,
94 progress,
95 current_session_data: Mutex::new(DataAggregator::default()),
96 deduplication_metrics: Mutex::new(DeduplicationMetrics::default()),
97 xorb_upload_tasks: Mutex::new(JoinSet::new()),
98 finalized: AtomicBool::new(false),
99 }))
100 }
101
102 pub async fn upload_files(
103 self: &Arc<Self>,
104 files_and_sha256: impl IntoIterator<Item = (impl AsRef<Path>, Sha256Policy)> + Send,
105 ) -> Result<Vec<XetFileInfo>> {
106 self.check_not_finalized()?;
107 let mut cleaning_tasks: Vec<JoinHandle<_>> = vec![];
108
109 for (f, sha256) in files_and_sha256.into_iter() {
110 let file_path = f.as_ref().to_owned();
111 let file_name: Arc<str> = Arc::from(file_path.to_string_lossy());
112
113 let file_size = std::fs::metadata(&file_path)?.len();
114
115 let updater = self.progress.new_item(UniqueID::new(), file_name.clone());
116 let file_id = self.completion_tracker.register_new_file(updater, Some(file_size));
117
118 let ingestion_concurrency_limiter = XetRuntime::current().common().file_ingestion_semaphore.clone();
119 let session = self.clone();
120
121 cleaning_tasks.push(tokio::spawn(async move {
122 let span = info_span!(
124 "clean_file_task",
125 "file.name" = file_name.to_string(),
126 "file.len" = file_size,
127 "file.new_bytes" = tracing::field::Empty,
128 "file.deduped_bytes" = tracing::field::Empty,
129 "file.defrag_prevented_dedup_bytes" = tracing::field::Empty,
130 "file.new_chunks" = tracing::field::Empty,
131 "file.deduped_chunks" = tracing::field::Empty,
132 "file.defrag_prevented_dedup_chunks" = tracing::field::Empty,
133 );
134 let _processing_permit = ingestion_concurrency_limiter.acquire().await?;
136
137 async move {
138 let mut reader = File::open(&file_path)?;
139
140 let mut cleaner = SingleFileCleaner::new(Some(file_name), file_id, sha256, session);
142 let mut bytes_read = 0;
143
144 while bytes_read < file_size {
145 let bytes_left = file_size - bytes_read;
147 let n_bytes_read = (*xet_config().data.ingestion_block_size).min(bytes_left) as usize;
148
149 let mut buffer = Vec::with_capacity(n_bytes_read);
158 #[allow(clippy::uninit_vec)]
159 unsafe {
160 buffer.set_len(n_bytes_read);
161 }
162
163 reader.read_exact(&mut buffer)?;
165
166 bytes_read += buffer.len() as u64;
167
168 cleaner.add_data_from_bytes(Bytes::from(buffer)).await?;
169 }
170
171 let (xfi, metrics) = cleaner.finish().await?;
173
174 let span = Span::current();
176 span.record("file.new_bytes", metrics.new_bytes);
177 span.record("file.deduped_bytes", metrics.deduped_bytes);
178 span.record("file.defrag_prevented_dedup_bytes", metrics.defrag_prevented_dedup_bytes);
179 span.record("file.new_chunks", metrics.new_chunks);
180 span.record("file.deduped_chunks", metrics.deduped_chunks);
181 span.record("file.defrag_prevented_dedup_chunks", metrics.defrag_prevented_dedup_chunks);
182
183 Result::Ok(xfi)
184 }
185 .instrument(span)
186 .await
187 }));
188 }
189
190 let mut ret = Vec::with_capacity(cleaning_tasks.len());
192
193 for task in cleaning_tasks {
194 ret.push(task.await??);
195 }
196
197 Ok(ret)
198 }
199
200 pub fn start_clean(
211 self: &Arc<Self>,
212 tracking_name: Option<Arc<str>>,
213 size: Option<u64>,
214 sha256: Sha256Policy,
215 ) -> Result<(UniqueID, SingleFileCleaner)> {
216 self.check_not_finalized()?;
217 let id = UniqueID::new();
218 let cleaner = self.start_clean_with_id(id, tracking_name, size, sha256);
219 Ok((id, cleaner))
220 }
221
222 fn start_clean_with_id(
223 self: &Arc<Self>,
224 id: UniqueID,
225 tracking_name: Option<Arc<str>>,
226 size: Option<u64>,
227 sha256: Sha256Policy,
228 ) -> SingleFileCleaner {
229 let updater = self.progress.new_item(id, tracking_name.clone().unwrap_or_default());
230 let file_id = self.completion_tracker.register_new_file(updater, size);
231 SingleFileCleaner::new(tracking_name, file_id, sha256, self.clone())
232 }
233
234 pub async fn spawn_upload_from_path(
238 self: &Arc<Self>,
239 file_path: PathBuf,
240 sha256: Sha256Policy,
241 ) -> Result<(UniqueID, JoinHandle<Result<(XetFileInfo, DeduplicationMetrics)>>)> {
242 self.check_not_finalized()?;
243 let file_size = std::fs::metadata(&file_path)?.len();
244 let tracking_name: Arc<str> = Arc::from(file_path.to_string_lossy().as_ref());
245 let (id, cleaner) = self.start_clean(Some(tracking_name), Some(file_size), sha256)?;
246
247 let rt = XetRuntime::current();
248 let semaphore = rt.common().file_ingestion_semaphore.clone();
249 let handle = rt.spawn(async move {
250 let _permit = semaphore.acquire().await?;
251 Self::feed_file_to_cleaner(cleaner, &file_path).await
252 });
253
254 Ok((id, handle))
255 }
256
257 pub async fn spawn_upload_bytes(
261 self: &Arc<Self>,
262 bytes: Vec<u8>,
263 sha256: Sha256Policy,
264 tracking_name: Option<Arc<str>>,
265 ) -> Result<(UniqueID, JoinHandle<Result<(XetFileInfo, DeduplicationMetrics)>>)> {
266 self.check_not_finalized()?;
267 let (id, mut cleaner) = self.start_clean(tracking_name, Some(bytes.len() as u64), sha256)?;
268
269 let rt = XetRuntime::current();
270 let semaphore = rt.common().file_ingestion_semaphore.clone();
271 let handle = rt.spawn(async move {
272 let _permit = semaphore.acquire().await?;
273 cleaner.add_data(&bytes).await?;
274 cleaner.finish().await
275 });
276
277 Ok((id, handle))
278 }
279
280 async fn feed_file_to_cleaner(
281 mut cleaner: SingleFileCleaner,
282 file_path: &Path,
283 ) -> Result<(XetFileInfo, DeduplicationMetrics)> {
284 let mut reader = File::open(file_path)?;
285 let filesize = reader.metadata()?.len();
286 let mut buffer = vec![0u8; u64::min(filesize, *xet_config().data.ingestion_block_size) as usize];
287
288 loop {
289 let n = reader.read(&mut buffer)?;
290 if n == 0 {
291 break;
292 }
293 cleaner.add_data(&buffer[..n]).await?;
294 }
295 cleaner.finish().await
296 }
297
    /// Register a newly cut xorb with the session and, if it has not been
    /// seen before in this session, queue it for background upload.
    ///
    /// `file_dependencies` are registered with the completion tracker whether
    /// or not the xorb itself is new. Returns `Ok(true)` when the xorb was
    /// new (upload scheduled, or completed immediately for an empty xorb),
    /// `Ok(false)` when it was already registered.
    #[instrument(skip_all, name="FileUploadSession::register_new_xorb_for_upload", fields(xorb_len = xorb.num_bytes()))]
    pub(crate) async fn register_new_xorb(
        self: &Arc<Self>,
        xorb: RawXorbData,
        file_dependencies: &[FileXorbDependency],
    ) -> Result<bool> {
        // Opportunistically reap already-finished upload tasks so their
        // errors surface here rather than only at finalize time.
        {
            let mut upload_tasks = self.xorb_upload_tasks.lock().await;
            while let Some(result) = upload_tasks.try_join_next() {
                result??;
            }
        }

        let xorb_hash = xorb.hash();

        // False when this xorb hash was already registered in this session.
        let xorb_is_new = self.completion_tracker.register_new_xorb(xorb_hash, xorb.num_bytes() as u64);

        // File -> xorb dependencies are recorded even for a duplicate xorb.
        self.completion_tracker.register_dependencies(file_dependencies);

        if !xorb_is_new {
            return Ok(false);
        }

        // An empty xorb has nothing to transmit; mark it complete at once.
        if xorb.num_bytes() == 0 {
            self.completion_tracker.register_xorb_upload_completion(xorb_hash);
            return Ok(true);
        }

        // Record the xorb in the session shard state before the upload starts.
        let xorb_info = Arc::new(xorb.xorb_info.clone());
        self.shard_interface.add_xorb_block(xorb_info.clone()).await?;

        // Serialization is CPU-bound, so it runs on the blocking pool.
        let xorb_obj = XetRuntime::current()
            .spawn_blocking(move || SerializedXorbObject::from_xorb(xorb, false))
            .await??;

        let session = self.clone();
        // NOTE: the upload permit is acquired here, before spawning, so this
        // call waits when too many uploads are already in flight.
        let upload_permit = self.client.acquire_upload_permit().await?;
        let cas_prefix = xet_config().data.default_prefix.clone();
        let completion_tracker = self.completion_tracker.clone();
        let xorb_hash = xorb_obj.hash;
        let raw_num_bytes = xorb_obj.raw_num_bytes;
        // Scale transmitted-byte deltas back to raw (pre-serialization) bytes
        // so progress reporting matches the logical xorb size; checked_div
        // guards against a zero total.
        let progress_callback: ProgressCallback = Arc::new(move |delta, _completed, total| {
            let raw_delta = (delta * raw_num_bytes).checked_div(total).unwrap_or(0);
            if raw_delta > 0 {
                completion_tracker
                    .clone()
                    .register_xorb_upload_progress_background(xorb_hash, raw_delta);
            }
        });

        self.xorb_upload_tasks.lock().await.spawn(
            async move {
                let n_bytes_transmitted = session
                    .client
                    .upload_xorb(&cas_prefix, xorb_obj, Some(progress_callback), upload_permit)
                    .await?;

                session.completion_tracker.register_xorb_upload_completion(xorb_hash);

                session.deduplication_metrics.lock().await.xorb_bytes_uploaded += n_bytes_transmitted;

                // Mark the xorb's shard entry as uploaded.
                session.shard_interface.add_uploaded_xorb_block(xorb_info).await?;

                Ok(())
            }
            .instrument(info_span!("FileUploadSession::upload_xorb_task", xorb.hash = xorb_hash.hex())),
        );

        Ok(true)
    }
388
    /// Record completion of a single file's cleaning pass.
    ///
    /// The file's leftover data (chunks not deduplicated elsewhere) is merged
    /// into the session-wide aggregator; when the combined size would cross a
    /// xorb cut threshold, the larger of the two aggregators is cut into a
    /// xorb and queued for upload. The per-file dedup metrics are then merged
    /// into the session totals.
    #[instrument(skip_all, name="FileUploadSession::register_single_file_clean_completion", fields(num_bytes = file_data.num_bytes(), num_chunks = file_data.num_chunks()))]
    pub(crate) async fn register_single_file_clean_completion(
        self: &Arc<Self>,
        mut file_data: DataAggregator,
        dedup_metrics: &DeduplicationMetrics,
    ) -> Result<()> {
        {
            let mut current_session_data = self.current_session_data.lock().await;

            if current_session_data.num_bytes() + file_data.num_bytes() > *XORB_CUT_THRESHOLD_BYTES
                || current_session_data.num_chunks() + file_data.num_chunks() > *XORB_CUT_THRESHOLD_CHUNKS
            {
                // Cut the larger of the two aggregators into a xorb; keep the
                // smaller one as the session's running aggregate.
                if current_session_data.num_bytes() > file_data.num_bytes() {
                    swap(&mut *current_session_data, &mut file_data);
                }

                debug_assert_le!(current_session_data.num_bytes(), file_data.num_bytes());

                // Release the aggregator lock before the (potentially slow)
                // xorb processing below.
                drop(current_session_data);

                self.process_aggregated_data_as_xorb(file_data).await?;
            } else {
                current_session_data.merge_in(file_data);
            }
        }

        // Debug-only invariant: the running aggregate always stays within a
        // single xorb's limits.
        #[cfg(debug_assertions)]
        {
            let current_session_data = self.current_session_data.lock().await;
            debug_assert_le!(current_session_data.num_bytes(), *MAX_XORB_BYTES);
            debug_assert_le!(current_session_data.num_chunks(), *MAX_XORB_CHUNKS);
        }

        self.deduplication_metrics.lock().await.merge_in(dedup_metrics);

        Ok(())
    }
433
434 async fn process_aggregated_data_as_xorb(self: &Arc<Self>, data_agg: DataAggregator) -> Result<()> {
436 let (xorb, new_files) = data_agg.finalize();
437 let xorb_hash = xorb.hash();
438
439 debug_assert_le!(xorb.num_bytes(), *MAX_XORB_BYTES);
440 debug_assert_le!(xorb.data.len(), *MAX_XORB_CHUNKS);
441
442 let mut new_dependencies = Vec::with_capacity(new_files.len());
445
446 {
447 for (file_id, fi, bytes_in_xorb) in new_files {
448 new_dependencies.push(FileXorbDependency {
449 file_id,
450 xorb_hash,
451 n_bytes: bytes_in_xorb,
452 is_external: false,
453 });
454
455 self.shard_interface.add_file_reconstruction_info(fi).await?;
457 }
458 }
459
460 self.register_new_xorb(xorb, &new_dependencies).await?;
462
463 Ok(())
464 }
465
    /// Record file -> xorb dependencies with the completion tracker (used for
    /// xorbs produced or referenced outside this session's aggregator path).
    pub(crate) fn register_xorb_dependencies(self: &Arc<Self>, xorb_dependencies: &[FileXorbDependency]) {
        self.completion_tracker.register_dependencies(xorb_dependencies);
    }
470
    /// Flush and close out the session.
    ///
    /// Cuts any remaining aggregated data into a final xorb, waits for all
    /// in-flight xorb uploads, optionally collects the session's file
    /// reconstruction info, then uploads and registers the session shards.
    ///
    /// # Errors
    /// Returns `InvalidOperation` if the session was already finalized;
    /// otherwise propagates any upload or shard error.
    #[instrument(skip_all, name="FileUploadSession::finalize", fields(session.id))]
    async fn finalize_impl(
        self: Arc<Self>,
        return_files: bool,
    ) -> Result<(DeduplicationMetrics, Vec<MDBFileInfo>, GroupProgressReport)> {
        // Atomically flip the finalized flag; a second finalize is an error.
        if self.finalized.swap(true, Ordering::AcqRel) {
            return Err(DataError::InvalidOperation("FileUploadSession already finalized".to_string()));
        }

        // Cut whatever data is still aggregated into one last xorb.
        let data_agg = take(&mut *self.current_session_data.lock().await);
        self.process_aggregated_data_as_xorb(data_agg).await?;

        let mut metrics = take(&mut *self.deduplication_metrics.lock().await);

        // Take ownership of the task set so the mutex is not held while
        // awaiting the remaining uploads.
        let mut upload_tasks = take(&mut *self.xorb_upload_tasks.lock().await);

        while let Some(result) = upload_tasks.join_next().await {
            result??;
        }

        let all_file_info = if return_files {
            self.shard_interface.session_file_info_list().await?
        } else {
            Vec::new()
        };

        // Shards are uploaded only after all xorb uploads have completed.
        metrics.shard_bytes_uploaded = self.shard_interface.upload_and_register_session_shards().await?;
        metrics.total_bytes_uploaded = metrics.shard_bytes_uploaded + metrics.xorb_bytes_uploaded;

        // Debug-only: everything registered must now be complete.
        #[cfg(debug_assertions)]
        {
            self.completion_tracker.assert_complete();
            self.progress.assert_complete();
        }

        let report = self.report();
        Ok((metrics, all_file_info, report))
    }
515
516 pub async fn checkpoint(self: &Arc<Self>) -> Result<()> {
521 let data_agg = take(&mut *self.current_session_data.lock().await);
523 self.process_aggregated_data_as_xorb(data_agg).await?;
524
525 {
527 let mut upload_tasks = self.xorb_upload_tasks.lock().await;
528
529 while let Some(result) = upload_tasks.join_next().await {
530 result??;
531 }
532 }
533
534 Ok(())
535 }
536
537 fn check_not_finalized(&self) -> Result<()> {
538 if self.finalized.load(Ordering::Acquire) {
539 return Err(DataError::InvalidOperation("FileUploadSession already finalized".to_string()));
540 }
541 Ok(())
542 }
543
    /// Borrow the session's shared progress tracker.
    pub fn progress(&self) -> &Arc<GroupProgress> {
        &self.progress
    }
547
    /// Snapshot the aggregate progress across all items in the session.
    pub fn report(&self) -> GroupProgressReport {
        self.progress.report()
    }
551
    /// Snapshot the progress of a single tracked item, if it exists.
    pub fn item_report(&self, id: UniqueID) -> Option<ItemProgressReport> {
        self.progress.item_report(id)
    }
555
    /// Snapshot per-item progress for every tracked item in the session.
    pub fn item_reports(&self) -> HashMap<UniqueID, ItemProgressReport> {
        self.progress.item_reports()
    }
559
560 pub async fn finalize(self: Arc<Self>) -> Result<DeduplicationMetrics> {
561 Ok(self.finalize_impl(false).await?.0)
562 }
563
564 pub async fn finalize_with_report(self: Arc<Self>) -> Result<(DeduplicationMetrics, GroupProgressReport)> {
565 let (metrics, _file_info, report) = self.finalize_impl(false).await?;
566 Ok((metrics, report))
567 }
568
569 pub async fn finalize_with_file_info(self: Arc<Self>) -> Result<(DeduplicationMetrics, Vec<MDBFileInfo>)> {
570 let (metrics, file_info, _report) = self.finalize_impl(true).await?;
571 Ok((metrics, file_info))
572 }
573}
574
#[cfg(test)]
mod tests {
    use std::fs::{File, OpenOptions};
    use std::io::{Read, Write};
    use std::path::Path;
    use std::sync::{Arc, OnceLock};

    use xet_runtime::core::XetRuntime;

    use crate::processing::{FileDownloadSession, FileUploadSession, XetFileInfo};

    // Shared runtime for every test in this module, created once on first use.
    fn get_threadpool() -> Arc<XetRuntime> {
        static THREADPOOL: OnceLock<Arc<XetRuntime>> = OnceLock::new();
        THREADPOOL
            .get_or_init(|| XetRuntime::new().expect("Error starting multithreaded runtime."))
            .clone()
    }

    // Clean (upload) `input_path` into a local CAS rooted at `cas_path`,
    // writing the resulting pointer-file JSON to `output_path`.
    async fn test_clean_file(cas_path: &Path, input_path: &Path, output_path: &Path) {
        let read_data = read(input_path).unwrap().to_vec();

        let mut pf_out = Box::new(
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(output_path)
                .unwrap(),
        );

        let upload_session = FileUploadSession::new(TranslatorConfig::local_config(cas_path).unwrap().into())
            .await
            .unwrap();

        let (_id, mut cleaner) = upload_session
            .start_clean(Some("test".into()), Some(read_data.len() as u64), Sha256Policy::Compute)
            .unwrap();

        // Feed the whole file in one shot and flush the session.
        cleaner.add_data(&read_data[..]).await.unwrap();

        let (xet_file_info, _metrics) = cleaner.finish().await.unwrap();
        upload_session.finalize().await.unwrap();

        pf_out
            .write_all(serde_json::to_string(&xet_file_info).unwrap().as_bytes())
            .unwrap();
    }

    // Rehydrate (download) the file described by the pointer JSON at
    // `pointer_path` from the local CAS at `cas_path` into `output_path`.
    async fn test_smudge_file(cas_path: &Path, pointer_path: &Path, output_path: &Path) {
        let mut reader = File::open(pointer_path).unwrap();

        let mut input = String::new();
        reader.read_to_string(&mut input).unwrap();

        let xet_file = serde_json::from_str::<XetFileInfo>(&input).unwrap();

        let config = TranslatorConfig::local_config(cas_path).unwrap();
        let session = FileDownloadSession::new(config.into(), None).await.unwrap();

        let (_id, _n_bytes) = session.download_file(&xet_file, output_path).await.unwrap();
    }

    // Additional imports used by the round-trip tests below.
    use std::fs::{read, write};

    use tempfile::tempdir;

    use super::*;

    // End-to-end: clean a small file into a local CAS, smudge it back out,
    // and verify the bytes round-trip unchanged.
    #[test]
    fn test_clean_smudge_round_trip() {
        let temp = tempdir().unwrap();
        let original_data = b"Hello, world!";

        let runtime = get_threadpool();

        runtime
            .clone()
            .bridge_sync(async move {
                let cas_path = temp.path().join("cas");

                // Write the original data to disk.
                let original_path = temp.path().join("original.txt");
                write(&original_path, original_data).unwrap();

                // Clean it into a pointer file.
                let pointer_path = temp.path().join("pointer.txt");
                test_clean_file(&cas_path, &original_path, &pointer_path).await;

                // Smudge the pointer back into a hydrated file.
                let hydrated_path = temp.path().join("hydrated.txt");
                test_smudge_file(&cas_path, &pointer_path, &hydrated_path).await;

                // The hydrated bytes must match the original exactly.
                let result_data = read(hydrated_path).unwrap();
                assert_eq!(original_data.to_vec(), result_data);
            })
            .unwrap();
    }

    // Uploading with Sha256Policy::Skip must not attach a metadata extension
    // to the recorded file info.
    #[test]
    fn test_clean_skip_sha256_no_metadata_ext() {
        let temp = tempdir().unwrap();
        let data = b"Hello, skip sha256!";

        let runtime = get_threadpool();

        runtime
            .clone()
            .bridge_sync(async move {
                let cas_path = temp.path().join("cas");

                let upload_session = FileUploadSession::new(TranslatorConfig::local_config(&cas_path).unwrap().into())
                    .await
                    .unwrap();

                let (_id, mut cleaner) = upload_session
                    .start_clean(Some("test".into()), Some(data.len() as u64), Sha256Policy::Skip)
                    .unwrap();
                cleaner.add_data(data).await.unwrap();
                cleaner.finish().await.unwrap();

                // Finalize with file info so the recorded metadata is visible.
                let (_metrics, file_infos) = upload_session.finalize_with_file_info().await.unwrap();
                assert_eq!(file_infos.len(), 1);
                assert!(file_infos[0].metadata_ext.is_none(), "Skip should produce no metadata_ext");
            })
            .unwrap();
    }
}