1use std::fs::OpenOptions;
2use std::io::{Seek, SeekFrom, Write};
3use std::path::Path;
4use std::sync::Arc;
5use std::sync::atomic::Ordering;
6
7use tokio_util::sync::CancellationToken;
8use tracing::{debug, info};
9use xet_client::cas_client::Client;
10use xet_client::cas_types::FileRange;
11use xet_client::chunk_cache::ChunkCache;
12use xet_core_structures::merklehash::MerkleHash;
13use xet_runtime::config::ReconstructionConfig;
14use xet_runtime::core::{XetRuntime, xet_config};
15use xet_runtime::utils::ClosureGuard;
16use xet_runtime::utils::adjustable_semaphore::AdjustableSemaphore;
17
18use super::data_writer::{DataWriter, DownloadStream, SequentialWriter, UnorderedDownloadStream};
19use super::error::{FileReconstructionError, Result};
20use super::reconstruction_terms::ReconstructionTermManager;
21use super::run_state::{RunError, RunState};
22use crate::progress_tracking::ItemProgressUpdater;
23
/// Reconstructs a file (or a byte range of it), identified by its Merkle hash,
/// from term data fetched through a [`Client`].
///
/// Create one with [`FileReconstructor::new`] and adjust it with the `with_*`
/// builder methods. Then consume it with one of the `reconstruct_to_*` methods,
/// which write to a file path, to a `Write` sink, or return a stream.
pub struct FileReconstructor {
    /// Client handed to the term manager and to each term's data-fetch task.
    client: Arc<dyn Client>,
    /// Hash of the file being reconstructed.
    file_hash: MerkleHash,
    /// Sub-range of the file to reconstruct; `None` means the full file
    /// (`FileRange::full`).
    byte_range: Option<FileRange>,
    /// Optional progress sink, forwarded into the run state.
    progress_updater: Option<Arc<ItemProgressUpdater>>,
    /// Reconstruction tuning (fetch sizes, download buffer sizing, vectored writes).
    config: Arc<ReconstructionConfig>,

    /// Optional chunk cache passed to each term's data-fetch task.
    chunk_cache: Option<Arc<dyn ChunkCache>>,

    /// Caller-provided download-buffer semaphore. When `None`, the runtime-wide
    /// reconstruction download buffer is used and resized per active download.
    custom_buffer_semaphore: Option<Arc<AdjustableSemaphore>>,

    /// Token handed to the run state; cancelling it stops the reconstruction at
    /// its next cancellation check.
    cancellation_token: CancellationToken,
}
45
impl FileReconstructor {
    /// Creates a reconstructor for `file_hash` that fetches data through `client`.
    ///
    /// Defaults:
    /// - the full file (no byte range);
    /// - a clone of the reconstruction section of the global `xet_config()`;
    /// - no chunk cache;
    /// - the runtime-wide download buffer semaphore;
    /// - a fresh cancellation token.
    ///
    /// Test builds attach a standalone progress updater by default; other builds
    /// attach none.
    pub fn new(client: &Arc<dyn Client>, file_hash: MerkleHash) -> Self {
        Self {
            client: client.clone(),
            file_hash,
            byte_range: None,
            progress_updater: default_progress_updater(),
            config: Arc::new(xet_config().reconstruction.clone()),
            chunk_cache: None,
            custom_buffer_semaphore: None,
            cancellation_token: CancellationToken::new(),
        }
    }

    /// Restricts reconstruction to `byte_range` of the file instead of the whole file.
    pub fn with_byte_range(self, byte_range: FileRange) -> Self {
        Self {
            byte_range: Some(byte_range),
            ..self
        }
    }

    /// Reports progress to `progress_updater`, replacing any default updater.
    pub fn with_progress_updater(self, progress_updater: Arc<ItemProgressUpdater>) -> Self {
        Self {
            progress_updater: Some(progress_updater),
            ..self
        }
    }

    /// Passes `cache` to every term's data-fetch task.
    pub fn with_chunk_cache(self, cache: Arc<dyn ChunkCache>) -> Self {
        Self {
            chunk_cache: Some(cache),
            ..self
        }
    }

    /// Replaces the reconstruction configuration with a clone of `config`.
    pub fn with_config(self, config: impl AsRef<ReconstructionConfig>) -> Self {
        Self {
            config: Arc::new(config.as_ref().clone()),
            ..self
        }
    }

    /// Uses `semaphore` to bound download-buffer memory instead of the runtime-wide buffer.
    ///
    /// With a custom semaphore the run does not touch the global active-download count
    /// and does not seed a buffer permit. Every term acquires its permits directly
    /// from `semaphore`.
    pub fn with_buffer_semaphore(self, semaphore: Arc<AdjustableSemaphore>) -> Self {
        Self {
            custom_buffer_semaphore: Some(semaphore),
            ..self
        }
    }

    /// Uses `token` to cancel this reconstruction.
    ///
    /// The token goes into the run state. Its cancellation is checked between
    /// terms, and also while waiting for term blocks or buffer permits.
    pub fn with_cancellation_token(self, token: CancellationToken) -> Self {
        Self {
            cancellation_token: token,
            ..self
        }
    }

    /// Reconstructs the file (or the configured byte range) into the file at `path`.
    ///
    /// File handling:
    /// - Missing parent directories are created.
    /// - The file is opened for writing and created if absent.
    /// - It is truncated only when `truncate_file` is true.
    ///
    /// Output starts at `write_offset` when given. Otherwise it starts at the beginning
    /// of the configured byte range (0 for the full file), so a partial range lands at
    /// its original position in the file.
    ///
    /// Returns the number of bytes scheduled for writing, or `Ok(0)` if the run was
    /// cancelled without a recorded error.
    ///
    /// # Errors
    /// Fails if directory creation, opening, or seeking fails, or if the reconstruction
    /// itself fails.
    pub async fn reconstruct_to_file(self, path: &Path, write_offset: Option<u64>, truncate_file: bool) -> Result<u64> {
        info!(
            file_hash = %self.file_hash,
            byte_range = ?self.byte_range,
            path = %path.display(),
            write_offset = ?write_offset,
            "Reconstructing file to disk"
        );

        // NOTE(review): these are blocking std::fs calls on the async executor; consider
        // spawn_blocking if this becomes a hot path.
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }

        let mut file = OpenOptions::new().write(true).create(true).truncate(truncate_file).open(path)?;

        // A freshly opened (non-append) file is already positioned at 0, so only seek for
        // a non-zero start.
        let default_write_position = self.byte_range.map_or(0, |r| r.start);
        let seek_position = write_offset.unwrap_or(default_write_position);
        if seek_position > 0 {
            file.seek(SeekFrom::Start(seek_position))?;
        }

        let run_state = RunState::new(self.cancellation_token.clone(), self.file_hash, self.progress_updater.clone());

        let data_writer = SequentialWriter::new(file, self.config.use_vectored_write, run_state.clone());

        self.run(data_writer, run_state, false).await
    }

    /// Reconstructs the file (or the configured byte range) into `writer` via a
    /// `SequentialWriter`.
    ///
    /// `writer` receives the range's bytes starting from the first byte of the range.
    /// Returns the number of bytes scheduled, or `Ok(0)` if the run was cancelled
    /// without a recorded error.
    pub async fn reconstruct_to_writer<W: Write + Send + 'static>(self, writer: W) -> Result<u64> {
        info!(
            file_hash = %self.file_hash,
            byte_range = ?self.byte_range,
            "Reconstructing file to writer"
        );

        let run_state = RunState::new(self.cancellation_token.clone(), self.file_hash, self.progress_updater.clone());
        let data_writer = SequentialWriter::new(writer, self.config.use_vectored_write, run_state.clone());
        self.run(data_writer, run_state, false).await
    }

    /// Consumes the reconstructor and returns a [`DownloadStream`] over its output.
    ///
    /// Nothing is fetched here. The reconstructor and a fresh run state are handed to
    /// `DownloadStream::new`, which presumably drives `run` as the stream is consumed
    /// (confirm in `data_writer`).
    pub fn reconstruct_to_stream(self) -> DownloadStream {
        let run_state = RunState::new(self.cancellation_token.clone(), self.file_hash, self.progress_updater.clone());

        DownloadStream::new(self, run_state)
    }

    /// Consumes the reconstructor and returns an [`UnorderedDownloadStream`] over its output.
    ///
    /// As with [`Self::reconstruct_to_stream`], work is deferred to the stream type.
    /// Judging by its name, data may arrive out of file order (confirm in `data_writer`).
    pub fn reconstruct_to_unordered_stream(self) -> UnorderedDownloadStream {
        let run_state = RunState::new(self.cancellation_token.clone(), self.file_hash, self.progress_updater.clone());

        UnorderedDownloadStream::new(self, run_state)
    }

    /// Runs the reconstruction into `data_writer` and maps the internal result to the
    /// public `Result`.
    ///
    /// - Success returns the number of bytes scheduled.
    /// - `RunError::Cancelled` returns `Ok(0)`, unless `run_state` holds an error
    ///   (`check_error`), in which case that error is returned.
    /// - `RunError::Error(e)` is recorded on `run_state` and then returned. Recording it
    ///   presumably lets other holders of the shared run state, such as the data writer,
    ///   observe the failure.
    ///
    /// `is_streaming` only matters in debug builds. When true, it skips the final
    /// progress-completeness assertions in `run_impl`.
    pub(crate) async fn run(
        self,
        data_writer: Box<dyn DataWriter>,
        run_state: Arc<RunState>,
        is_streaming: bool,
    ) -> Result<u64> {
        match self.run_impl(data_writer, &run_state, is_streaming).await {
            Ok(v) => Ok(v),
            Err(RunError::Cancelled) => {
                run_state.check_error()?;
                Ok(0)
            },
            Err(RunError::Error(e)) => {
                run_state.set_error(e.clone());
                Err(e)
            },
        }
    }

    /// Core reconstruction loop.
    ///
    /// 1. Builds a `ReconstructionTermManager` for the requested range (the full file
    ///    when no range was set).
    /// 2. Sets up download-buffer accounting. With the runtime-wide semaphore, it bumps
    ///    the global active-download count `n` and raises the semaphore's permits to
    ///    `min(download_buffer_size + n * download_buffer_perfile_size, download_buffer_limit)`.
    ///    A guard reverses both when this function returns.
    /// 3. For every block of file terms, and every term in it:
    ///    - acquire a buffer permit sized to the term;
    ///    - create its data-fetch task;
    ///    - hand both to `data_writer` at the term's offset relative to the range start.
    /// 4. Finishes the writer and returns the total number of bytes scheduled.
    ///
    /// The run state is checked before and between steps. While waiting for the next
    /// block or a buffer permit, cancellation returns `check_run_state()` mapped to 0.
    async fn run_impl(
        self,
        mut data_writer: Box<dyn DataWriter>,
        run_state: &RunState,
        _is_streaming: bool,
    ) -> std::result::Result<u64, RunError> {
        // The hash, token and progress updater are not destructured here: callers already
        // passed them to `RunState::new`, so they are read back from `run_state` below.
        let Self {
            client,
            byte_range,
            config,
            chunk_cache,
            custom_buffer_semaphore,
            ..
        } = self;

        run_state.check_run_state()?;

        let file_hash = *run_state.file_hash();
        let requested_range = byte_range.unwrap_or_else(FileRange::full);

        let mut term_manager = ReconstructionTermManager::new(
            config.clone(),
            client.clone(),
            file_hash,
            requested_range,
            run_state.progress_updater().cloned(),
        )
        .await?;

        let using_global_memory_limit = custom_buffer_semaphore.is_none();
        let download_buffer_semaphore = custom_buffer_semaphore
            .unwrap_or_else(|| XetRuntime::current().common().reconstruction_download_buffer.clone());

        // Both are declared uninitialized so that each branch below can assign them.
        // Each branch assigns both.
        //
        // The guard is bound to a named `_`-prefixed variable rather than `_`, so it lives
        // until the function returns instead of being dropped immediately.
        //
        // Locals drop in reverse declaration order, so on exit the guard runs before any
        // leftover seed permit is released.
        // NOTE(review): confirm `decrement_permits_to_target` tolerates permits that are
        // still checked out at that point.
        let mut seed_buffer_permit;
        let _download_count_decrement_guard;

        if using_global_memory_limit {
            // This counter is only used to compute the permit target, so Relaxed ordering
            // is used.
            let active_downloads = XetRuntime::current().common().active_downloads.clone();
            let n = active_downloads.fetch_add(1, Ordering::Relaxed) + 1;

            let base = config.download_buffer_size.as_u64();
            let perfile = config.download_buffer_perfile_size.as_u64();
            let limit = config.download_buffer_limit.as_u64();

            // Grow the shared semaphore for one more active download. Any permits produced
            // by the increment become this download's seed, which later terms draw from
            // first. Presumably the result is `None` when the semaphore already meets the
            // target.
            let target = base.saturating_add(n.saturating_mul(perfile)).min(limit);
            seed_buffer_permit = download_buffer_semaphore.increment_permits_to_target(target);

            // Nothing between the fetch_add above and this guard can return early, so the
            // count is always decremented again.
            // On exit: drop this download from the count and shrink the semaphore back to
            // the target for the remaining downloads.
            let buffer_sem = download_buffer_semaphore.clone();
            _download_count_decrement_guard = Some(ClosureGuard::new(move || {
                let n = active_downloads.fetch_sub(1, Ordering::Relaxed).saturating_sub(1);
                let target = base.saturating_add(n.saturating_mul(perfile)).min(limit);
                buffer_sem.decrement_permits_to_target(target);
            }));
        } else {
            // A caller-provided semaphore is used as-is: no seed permit and no
            // active-download accounting.
            seed_buffer_permit = None;
            _download_count_decrement_guard = None;
        }

        // Terms carry absolute file offsets; the writer is given offsets relative to the
        // start of the requested range.
        let range_start_offset = requested_range.start;

        loop {
            // `biased` polls the branches in order, so cancellation is checked before
            // waiting on the next block of terms.
            let maybe_file_terms = tokio::select! {
                biased;
                _ = run_state.cancelled() => {
                    return run_state.check_run_state().map(|_| 0);
                }
                result = term_manager.next_file_terms() => result?
            };

            // `None` from the term manager ends the loop: there are no more terms to schedule.
            let Some(file_terms) = maybe_file_terms else {
                break;
            };

            run_state.check_run_state()?;

            run_state.record_new_block();

            for file_term in file_terms {
                run_state.check_run_state()?;

                let term_size = file_term.byte_range.end - file_term.byte_range.start;

                debug!(
                    file_hash = %file_hash,
                    xorb_hash = %file_term.xorb_block.xorb_hash,
                    term_byte_range = ?(file_term.byte_range.start, file_term.byte_range.end),
                    term_size,
                    "Processing file term"
                );

                // Prefer carving the term's permit out of the seed permit. If that is not
                // possible, drop the rest of the seed and wait on the semaphore for the full
                // term size, still honoring cancellation first (`biased`). Dropping the seed
                // presumably returns its permits to the semaphore.
                let buffer_permit = match seed_buffer_permit.as_mut().and_then(|rp| rp.split(term_size)) {
                    Some(split) => split,
                    None => {
                        seed_buffer_permit = None;

                        tokio::select! {
                            biased;
                            _ = run_state.cancelled() => {
                                return run_state.check_run_state().map(|_| 0);
                            }
                            result = download_buffer_semaphore.acquire_many(term_size) => {
                                result.map_err(|e| {
                                    FileReconstructionError::InternalError(format!(
                                        "Error acquiring download buffer permit: {e}"
                                    ))
                                })?
                            }
                        }
                    },
                };

                // Create the term's data-fetch task. The returned `data_future` is handed
                // to the writer, which presumably awaits the actual bytes.
                let data_future = file_term
                    .get_data_task(client.clone(), run_state.progress_updater().cloned(), chunk_cache.clone())
                    .await?;

                // Debug-only sanity check: the term's chunk range must match one of the
                // references recorded on its xorb block.
                #[cfg(debug_assertions)]
                {
                    let refs = &file_term.xorb_block.references;
                    assert!(refs.iter().any(|r| r.term_chunks == file_term.xorb_chunk_range));
                }

                let relative_byte_range = FileRange::new(
                    file_term.byte_range.start - range_start_offset,
                    file_term.byte_range.end - range_start_offset,
                );

                // The buffer permit travels with the term's data source. The memory
                // presumably stays reserved until the writer is done with that data.
                data_writer
                    .set_next_term_data_source(relative_byte_range, Some(buffer_permit), data_future)
                    .await?;

                run_state.record_new_term(term_size);
            }
        }

        run_state.log_progress("All term blocks received and scheduled for writing");

        let bytes_written = data_writer.finish().await?;
        let total_bytes_scheduled = run_state.total_bytes_scheduled();

        debug_assert_eq!(
            bytes_written, total_bytes_scheduled,
            "Bytes written ({bytes_written}) should match total bytes scheduled ({total_bytes_scheduled})"
        );

        run_state.log_progress("File reconstruction completed successfully");

        // Debug builds verify progress accounting for non-streaming runs:
        // - the updater must report completion;
        // - for a bounded range, it must have counted exactly the range length.
        // An end of u64::MAX is presumably an open-ended range (cf. `FileRange::full`),
        // so that case is skipped.
        #[cfg(debug_assertions)]
        if !_is_streaming && let Some(updater) = run_state.progress_updater() {
            updater.assert_complete();
            if let Some(byte_range) = byte_range
                && byte_range.end < u64::MAX
            {
                assert_eq!(updater.total_bytes_completed(), byte_range.end - byte_range.start);
            }
        }

        Ok(total_bytes_scheduled)
    }
}
400
/// Default progress updater for test builds.
///
/// Every reconstructor created in unit tests gets a standalone updater named
/// `"test"`. This lets the debug-build progress assertions at the end of a run
/// execute even when a test does not attach its own updater.
#[cfg(test)]
fn default_progress_updater() -> Option<Arc<ItemProgressUpdater>> {
    let standalone = ItemProgressUpdater::new_standalone("test");
    Option::from(standalone)
}
405
406#[cfg(not(test))]
407fn default_progress_updater() -> Option<Arc<ItemProgressUpdater>> {
408 None
409}
410
411#[cfg(test)]
412mod tests {
413 use std::io::{Cursor, Write};
414 use std::sync::Arc;
415 use std::sync::atomic::{AtomicUsize, Ordering};
416 use std::time::Duration;
417
418 use xet_client::cas_client::{ClientTestingUtils, DirectAccessClient, LocalClient, RandomFileContents};
419 use xet_client::cas_types::FileRange;
420 use xet_runtime::core::XetRuntime;
421
422 use super::*;
423 use crate::progress_tracking::ItemProgressUpdater;
424
425 const TEST_CHUNK_SIZE: usize = 101;
426
427 fn test_config() -> ReconstructionConfig {
429 let mut config = ReconstructionConfig::default();
430 config.min_reconstruction_fetch_size = xet_runtime::utils::ByteSize::from("100");
432 config.max_reconstruction_fetch_size = xet_runtime::utils::ByteSize::from("400");
433 config.min_prefetch_buffer = xet_runtime::utils::ByteSize::from("800");
434 config
435 }
436
437 async fn setup_test_file(term_spec: &[(u64, (u64, u64))]) -> (Arc<LocalClient>, RandomFileContents) {
439 let client = LocalClient::temporary().await.unwrap();
440 let file_contents = client.upload_random_file(term_spec, TEST_CHUNK_SIZE).await.unwrap();
441 (client, file_contents)
442 }
443
444 async fn reconstruct_to_vec(
446 client: &Arc<LocalClient>,
447 file_hash: MerkleHash,
448 byte_range: Option<FileRange>,
449 config: &ReconstructionConfig,
450 semaphore: Option<Arc<AdjustableSemaphore>>,
451 ) -> Result<Vec<u8>> {
452 let buffer = Arc::new(std::sync::Mutex::new(Cursor::new(Vec::new())));
453 let writer = StaticCursorWriter(buffer.clone());
454
455 let mut reconstructor =
456 FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_hash).with_config(config);
457
458 if let Some(range) = byte_range {
459 reconstructor = reconstructor.with_byte_range(range);
460 }
461 if let Some(sem) = semaphore {
462 reconstructor = reconstructor.with_buffer_semaphore(sem);
463 }
464
465 reconstructor.reconstruct_to_writer(writer).await?;
466
467 let data = buffer.lock().unwrap().get_ref().clone();
468 Ok(data)
469 }
470
471 async fn reconstruct_to_file(
474 client: &Arc<LocalClient>,
475 file_hash: MerkleHash,
476 byte_range: Option<FileRange>,
477 config: &ReconstructionConfig,
478 ) -> Result<Vec<u8>> {
479 let temp_dir = tempfile::tempdir().unwrap();
480 let file_path = temp_dir.path().join("output.bin");
481
482 let mut reconstructor =
483 FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_hash).with_config(config);
484
485 if let Some(range) = byte_range {
486 reconstructor = reconstructor.with_byte_range(range);
487 }
488
489 reconstructor.reconstruct_to_file(&file_path, None, false).await?;
490
491 let file_data = std::fs::read(&file_path)?;
493 let start = byte_range.map(|r| r.start as usize).unwrap_or(0);
494 Ok(file_data[start..].to_vec())
495 }
496
497 async fn reconstruct_to_file_at_specific_offset(
499 client: &Arc<LocalClient>,
500 file_hash: MerkleHash,
501 byte_range: Option<FileRange>,
502 config: &ReconstructionConfig,
503 ) -> Result<Vec<u8>> {
504 let offset = 9u64;
505
506 let temp_dir = tempfile::tempdir().unwrap();
507 let file_path = temp_dir.path().join("output.bin");
508
509 let mut reconstructor =
510 FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_hash).with_config(config);
511
512 if let Some(range) = byte_range {
513 reconstructor = reconstructor.with_byte_range(range);
514 }
515
516 reconstructor.reconstruct_to_file(&file_path, Some(offset), false).await?;
517
518 let file_data = std::fs::read(&file_path)?;
520 Ok(file_data[offset as usize..].to_vec())
521 }
522
523 async fn reconstruct_to_file_at_offset_zero(
526 client: &Arc<LocalClient>,
527 file_hash: MerkleHash,
528 byte_range: Option<FileRange>,
529 config: &ReconstructionConfig,
530 ) -> Result<Vec<u8>> {
531 let temp_dir = tempfile::tempdir().unwrap();
532 let file_path = temp_dir.path().join("output.bin");
533
534 let mut reconstructor =
535 FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_hash).with_config(config);
536
537 if let Some(range) = byte_range {
538 reconstructor = reconstructor.with_byte_range(range);
539 }
540
541 reconstructor.reconstruct_to_file(&file_path, Some(0), false).await?;
542
543 let file_data = std::fs::read(&file_path)?;
545 Ok(file_data)
546 }
547
548 struct StaticCursorWriter(Arc<std::sync::Mutex<Cursor<Vec<u8>>>>);
551
552 impl std::io::Write for StaticCursorWriter {
553 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
554 self.0.lock().unwrap().write(buf)
555 }
556
557 fn flush(&mut self) -> std::io::Result<()> {
558 self.0.lock().unwrap().flush()
559 }
560 }
561
562 async fn reconstruct_and_verify_full(
564 client: &Arc<LocalClient>,
565 file_contents: &RandomFileContents,
566 base_config: ReconstructionConfig,
567 ) {
568 let expected = &file_contents.data;
569 let h = file_contents.file_hash;
570
571 for use_vectored in [false, true] {
573 let mut config = base_config.clone();
574 config.use_vectored_write = use_vectored;
575
576 let vec_result = reconstruct_to_vec(client, h, None, &config, None).await.unwrap();
578 assert_eq!(vec_result, *expected, "vec failed (vectored={use_vectored})");
579
580 let file_result = reconstruct_to_file(client, h, None, &config).await.unwrap();
582 assert_eq!(file_result, *expected, "file failed (vectored={use_vectored})");
583
584 let file_offset_result = reconstruct_to_file_at_offset_zero(client, h, None, &config).await.unwrap();
586 assert_eq!(file_offset_result, *expected, "file_at_offset_zero failed (vectored={use_vectored})");
587
588 let file_specific_result = reconstruct_to_file_at_specific_offset(client, h, None, &config).await.unwrap();
590 assert_eq!(file_specific_result, *expected, "file_at_specific_offset failed (vectored={use_vectored})");
591 }
592 }
593
594 async fn reconstruct_and_verify_range(
596 client: &Arc<LocalClient>,
597 file_contents: &RandomFileContents,
598 range: FileRange,
599 base_config: ReconstructionConfig,
600 ) {
601 let expected = &file_contents.data[range.start as usize..range.end as usize];
602
603 for use_vectored in [false, true] {
605 let mut config = base_config.clone();
606 config.use_vectored_write = use_vectored;
607
608 let vec_result = reconstruct_to_vec(client, file_contents.file_hash, Some(range), &config, None)
610 .await
611 .expect("reconstruct_to_vec should succeed");
612 assert_eq!(vec_result, expected, "vec failed (vectored={use_vectored})");
613
614 let file_result = reconstruct_to_file(client, file_contents.file_hash, Some(range), &config)
616 .await
617 .expect("reconstruct_to_file should succeed");
618 assert_eq!(file_result, expected, "file failed (vectored={use_vectored})");
619
620 let file_offset_result =
622 reconstruct_to_file_at_offset_zero(client, file_contents.file_hash, Some(range), &config)
623 .await
624 .expect("reconstruct_to_file_at_offset_zero should succeed");
625 assert_eq!(file_offset_result, expected, "file_at_offset failed (vectored={use_vectored})");
626 }
627 }
628
629 #[tokio::test]
632 async fn test_single_term_full_reconstruction() {
633 let (client, file_contents) = setup_test_file(&[(1, (0, 3))]).await;
634 reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
635 }
636
637 #[tokio::test]
638 async fn test_multiple_terms_same_xorb_full_reconstruction() {
639 let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (1, (2, 4)), (1, (4, 6))]).await;
640 reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
641 }
642
643 #[tokio::test]
644 async fn test_multiple_xorbs_full_reconstruction() {
645 let (client, file_contents) = setup_test_file(&[(1, (0, 3)), (2, (0, 2)), (3, (0, 4))]).await;
646 reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
647 }
648
649 #[tokio::test]
650 async fn test_large_file_many_terms_full_reconstruction() {
651 let term_spec: Vec<(u64, (u64, u64))> = (1..=10).map(|i| (i, (0, 5))).collect();
653 let (client, file_contents) = setup_test_file(&term_spec).await;
654 reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
655 }
656
657 #[tokio::test]
658 async fn test_interleaved_xorbs_full_reconstruction() {
659 let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (2, (0, 2)), (1, (2, 4)), (2, (2, 4))]).await;
660 reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
661 }
662
663 #[tokio::test]
664 async fn test_single_chunk_file() {
665 let (client, file_contents) = setup_test_file(&[(1, (0, 1))]).await;
666 reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
667 }
668
669 #[tokio::test]
670 async fn test_many_small_terms_different_xorbs() {
671 let term_spec: Vec<(u64, (u64, u64))> = (1..=20).map(|i| (i, (0, 1))).collect();
672 let (client, file_contents) = setup_test_file(&term_spec).await;
673 reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
674 }
675
676 #[tokio::test]
679 async fn test_progress_tracker_records_full_reconstruction_bytes() {
680 let (client, file_contents) = setup_test_file(&[(1, (0, 3)), (2, (0, 2))]).await;
681 let config = test_config();
682 let buffer = Arc::new(std::sync::Mutex::new(Cursor::new(Vec::new())));
683 let writer = StaticCursorWriter(buffer.clone());
684
685 let progress_updater = ItemProgressUpdater::new_standalone("file");
686 let bytes_written = FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
687 .with_config(&config)
688 .with_progress_updater(progress_updater.clone())
689 .reconstruct_to_writer(writer)
690 .await
691 .unwrap();
692
693 assert_eq!(bytes_written, file_contents.data.len() as u64);
694 }
695
696 #[tokio::test]
697 async fn test_progress_tracker_records_partial_range_bytes() {
698 let (client, file_contents) = setup_test_file(&[(1, (0, 10))]).await;
699 let config = test_config();
700 let file_len = file_contents.data.len() as u64;
701 let range = FileRange::new(file_len / 4, file_len * 3 / 4);
702 let expected_bytes = range.end - range.start;
703
704 let buffer = Arc::new(std::sync::Mutex::new(Cursor::new(Vec::new())));
705 let writer = StaticCursorWriter(buffer.clone());
706
707 let progress_updater = ItemProgressUpdater::new_standalone("file");
708 let bytes_written = FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
709 .with_config(&config)
710 .with_byte_range(range)
711 .with_progress_updater(progress_updater.clone())
712 .reconstruct_to_writer(writer)
713 .await
714 .unwrap();
715
716 assert_eq!(bytes_written, expected_bytes);
717 }
718
719 #[tokio::test]
722 async fn test_external_progress_tracker_incremental_discovery() {
723 let term_spec: Vec<(u64, (u64, u64))> = (1..=5).map(|i| (i, (0, 3))).collect();
724 let (client, file_contents) = setup_test_file(&term_spec).await;
725 let config = test_config();
726
727 let task = ItemProgressUpdater::new_standalone("test_file.bin");
728
729 let buffer = Arc::new(std::sync::Mutex::new(Cursor::new(Vec::new())));
730 let writer = StaticCursorWriter(buffer.clone());
731
732 let bytes_written = FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
733 .with_config(&config)
734 .with_progress_updater(task.clone())
735 .reconstruct_to_writer(writer)
736 .await
737 .unwrap();
738
739 assert_eq!(bytes_written, file_contents.data.len() as u64);
740
741 task.assert_complete();
742 assert_eq!(task.total_bytes_completed(), file_contents.data.len() as u64);
743 }
744
745 #[tokio::test]
749 async fn test_external_progress_tracker_final_size_upfront() {
750 let term_spec: Vec<(u64, (u64, u64))> = (1..=5).map(|i| (i, (0, 3))).collect();
751 let (client, file_contents) = setup_test_file(&term_spec).await;
752 let config = test_config();
753 let file_size = file_contents.data.len() as u64;
754
755 let task = ItemProgressUpdater::new_standalone("test_file.bin");
756
757 task.update_item_size(file_size, true);
758
759 let buffer = Arc::new(std::sync::Mutex::new(Cursor::new(Vec::new())));
760 let writer = StaticCursorWriter(buffer.clone());
761
762 let bytes_written = FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
763 .with_config(&config)
764 .with_progress_updater(task.clone())
765 .reconstruct_to_writer(writer)
766 .await
767 .unwrap();
768
769 assert_eq!(bytes_written, file_size);
770
771 assert_eq!(task.total_bytes_completed(), file_size);
772
773 task.assert_complete();
774 }
775
776 #[tokio::test]
779 async fn test_range_first_half() {
780 let (client, file_contents) = setup_test_file(&[(1, (0, 10))]).await;
781 let file_len = file_contents.data.len() as u64;
782 let range = FileRange::new(0, file_len / 2);
783 reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
784 }
785
786 #[tokio::test]
787 async fn test_range_second_half() {
788 let (client, file_contents) = setup_test_file(&[(1, (0, 10))]).await;
789 let file_len = file_contents.data.len() as u64;
790 let range = FileRange::new(file_len / 2, file_len);
791 reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
792 }
793
794 #[tokio::test]
795 async fn test_range_middle() {
796 let (client, file_contents) = setup_test_file(&[(1, (0, 10))]).await;
797 let file_len = file_contents.data.len() as u64;
798 let range = FileRange::new(file_len / 4, file_len * 3 / 4);
799 reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
800 }
801
802 #[tokio::test]
803 async fn test_range_single_byte_start() {
804 let (client, file_contents) = setup_test_file(&[(1, (0, 5))]).await;
805 let range = FileRange::new(0, 1);
806 reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
807 }
808
809 #[tokio::test]
810 async fn test_range_single_byte_end() {
811 let (client, file_contents) = setup_test_file(&[(1, (0, 5))]).await;
812 let file_len = file_contents.data.len() as u64;
813 let range = FileRange::new(file_len - 1, file_len);
814 reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
815 }
816
817 #[tokio::test]
818 async fn test_range_single_byte_middle() {
819 let (client, file_contents) = setup_test_file(&[(1, (0, 5))]).await;
820 let file_len = file_contents.data.len() as u64;
821 let mid = file_len / 2;
822 let range = FileRange::new(mid, mid + 1);
823 reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
824 }
825
826 #[tokio::test]
827 async fn test_range_few_bytes_from_start() {
828 let (client, file_contents) = setup_test_file(&[(1, (0, 5))]).await;
829 let file_len = file_contents.data.len() as u64;
830 let range = FileRange::new(3, file_len);
831 reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
832 }
833
834 #[tokio::test]
835 async fn test_range_few_bytes_before_end() {
836 let (client, file_contents) = setup_test_file(&[(1, (0, 5))]).await;
837 let file_len = file_contents.data.len() as u64;
838 let range = FileRange::new(0, file_len - 3);
839 reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
840 }
841
842 #[tokio::test]
843 async fn test_range_small_slice_in_middle() {
844 let (client, file_contents) = setup_test_file(&[(1, (0, 10))]).await;
845 let file_len = file_contents.data.len() as u64;
846 let range = FileRange::new(file_len / 3, file_len / 3 + 10);
847 reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
848 }
849
850 #[tokio::test]
853 async fn test_range_spanning_multiple_terms() {
854 let (client, file_contents) = setup_test_file(&[(1, (0, 3)), (2, (0, 3)), (3, (0, 3))]).await;
855 let file_len = file_contents.data.len() as u64;
856 let range = FileRange::new(10, file_len - 10);
858 reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
859 }
860
861 #[tokio::test]
862 async fn test_range_within_single_term() {
863 let (client, file_contents) = setup_test_file(&[(1, (0, 10)), (2, (0, 10))]).await;
864 let first_term_size = file_contents.terms[0].data.len() as u64;
866 let range = FileRange::new(5, first_term_size - 5);
868 reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
869 }
870
871 #[tokio::test]
872 async fn test_range_crossing_term_boundary() {
873 let (client, file_contents) = setup_test_file(&[(1, (0, 5)), (2, (0, 5))]).await;
874 let first_term_size = file_contents.terms[0].data.len() as u64;
875 let range = FileRange::new(first_term_size - 10, first_term_size + 10);
877 reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
878 }
879
880 #[tokio::test]
883 async fn test_large_file_range_first_portion() {
884 let term_spec: Vec<(u64, (u64, u64))> = (1..=15).map(|i| (i, (0, 4))).collect();
886 let (client, file_contents) = setup_test_file(&term_spec).await;
887 let file_len = file_contents.data.len() as u64;
888 let range = FileRange::new(0, file_len / 3);
889 reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
890 }
891
892 #[tokio::test]
893 async fn test_large_file_range_last_portion() {
894 let term_spec: Vec<(u64, (u64, u64))> = (1..=15).map(|i| (i, (0, 4))).collect();
895 let (client, file_contents) = setup_test_file(&term_spec).await;
896 let file_len = file_contents.data.len() as u64;
897 let range = FileRange::new(file_len * 2 / 3, file_len);
898 reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
899 }
900
901 #[tokio::test]
902 async fn test_large_file_range_middle_portion() {
903 let term_spec: Vec<(u64, (u64, u64))> = (1..=15).map(|i| (i, (0, 4))).collect();
904 let (client, file_contents) = setup_test_file(&term_spec).await;
905 let file_len = file_contents.data.len() as u64;
906 let range = FileRange::new(file_len / 3, file_len * 2 / 3);
907 reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
908 }
909
910 #[tokio::test]
913 async fn test_complex_mixed_pattern_full() {
914 let term_spec = &[
915 (1, (0, 3)),
916 (2, (0, 2)),
917 (1, (3, 5)),
918 (3, (1, 4)),
919 (2, (4, 6)),
920 (1, (0, 2)),
921 ];
922 let (client, file_contents) = setup_test_file(term_spec).await;
923 reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
924 }
925
926 #[tokio::test]
927 async fn test_complex_mixed_pattern_partial_range() {
928 let term_spec = &[
929 (1, (0, 3)),
930 (2, (0, 2)),
931 (1, (3, 5)),
932 (3, (1, 4)),
933 (2, (4, 6)),
934 (1, (0, 2)),
935 ];
936 let (client, file_contents) = setup_test_file(term_spec).await;
937 let file_len = file_contents.data.len() as u64;
938 let range = FileRange::new(file_len / 4, file_len * 3 / 4);
939 reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
940 }
941
942 #[tokio::test]
943 async fn test_overlapping_chunk_ranges() {
944 let (client, file_contents) = setup_test_file(&[(1, (0, 5)), (1, (1, 3)), (1, (2, 4))]).await;
945 reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
946 }
947
948 #[tokio::test]
949 async fn test_non_contiguous_chunks() {
950 let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (1, (4, 6))]).await;
951 let config = test_config();
952 let result = reconstruct_to_vec(&client, file_contents.file_hash, None, &config, None)
953 .await
954 .unwrap();
955 assert_eq!(result, file_contents.data);
956 }
957
958 #[tokio::test]
961 async fn test_default_config_full_reconstruction() {
962 let (client, file_contents) = setup_test_file(&[(1, (0, 5)), (2, (0, 3))]).await;
963 reconstruct_and_verify_full(&client, &file_contents, ReconstructionConfig::default()).await;
965 }
966
967 #[tokio::test]
968 async fn test_default_config_partial_range() {
969 let (client, file_contents) = setup_test_file(&[(1, (0, 5)), (2, (0, 3))]).await;
970 let file_len = file_contents.data.len() as u64;
971 let range = FileRange::new(file_len / 4, file_len * 3 / 4);
972 reconstruct_and_verify_range(&client, &file_contents, range, ReconstructionConfig::default()).await;
973 }
974
975 struct TimeAdvancingWriter {
983 buffer: Arc<std::sync::Mutex<Vec<u8>>>,
984 advance_duration: Duration,
985 write_count: Arc<AtomicUsize>,
986 }
987
988 impl TimeAdvancingWriter {
989 fn new(advance_duration: Duration) -> Self {
990 Self {
991 buffer: Arc::new(std::sync::Mutex::new(Vec::new())),
992 advance_duration,
993 write_count: Arc::new(AtomicUsize::new(0)),
994 }
995 }
996 }
997
998 impl Write for TimeAdvancingWriter {
999 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1000 let bytes_written = self.buffer.lock().unwrap().write(buf)?;
1001
1002 self.write_count.fetch_add(1, Ordering::Relaxed);
1004
1005 let advance_duration = self.advance_duration;
1009 tokio::runtime::Handle::current().block_on(async {
1010 tokio::time::advance(advance_duration).await;
1011 });
1012
1013 Ok(bytes_written)
1014 }
1015
1016 fn flush(&mut self) -> std::io::Result<()> {
1017 Ok(())
1018 }
1019 }
1020
1021 fn url_refresh_test_config() -> ReconstructionConfig {
1023 let mut config = ReconstructionConfig::default();
1024 config.min_reconstruction_fetch_size = xet_runtime::utils::ByteSize::from("50");
1026 config.max_reconstruction_fetch_size = xet_runtime::utils::ByteSize::from("100");
1027 config.min_prefetch_buffer = xet_runtime::utils::ByteSize::from("50");
1028 config
1029 }
1030
1031 #[tokio::test(start_paused = true)]
1035 async fn test_url_refresh_on_expiration() {
1036 let term_spec = &[(1, (0, 2)), (2, (0, 2)), (3, (0, 2))];
1038 let (client, file_contents) = setup_test_file(term_spec).await;
1039
1040 let url_expiration = Duration::from_secs(1);
1042 client.set_fetch_term_url_expiration(url_expiration);
1043
1044 let time_advance = Duration::from_secs(2);
1046 let writer = TimeAdvancingWriter::new(time_advance);
1047 let writer_buffer = writer.buffer.clone();
1048 let write_count = writer.write_count.clone();
1049
1050 let tiny_semaphore = AdjustableSemaphore::new(1, (1, 1));
1053
1054 FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
1055 .with_config(url_refresh_test_config())
1056 .with_buffer_semaphore(tiny_semaphore)
1057 .reconstruct_to_writer(writer)
1058 .await
1059 .expect("Reconstruction should succeed with URL refresh");
1060
1061 let reconstructed = writer_buffer.lock().unwrap().clone();
1063 assert_eq!(reconstructed.len(), file_contents.data.len());
1064 assert_eq!(reconstructed, file_contents.data);
1065
1066 assert!(write_count.load(Ordering::Relaxed) >= term_spec.len());
1068 }
1069
1070 #[tokio::test(start_paused = true)]
1074 async fn test_url_refresh_same_xorb_multiple_terms() {
1075 let term_spec = &[(1, (0, 2)), (1, (2, 4)), (1, (4, 6))];
1077 let (client, file_contents) = setup_test_file(term_spec).await;
1078
1079 client.set_fetch_term_url_expiration(Duration::from_secs(1));
1081
1082 let writer = TimeAdvancingWriter::new(Duration::from_secs(2));
1084 let writer_buffer = writer.buffer.clone();
1085
1086 let tiny_semaphore = AdjustableSemaphore::new(1, (1, 1));
1087
1088 FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
1089 .with_config(url_refresh_test_config())
1090 .with_buffer_semaphore(tiny_semaphore)
1091 .reconstruct_to_writer(writer)
1092 .await
1093 .expect("Reconstruction should succeed");
1094
1095 let reconstructed = writer_buffer.lock().unwrap().clone();
1096 assert_eq!(reconstructed, file_contents.data);
1097 }
1098
1099 #[tokio::test(start_paused = true)]
1101 async fn test_url_refresh_large_file_multiple_blocks() {
1102 let term_spec: Vec<(u64, (u64, u64))> = (1..=5).map(|i| (i, (0, 3))).collect();
1104 let (client, file_contents) = setup_test_file(&term_spec).await;
1105
1106 client.set_fetch_term_url_expiration(Duration::from_secs(1));
1108
1109 let writer = TimeAdvancingWriter::new(Duration::from_secs(2));
1110 let writer_buffer = writer.buffer.clone();
1111
1112 let tiny_semaphore = AdjustableSemaphore::new(1, (1, 1));
1113
1114 FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
1115 .with_config(url_refresh_test_config())
1116 .with_buffer_semaphore(tiny_semaphore)
1117 .reconstruct_to_writer(writer)
1118 .await
1119 .expect("Reconstruction should succeed");
1120
1121 let reconstructed = writer_buffer.lock().unwrap().clone();
1122 assert_eq!(reconstructed, file_contents.data);
1123 }
1124
1125 #[tokio::test(start_paused = true)]
1127 async fn test_no_url_expiration_control() {
1128 let term_spec = &[(1, (0, 2)), (2, (0, 2)), (3, (0, 2))];
1129 let (client, file_contents) = setup_test_file(term_spec).await;
1130
1131 client.set_fetch_term_url_expiration(Duration::from_secs(3600));
1133
1134 let writer = TimeAdvancingWriter::new(Duration::from_millis(100));
1136 let writer_buffer = writer.buffer.clone();
1137
1138 let tiny_semaphore = AdjustableSemaphore::new(1, (1, 1));
1139
1140 FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
1141 .with_config(url_refresh_test_config())
1142 .with_buffer_semaphore(tiny_semaphore)
1143 .reconstruct_to_writer(writer)
1144 .await
1145 .expect("Reconstruction should succeed");
1146
1147 let reconstructed = writer_buffer.lock().unwrap().clone();
1148 assert_eq!(reconstructed, file_contents.data);
1149 }
1150
1151 #[tokio::test(start_paused = true)]
1153 async fn test_url_refresh_partial_range() {
1154 let term_spec = &[(1, (0, 5)), (2, (0, 5))];
1155 let (client, file_contents) = setup_test_file(term_spec).await;
1156 let file_len = file_contents.data.len() as u64;
1157
1158 client.set_fetch_term_url_expiration(Duration::from_secs(1));
1159
1160 let writer = TimeAdvancingWriter::new(Duration::from_secs(2));
1161 let writer_buffer = writer.buffer.clone();
1162
1163 let tiny_semaphore = AdjustableSemaphore::new(1, (0, 1));
1164
1165 let range = FileRange::new(file_len / 4, file_len * 3 / 4);
1166
1167 FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
1168 .with_byte_range(range)
1169 .with_config(url_refresh_test_config())
1170 .with_buffer_semaphore(tiny_semaphore)
1171 .reconstruct_to_writer(writer)
1172 .await
1173 .expect("Reconstruction should succeed");
1174
1175 let reconstructed = writer_buffer.lock().unwrap().clone();
1176 let expected = &file_contents.data[range.start as usize..range.end as usize];
1177 assert_eq!(reconstructed, expected);
1178 }
1179
/// Asking the runtime's reconstruction download-buffer semaphore for `u64::MAX` extra permits
/// and dropping the returned handle must leave its total at the configured 4kb limit. The total
/// must still be at that limit after a full reconstruction whose per-file buffer size (8kb)
/// exceeds the limit, and the runtime's active-download counter must be back at zero.
#[test]
fn test_dynamic_buffer_scaling_noop_increment_preserves_total_permits() {
    use xet_runtime::utils::ByteSize;

    let mut runtime_config = xet_runtime::config::XetConfig::new();
    runtime_config.reconstruction.download_buffer_size = ByteSize::from("1kb");
    runtime_config.reconstruction.download_buffer_limit = ByteSize::from("4kb");
    let expected_total = runtime_config.reconstruction.download_buffer_limit.as_u64();

    let rt = XetRuntime::new_with_config(runtime_config).unwrap();

    let outcome = rt.bridge_sync(async move {
        let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (2, (0, 2)), (3, (0, 2))]).await;
        let semaphore = XetRuntime::current().common().reconstruction_download_buffer.clone();

        // Oversized increment whose handle is released straight away.
        drop(semaphore.increment_total_permits(u64::MAX).unwrap());
        assert_eq!(semaphore.total_permits(), expected_total);

        let mut config = test_config();
        config.download_buffer_perfile_size = ByteSize::from("8kb");

        let reconstructed = reconstruct_to_vec(&client, file_contents.file_hash, None, &config, None)
            .await
            .unwrap();
        assert_eq!(reconstructed, file_contents.data);

        assert_eq!(semaphore.total_permits(), expected_total);
        assert_eq!(XetRuntime::current().common().active_downloads.load(Ordering::Relaxed), 0);
    });
    outcome.unwrap();
}
1211
1212 async fn reconstruct_range_to_file_path(
1218 client: &Arc<LocalClient>,
1219 file_hash: MerkleHash,
1220 file_path: &std::path::Path,
1221 range: FileRange,
1222 config: ReconstructionConfig,
1223 ) -> Result<u64> {
1224 FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_hash)
1225 .with_byte_range(range)
1226 .with_config(config)
1227 .reconstruct_to_file(file_path, None, false)
1228 .await
1229 }
1230
1231 #[tokio::test]
1232 async fn test_file_concurrent_non_overlapping_range_writes() {
1233 const NUM_WRITERS: usize = 16;
1235 const LARGE_CHUNK_SIZE: usize = 4096;
1236
1237 let term_spec: Vec<(u64, (u64, u64))> = (1..=16).map(|i| (i, (0, 16))).collect();
1240
1241 let client = LocalClient::temporary().await.unwrap();
1242 let file_contents = client.upload_random_file(&term_spec, LARGE_CHUNK_SIZE).await.unwrap();
1243 let file_len = file_contents.data.len() as u64;
1244
1245 let temp_dir = tempfile::tempdir().unwrap();
1246 let file_path = temp_dir.path().join("output.bin");
1247
1248 std::fs::write(&file_path, vec![0u8; file_len as usize]).unwrap();
1250
1251 let mut config = ReconstructionConfig::default();
1253 config.min_reconstruction_fetch_size = xet_runtime::utils::ByteSize::from("32kb");
1254 config.max_reconstruction_fetch_size = xet_runtime::utils::ByteSize::from("128kb");
1255
1256 let chunk_size = file_len / NUM_WRITERS as u64;
1258 let ranges: Vec<FileRange> = (0..NUM_WRITERS)
1259 .map(|i| {
1260 let start = i as u64 * chunk_size;
1261 let end = if i == NUM_WRITERS - 1 {
1262 file_len
1263 } else {
1264 (i as u64 + 1) * chunk_size
1265 };
1266 FileRange::new(start, end)
1267 })
1268 .collect();
1269
1270 let mut join_set = tokio::task::JoinSet::new();
1272
1273 for range in ranges {
1274 let client = client.clone();
1275 let file_hash = file_contents.file_hash;
1276 let file_path = file_path.clone();
1277 let config = config.clone();
1278
1279 join_set.spawn(async move {
1280 FileReconstructor::new(&(client as Arc<dyn Client>), file_hash)
1281 .with_byte_range(range)
1282 .with_config(config)
1283 .reconstruct_to_file(&file_path, None, false)
1284 .await
1285 });
1286 }
1287
1288 while let Some(result) = join_set.join_next().await {
1290 result.unwrap().unwrap();
1291 }
1292
1293 let reconstructed = std::fs::read(&file_path).unwrap();
1295 assert_eq!(reconstructed.len(), file_contents.data.len());
1296 assert_eq!(reconstructed, file_contents.data);
1297 }
1298
1299 #[tokio::test]
1300 async fn test_file_writes_preserve_existing_content() {
1301 let (client, file_contents) = setup_test_file(&[(1, (0, 10))]).await;
1303 let file_len = file_contents.data.len() as u64;
1304
1305 let temp_dir = tempfile::tempdir().unwrap();
1306 let file_path = temp_dir.path().join("output.bin");
1307
1308 let pattern: Vec<u8> = (0..file_len).map(|i| (i % 251) as u8).collect();
1310 std::fs::write(&file_path, &pattern).unwrap();
1311
1312 let start = file_len / 3;
1314 let end = 2 * file_len / 3;
1315 let range = FileRange::new(start, end);
1316
1317 reconstruct_range_to_file_path(&client, file_contents.file_hash, &file_path, range, test_config())
1318 .await
1319 .unwrap();
1320
1321 let result = std::fs::read(&file_path).unwrap();
1322
1323 assert_eq!(&result[..start as usize], &pattern[..start as usize]);
1325 assert_eq!(&result[end as usize..], &pattern[end as usize..]);
1326
1327 assert_eq!(&result[start as usize..end as usize], &file_contents.data[start as usize..end as usize]);
1329 }
1330
1331 #[tokio::test]
1339 async fn test_triple_disjoint_ranges_full() {
1340 let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (1, (4, 6)), (1, (8, 10))]).await;
1341 reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
1342 }
1343
1344 #[tokio::test]
1346 async fn test_triple_disjoint_ranges_partial() {
1347 let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (1, (4, 6)), (1, (8, 10))]).await;
1348 let file_len = file_contents.data.len() as u64;
1349 let range = FileRange::new(file_len / 4, file_len * 3 / 4);
1350 reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
1351 }
1352
1353 #[tokio::test]
1355 async fn test_multi_xorb_interleaved_disjoint() {
1356 let term_spec = &[
1357 (1, (0, 2)),
1358 (2, (0, 2)),
1359 (1, (4, 6)),
1360 (2, (4, 6)),
1361 (1, (8, 10)),
1362 (2, (8, 10)),
1363 ];
1364 let (client, file_contents) = setup_test_file(term_spec).await;
1365 reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
1366 }
1367
1368 #[tokio::test]
1370 async fn test_multi_xorb_interleaved_disjoint_partial() {
1371 let term_spec = &[
1372 (1, (0, 2)),
1373 (2, (0, 2)),
1374 (1, (4, 6)),
1375 (2, (4, 6)),
1376 (1, (8, 10)),
1377 (2, (8, 10)),
1378 ];
1379 let (client, file_contents) = setup_test_file(term_spec).await;
1380 let file_len = file_contents.data.len() as u64;
1381 let range = FileRange::new(file_len / 3, file_len * 2 / 3);
1382 reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
1383 }
1384
1385 #[tokio::test]
1387 async fn test_four_disjoint_ranges() {
1388 let term_spec = &[(1, (0, 2)), (1, (4, 6)), (1, (8, 10)), (1, (12, 14))];
1389 let (client, file_contents) = setup_test_file(term_spec).await;
1390 reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
1391 }
1392
1393 #[tokio::test]
1395 async fn test_mixed_contiguous_and_disjoint() {
1396 let term_spec = &[
1397 (1, (0, 3)), (1, (3, 5)), (1, (8, 10)), ];
1401 let (client, file_contents) = setup_test_file(term_spec).await;
1402 reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
1403 }
1404
1405 #[tokio::test]
1407 async fn test_complex_three_xorb_disjoint() {
1408 let term_spec = &[
1409 (1, (0, 2)),
1410 (2, (0, 3)),
1411 (3, (2, 5)),
1412 (1, (5, 8)),
1413 (2, (6, 8)),
1414 (3, (0, 2)),
1415 ];
1416 let (client, file_contents) = setup_test_file(term_spec).await;
1417 reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
1418 }
1419
1420 #[tokio::test]
1422 async fn test_local_client_max_ranges_2_disjoint() {
1423 let client = LocalClient::temporary().await.unwrap();
1424 client.set_max_ranges_per_fetch(2);
1425
1426 let term_spec = &[(1, (0, 2)), (1, (4, 6)), (1, (8, 10)), (1, (12, 14))];
1427 let file_contents = client.upload_random_file(term_spec, TEST_CHUNK_SIZE).await.unwrap();
1428
1429 let config = test_config();
1430 let result = reconstruct_to_vec(&client, file_contents.file_hash, None, &config, None)
1431 .await
1432 .unwrap();
1433 assert_eq!(result, file_contents.data.as_ref());
1434 }
1435
1436 #[tokio::test]
1438 async fn test_local_client_max_ranges_1_multi_xorb() {
1439 let client = LocalClient::temporary().await.unwrap();
1440 client.set_max_ranges_per_fetch(1);
1441
1442 let term_spec = &[(1, (0, 2)), (2, (0, 2)), (1, (4, 6)), (2, (4, 6))];
1443 let file_contents = client.upload_random_file(term_spec, TEST_CHUNK_SIZE).await.unwrap();
1444
1445 let config = test_config();
1446 let result = reconstruct_to_vec(&client, file_contents.file_hash, None, &config, None)
1447 .await
1448 .unwrap();
1449 assert_eq!(result, file_contents.data.as_ref());
1450 }
1451
1452 mod cancellation_tests {
1455 use tokio_util::sync::CancellationToken;
1456
1457 use super::*;
1458
1459 #[tokio::test]
1460 async fn test_cancellation_token_before_start() {
1461 let (client, file_contents) = setup_test_file(&[(1, (0, 3))]).await;
1462 let config = test_config();
1463
1464 let token = CancellationToken::new();
1465 token.cancel();
1466 let buffer = Arc::new(std::sync::Mutex::new(Cursor::new(Vec::new())));
1467 let writer = StaticCursorWriter(buffer.clone());
1468
1469 let bytes_written = FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
1470 .with_config(&config)
1471 .with_cancellation_token(token)
1472 .reconstruct_to_writer(writer)
1473 .await
1474 .unwrap();
1475
1476 assert_eq!(bytes_written, 0);
1477 }
1478
1479 struct CancellingWriter {
1482 buffer: Arc<std::sync::Mutex<Vec<u8>>>,
1483 cancel_token: CancellationToken,
1484 write_count: AtomicUsize,
1485 cancel_after_writes: usize,
1486 }
1487
1488 impl Write for CancellingWriter {
1489 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1490 let n = self.buffer.lock().unwrap().write(buf)?;
1491 let count = self.write_count.fetch_add(1, Ordering::Relaxed) + 1;
1492 if count >= self.cancel_after_writes {
1493 self.cancel_token.cancel();
1494 }
1495 Ok(n)
1496 }
1497
1498 fn flush(&mut self) -> std::io::Result<()> {
1499 Ok(())
1500 }
1501 }
1502
1503 #[tokio::test]
1504 async fn test_cancellation_token_during_reconstruction() {
1505 let term_spec: Vec<(u64, (u64, u64))> = (1..=10).map(|i| (i, (0, 5))).collect();
1506 let (client, file_contents) = setup_test_file(&term_spec).await;
1507 let config = test_config();
1508
1509 let token = CancellationToken::new();
1510 let buffer = Arc::new(std::sync::Mutex::new(Vec::new()));
1511
1512 let writer = CancellingWriter {
1513 buffer: buffer.clone(),
1514 cancel_token: token.clone(),
1515 write_count: AtomicUsize::new(0),
1516 cancel_after_writes: 1,
1517 };
1518
1519 let tiny_semaphore = AdjustableSemaphore::new(1, (1, 1));
1521
1522 let bytes_written = FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
1523 .with_config(&config)
1524 .with_cancellation_token(token)
1525 .with_buffer_semaphore(tiny_semaphore)
1526 .reconstruct_to_writer(writer)
1527 .await
1528 .unwrap();
1529
1530 assert_eq!(bytes_written, 0);
1532 let written = buffer.lock().unwrap().len();
1533 assert!(written < file_contents.data.len());
1534 }
1535
1536 #[tokio::test]
1537 async fn test_cancellation_token_not_set_completes_normally() {
1538 let (client, file_contents) = setup_test_file(&[(1, (0, 3)), (2, (0, 2))]).await;
1539 let config = test_config();
1540
1541 let token = CancellationToken::new();
1542 let buffer = Arc::new(std::sync::Mutex::new(Cursor::new(Vec::new())));
1543 let writer = StaticCursorWriter(buffer.clone());
1544
1545 let bytes_written = FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
1546 .with_config(&config)
1547 .with_cancellation_token(token)
1548 .reconstruct_to_writer(writer)
1549 .await
1550 .unwrap();
1551
1552 assert_eq!(bytes_written, file_contents.data.len() as u64);
1553 assert_eq!(buffer.lock().unwrap().get_ref().clone(), file_contents.data);
1554 }
1555 }
1556
1557 mod multirange_tests {
1560 use super::*;
1561
1562 fn with_multirange_config(enable: bool) -> Arc<XetRuntime> {
1563 let mut config = xet_runtime::config::XetConfig::new();
1564 config.client.enable_multirange_fetching = enable;
1565 XetRuntime::new_with_config(config).unwrap()
1566 }
1567
1568 #[test]
1571 fn test_multirange_local_client() {
1572 for enable in [false, true] {
1573 let rt = with_multirange_config(enable);
1574 rt.bridge_sync(async move {
1575 let scenarios: Vec<Vec<(u64, (u64, u64))>> = vec![
1576 vec![(1, (0, 2)), (1, (4, 6)), (1, (8, 10))],
1577 vec![
1578 (1, (0, 2)),
1579 (2, (0, 2)),
1580 (1, (4, 6)),
1581 (2, (4, 6)),
1582 (1, (8, 10)),
1583 (2, (8, 10)),
1584 ],
1585 vec![
1586 (1, (0, 2)),
1587 (2, (0, 3)),
1588 (3, (2, 5)),
1589 (1, (5, 8)),
1590 (2, (6, 8)),
1591 (3, (0, 2)),
1592 ],
1593 ];
1594 let config = test_config();
1595 for term_spec in &scenarios {
1596 let (client, fc) = setup_test_file(term_spec).await;
1597 reconstruct_and_verify_full(&client, &fc, config.clone()).await;
1598
1599 let file_len = fc.data.len() as u64;
1600 let range = FileRange::new(file_len / 4, file_len * 3 / 4);
1601 reconstruct_and_verify_range(&client, &fc, range, config.clone()).await;
1602 }
1603 })
1604 .unwrap();
1605 }
1606 }
1607
1608 #[test]
1610 fn test_multirange_max_ranges() {
1611 for enable in [false, true] {
1612 let rt = with_multirange_config(enable);
1613 rt.bridge_sync(async {
1614 let client = LocalClient::temporary().await.unwrap();
1615 client.set_max_ranges_per_fetch(2);
1616
1617 let term_spec = &[(1, (0, 2)), (1, (4, 6)), (1, (8, 10)), (1, (12, 14))];
1618 let fc = client.upload_random_file(term_spec, TEST_CHUNK_SIZE).await.unwrap();
1619
1620 let config = test_config();
1621 let result = reconstruct_to_vec(&client, fc.file_hash, None, &config, None).await.unwrap();
1622 assert_eq!(result, fc.data.as_ref());
1623 })
1624 .unwrap();
1625 }
1626 }
1627 }
1628
#[cfg(feature = "simulation")]
mod server_tests {
    use super::*;

    /// Reconstructs `file_hash` (the whole file, or only `byte_range` when given) through the
    /// server's remote client and returns the bytes collected by an in-memory writer.
    async fn reconstruct_via_server(
        server: &xet_client::cas_client::LocalTestServer,
        file_hash: MerkleHash,
        byte_range: Option<FileRange>,
        config: &ReconstructionConfig,
    ) -> Result<Vec<u8>> {
        let sink = Arc::new(std::sync::Mutex::new(Cursor::new(Vec::new())));
        let writer = StaticCursorWriter(Arc::clone(&sink));

        let client: Arc<dyn Client> = server.remote_client().clone();
        let builder = FileReconstructor::new(&client, file_hash).with_config(config);
        let reconstructor = match byte_range {
            Some(range) => builder.with_byte_range(range),
            None => builder,
        };
        reconstructor.reconstruct_to_writer(writer).await?;

        let collected = sink.lock().unwrap().get_ref().clone();
        Ok(collected)
    }

    /// V2 reconstruction disabled (404): a full two-xorb file must still be reconstructed via
    /// the V1 fallback.
    #[tokio::test]
    async fn test_v1_fallback_full_reconstruction() {
        let server = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
        let spec = [(1, (0, 3)), (2, (0, 2))];
        let uploaded = server
            .remote_client()
            .upload_random_file(&spec, TEST_CHUNK_SIZE)
            .await
            .unwrap();

        // Presumably makes the server answer V2 reconstruction requests with HTTP 404.
        server.disable_v2_reconstruction(404);

        let config = test_config();
        let reconstructed = reconstruct_via_server(&server, uploaded.file_hash, None, &config)
            .await
            .unwrap();
        assert_eq!(reconstructed, uploaded.data.as_ref());
    }

    /// V2 reconstruction disabled (404): the middle half of a two-xorb file via the V1 fallback.
    #[tokio::test]
    async fn test_v1_fallback_partial_range() {
        let server = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
        let spec = [(1, (0, 5)), (2, (0, 3))];
        let uploaded = server
            .remote_client()
            .upload_random_file(&spec, TEST_CHUNK_SIZE)
            .await
            .unwrap();

        server.disable_v2_reconstruction(404);

        let len = uploaded.data.len() as u64;
        let middle_half = FileRange::new(len / 4, len * 3 / 4);

        let config = test_config();
        let reconstructed = reconstruct_via_server(&server, uploaded.file_hash, Some(middle_half), &config)
            .await
            .unwrap();
        let (lo, hi) = (middle_half.start as usize, middle_half.end as usize);
        assert_eq!(reconstructed, &uploaded.data[lo..hi]);
    }

    /// V2 reconstruction disabled (404): two non-adjacent chunk ranges of one xorb.
    #[tokio::test]
    async fn test_v1_fallback_non_contiguous_chunks() {
        let server = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
        let spec = [(1, (0, 2)), (1, (4, 6))];
        let uploaded = server
            .remote_client()
            .upload_random_file(&spec, TEST_CHUNK_SIZE)
            .await
            .unwrap();

        server.disable_v2_reconstruction(404);

        let config = test_config();
        let reconstructed = reconstruct_via_server(&server, uploaded.file_hash, None, &config)
            .await
            .unwrap();
        assert_eq!(reconstructed, uploaded.data.as_ref());
    }

    /// V2 reconstruction disabled (404): terms from three xorbs, one xorb revisited later.
    #[tokio::test]
    async fn test_v1_fallback_multiple_xorbs() {
        let server = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
        let spec = [(1, (0, 2)), (2, (0, 3)), (3, (0, 2)), (1, (2, 4))];
        let uploaded = server
            .remote_client()
            .upload_random_file(&spec, TEST_CHUNK_SIZE)
            .await
            .unwrap();

        server.disable_v2_reconstruction(404);

        let config = test_config();
        let reconstructed = reconstruct_via_server(&server, uploaded.file_hash, None, &config)
            .await
            .unwrap();
        assert_eq!(reconstructed, uploaded.data.as_ref());
    }

    /// V2 reconstruction disabled (404): three non-adjacent chunk ranges of one xorb.
    #[tokio::test]
    async fn test_v1_fallback_triple_disjoint_ranges() {
        let server = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
        let spec = [(1, (0, 2)), (1, (4, 6)), (1, (8, 10))];
        let uploaded = server
            .remote_client()
            .upload_random_file(&spec, TEST_CHUNK_SIZE)
            .await
            .unwrap();

        server.disable_v2_reconstruction(404);

        let config = test_config();
        let reconstructed = reconstruct_via_server(&server, uploaded.file_hash, None, &config)
            .await
            .unwrap();
        assert_eq!(reconstructed, uploaded.data.as_ref());
    }

    /// Starts a fresh local test server, uploads a random file built from `term_spec`, caps the
    /// server at `max_ranges` ranges per fetch, and reconstructs `byte_range` (or the whole file).
    /// Returns the reconstructed bytes together with the uploaded file contents.
    async fn reconstruct_via_server_with_max_ranges(
        term_spec: &[(u64, (u64, u64))],
        max_ranges: usize,
        byte_range: Option<FileRange>,
    ) -> (Vec<u8>, RandomFileContents) {
        let server = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
        let uploaded = server
            .remote_client()
            .upload_random_file(term_spec, TEST_CHUNK_SIZE)
            .await
            .unwrap();

        server.set_max_ranges_per_fetch(max_ranges);

        let config = test_config();
        let bytes = reconstruct_via_server(&server, uploaded.file_hash, byte_range, &config)
            .await
            .unwrap();
        (bytes, uploaded)
    }

    /// Two xorbs, at most two ranges per fetch.
    #[tokio::test]
    async fn test_max_ranges_simple() {
        let (reconstructed, uploaded) =
            reconstruct_via_server_with_max_ranges(&[(1, (0, 3)), (2, (0, 2))], 2, None).await;
        assert_eq!(reconstructed, uploaded.data.as_ref());
    }

    /// Two disjoint ranges of one xorb, a single range per fetch.
    #[tokio::test]
    async fn test_max_ranges_1_disjoint() {
        let (reconstructed, uploaded) =
            reconstruct_via_server_with_max_ranges(&[(1, (0, 2)), (1, (4, 6))], 1, None).await;
        assert_eq!(reconstructed, uploaded.data.as_ref());
    }

    /// Three disjoint ranges of one xorb, at most two ranges per fetch.
    #[tokio::test]
    async fn test_max_ranges_2_triple_disjoint() {
        let spec = [(1, (0, 2)), (1, (4, 6)), (1, (8, 10))];
        let (reconstructed, uploaded) = reconstruct_via_server_with_max_ranges(&spec, 2, None).await;
        assert_eq!(reconstructed, uploaded.data.as_ref());
    }

    /// Interleaved disjoint ranges of two xorbs, at most two ranges per fetch.
    #[tokio::test]
    async fn test_max_ranges_2_multi_xorb_disjoint() {
        let spec = [
            (1, (0, 2)),
            (2, (0, 2)),
            (1, (4, 6)),
            (2, (4, 6)),
            (1, (8, 10)),
            (2, (8, 10)),
        ];
        let (reconstructed, uploaded) = reconstruct_via_server_with_max_ranges(&spec, 2, None).await;
        assert_eq!(reconstructed, uploaded.data.as_ref());
    }

    /// Middle half of a mixed three-xorb layout, at most two ranges per fetch.
    #[tokio::test]
    async fn test_max_ranges_2_partial_range() {
        let spec = [
            (1, (0, 3)),
            (2, (0, 2)),
            (1, (3, 5)),
            (3, (1, 4)),
            (2, (4, 6)),
            (1, (0, 2)),
        ];
        let server = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
        let uploaded = server
            .remote_client()
            .upload_random_file(&spec, TEST_CHUNK_SIZE)
            .await
            .unwrap();

        server.set_max_ranges_per_fetch(2);

        let len = uploaded.data.len() as u64;
        let middle_half = FileRange::new(len / 4, len * 3 / 4);

        let config = test_config();
        let reconstructed = reconstruct_via_server(&server, uploaded.file_hash, Some(middle_half), &config)
            .await
            .unwrap();
        let (lo, hi) = (middle_half.start as usize, middle_half.end as usize);
        assert_eq!(reconstructed, &uploaded.data[lo..hi]);
    }

    /// Builds a dedicated runtime whose client config has `enable_multirange_fetching` set to
    /// `enable`.
    fn with_multirange_config(enable: bool) -> Arc<XetRuntime> {
        let mut runtime_config = xet_runtime::config::XetConfig::new();
        runtime_config.client.enable_multirange_fetching = enable;
        XetRuntime::new_with_config(runtime_config).unwrap()
    }

    /// Three server-backed scenarios run with multirange fetching both disabled and enabled.
    #[test]
    fn test_multirange_via_server() {
        for enable in [false, true] {
            let runtime = with_multirange_config(enable);
            let outcome = runtime.bridge_sync(async {
                let config = test_config();

                // 1) Three disjoint ranges of one xorb.
                let server_a = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
                let file_a = server_a
                    .remote_client()
                    .upload_random_file(&[(1, (0, 2)), (1, (4, 6)), (1, (8, 10))], TEST_CHUNK_SIZE)
                    .await
                    .unwrap();
                let bytes_a = reconstruct_via_server(&server_a, file_a.file_hash, None, &config)
                    .await
                    .unwrap();
                assert_eq!(bytes_a, file_a.data.as_ref());

                // 2) Interleaved two-xorb layout, at most two ranges per fetch.
                let server_b = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
                let file_b = server_b
                    .remote_client()
                    .upload_random_file(
                        &[(1, (0, 2)), (2, (0, 2)), (1, (4, 6)), (2, (4, 6)), (1, (8, 10))],
                        TEST_CHUNK_SIZE,
                    )
                    .await
                    .unwrap();
                server_b.set_max_ranges_per_fetch(2);
                let bytes_b = reconstruct_via_server(&server_b, file_b.file_hash, None, &config)
                    .await
                    .unwrap();
                assert_eq!(bytes_b, file_b.data.as_ref());

                // 3) Middle half of a mixed two-xorb layout.
                let server_c = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
                let file_c = server_c
                    .remote_client()
                    .upload_random_file(&[(1, (0, 3)), (2, (0, 2)), (1, (3, 5)), (2, (4, 6))], TEST_CHUNK_SIZE)
                    .await
                    .unwrap();
                let len_c = file_c.data.len() as u64;
                let middle_half = FileRange::new(len_c / 4, len_c * 3 / 4);
                let bytes_c = reconstruct_via_server(&server_c, file_c.file_hash, Some(middle_half), &config)
                    .await
                    .unwrap();
                let (lo, hi) = (middle_half.start as usize, middle_half.end as usize);
                assert_eq!(bytes_c, &file_c.data[lo..hi]);
            });
            outcome.unwrap();
        }
    }
}
1902 } }