Skip to main content

xet_data/file_reconstruction/
file_reconstructor.rs

1use std::fs::OpenOptions;
2use std::io::{Seek, SeekFrom, Write};
3use std::path::Path;
4use std::sync::Arc;
5use std::sync::atomic::Ordering;
6
7use tokio_util::sync::CancellationToken;
8use tracing::{debug, info};
9use xet_client::cas_client::Client;
10use xet_client::cas_types::FileRange;
11use xet_client::chunk_cache::ChunkCache;
12use xet_core_structures::merklehash::MerkleHash;
13use xet_runtime::config::ReconstructionConfig;
14use xet_runtime::core::{XetRuntime, xet_config};
15use xet_runtime::utils::ClosureGuard;
16use xet_runtime::utils::adjustable_semaphore::AdjustableSemaphore;
17
18use super::data_writer::{DataWriter, DownloadStream, SequentialWriter, UnorderedDownloadStream};
19use super::error::{FileReconstructionError, Result};
20use super::reconstruction_terms::ReconstructionTermManager;
21use super::run_state::{RunError, RunState};
22use crate::progress_tracking::ItemProgressUpdater;
23
/// Reconstructs a file from its content-addressed chunks by downloading xorb blocks
/// and writing the reassembled data to an output. Supports byte range requests and
/// uses memory-limited buffering with adaptive prefetching.
///
/// A reconstructor is configured through the consuming `with_*` builder methods and
/// is then consumed by one of the `reconstruct_to_*` entry points.
pub struct FileReconstructor {
    /// Client handed to the term manager and to each term's data task.
    client: Arc<dyn Client>,
    /// Hash identifying the file to reconstruct.
    file_hash: MerkleHash,
    /// Requested byte range; `None` means the full file.
    byte_range: Option<FileRange>,
    /// Optional progress reporter, forwarded to the run state.
    progress_updater: Option<Arc<ItemProgressUpdater>>,
    /// Reconstruction settings (download buffer sizing, vectored-write toggle, ...).
    config: Arc<ReconstructionConfig>,

    /// Optional on-disk chunk cache for cross-file deduplication.
    chunk_cache: Option<Arc<dyn ChunkCache>>,

    /// Custom buffer semaphore for testing or specialized use cases.
    /// When `None`, the runtime-wide download buffer semaphore is used instead.
    custom_buffer_semaphore: Option<Arc<AdjustableSemaphore>>,

    /// Cancellation token checked at each major step of the reconstruction loop.
    /// When cancelled, reconstruction stops at its next check point. Long waits
    /// (such as semaphore acquisition) use `tokio::select!` so they abort promptly.
    cancellation_token: CancellationToken,
}
45
46impl FileReconstructor {
47    pub fn new(client: &Arc<dyn Client>, file_hash: MerkleHash) -> Self {
48        Self {
49            client: client.clone(),
50            file_hash,
51            byte_range: None,
52            progress_updater: default_progress_updater(),
53            config: Arc::new(xet_config().reconstruction.clone()),
54            chunk_cache: None,
55            custom_buffer_semaphore: None,
56            cancellation_token: CancellationToken::new(),
57        }
58    }
59
60    pub fn with_byte_range(self, byte_range: FileRange) -> Self {
61        Self {
62            byte_range: Some(byte_range),
63            ..self
64        }
65    }
66
67    pub fn with_progress_updater(self, progress_updater: Arc<ItemProgressUpdater>) -> Self {
68        Self {
69            progress_updater: Some(progress_updater),
70            ..self
71        }
72    }
73
74    pub fn with_chunk_cache(self, cache: Arc<dyn ChunkCache>) -> Self {
75        Self {
76            chunk_cache: Some(cache),
77            ..self
78        }
79    }
80
81    pub fn with_config(self, config: impl AsRef<ReconstructionConfig>) -> Self {
82        Self {
83            config: Arc::new(config.as_ref().clone()),
84            ..self
85        }
86    }
87
88    /// Sets a custom buffer semaphore for controlling download buffer memory usage.
89    /// This is primarily useful for testing scenarios where you want to control
90    /// the timing of term fetches by limiting buffer capacity.
91    pub fn with_buffer_semaphore(self, semaphore: Arc<AdjustableSemaphore>) -> Self {
92        Self {
93            custom_buffer_semaphore: Some(semaphore),
94            ..self
95        }
96    }
97
98    /// Replaces the default cancellation token with the given one. This is used
99    /// when external code needs to share the same token for coordinated
100    /// cancellation.
101    pub fn with_cancellation_token(self, token: CancellationToken) -> Self {
102        Self {
103            cancellation_token: token,
104            ..self
105        }
106    }
107
108    /// Reconstructs the file and writes it to the given path.
109    ///
110    /// The file is opened with read/write access. When `truncate_file` is `true`
111    /// the file is truncated to the reconstructed length; when `false` the file
112    /// is left at its existing size, allowing multiple concurrent reconstructions
113    /// to write to different regions of the same file.
114    ///
115    /// When `write_offset` is `Some(offset)`, writing begins at that byte
116    /// position regardless of the byte range. When `None`, writing begins at
117    /// the byte range start (or 0 for a full-file reconstruction).
118    pub async fn reconstruct_to_file(self, path: &Path, write_offset: Option<u64>, truncate_file: bool) -> Result<u64> {
119        info!(
120            file_hash = %self.file_hash,
121            byte_range = ?self.byte_range,
122            path = %path.display(),
123            write_offset = ?write_offset,
124            "Reconstructing file to disk"
125        );
126
127        if let Some(parent) = path.parent() {
128            std::fs::create_dir_all(parent)?;
129        }
130
131        let mut file = OpenOptions::new().write(true).create(true).truncate(truncate_file).open(path)?;
132
133        let default_write_position = self.byte_range.map_or(0, |r| r.start);
134        let seek_position = write_offset.unwrap_or(default_write_position);
135        if seek_position > 0 {
136            file.seek(SeekFrom::Start(seek_position))?;
137        }
138
139        let run_state = RunState::new(self.cancellation_token.clone(), self.file_hash, self.progress_updater.clone());
140
141        let data_writer = SequentialWriter::new(file, self.config.use_vectored_write, run_state.clone());
142
143        self.run(data_writer, run_state, false).await
144    }
145
146    /// Reconstructs the file and writes it to the given writer.
147    ///
148    /// The writer receives data starting from its current position (position 0
149    /// for a fresh writer), regardless of the byte range being reconstructed.
150    pub async fn reconstruct_to_writer<W: Write + Send + 'static>(self, writer: W) -> Result<u64> {
151        info!(
152            file_hash = %self.file_hash,
153            byte_range = ?self.byte_range,
154            "Reconstructing file to writer"
155        );
156
157        let run_state = RunState::new(self.cancellation_token.clone(), self.file_hash, self.progress_updater.clone());
158        let data_writer = SequentialWriter::new(writer, self.config.use_vectored_write, run_state.clone());
159        self.run(data_writer, run_state, false).await
160    }
161
162    /// Reconstructs the file as a stream, returning a [`DownloadStream`] that
163    /// yields data chunks as they become available.
164    ///
165    /// The reconstruction task is spawned immediately but pauses on an
166    /// internal [`tokio::sync::Notify`] until [`DownloadStream::start`] is
167    /// called (or the first [`DownloadStream::next`] /
168    /// [`DownloadStream::blocking_next`]).
169    ///
170    /// # Panics
171    ///
172    /// Panics if called outside a tokio runtime context (the constructor
173    /// uses [`tokio::spawn`]).
174    pub fn reconstruct_to_stream(self) -> DownloadStream {
175        let run_state = RunState::new(self.cancellation_token.clone(), self.file_hash, self.progress_updater.clone());
176
177        DownloadStream::new(self, run_state)
178    }
179
180    /// Reconstructs the file as an unordered stream, returning an
181    /// [`UnorderedDownloadStream`] that yields `(offset, Bytes)` chunks
182    /// in whatever order they complete.
183    ///
184    /// The reconstruction task is spawned immediately but pauses on an
185    /// internal [`tokio::sync::Notify`] until
186    /// [`UnorderedDownloadStream::start`] is called (or the first
187    /// [`UnorderedDownloadStream::next`] /
188    /// [`UnorderedDownloadStream::blocking_next`]).
189    ///
190    /// # Panics
191    ///
192    /// Panics if called outside a tokio runtime context (the constructor
193    /// uses [`tokio::spawn`]).
194    pub fn reconstruct_to_unordered_stream(self) -> UnorderedDownloadStream {
195        let run_state = RunState::new(self.cancellation_token.clone(), self.file_hash, self.progress_updater.clone());
196
197        UnorderedDownloadStream::new(self, run_state)
198    }
199
200    /// Runs the file reconstruction with error handling and cancellation support.
201    /// Returns the number of bytes written.
202    ///
203    /// When `is_streaming` is true, the progress completion assertions at the end
204    /// of reconstruction are skipped because the stream consumer reports bytes
205    /// asynchronously after this method returns.
206    pub(crate) async fn run(
207        self,
208        data_writer: Box<dyn DataWriter>,
209        run_state: Arc<RunState>,
210        is_streaming: bool,
211    ) -> Result<u64> {
212        match self.run_impl(data_writer, &run_state, is_streaming).await {
213            Ok(v) => Ok(v),
214            Err(RunError::Cancelled) => {
215                run_state.check_error()?;
216                Ok(0)
217            },
218            Err(RunError::Error(e)) => {
219                run_state.set_error(e.clone());
220                Err(e)
221            },
222        }
223    }
224
    /// Core reconstruction loop.
    ///
    /// Steps, in order:
    /// 1. Build a [`ReconstructionTermManager`] for the requested range (the full
    ///    file when no byte range was configured).
    /// 2. Pick the download buffer semaphore: the custom one if configured,
    ///    otherwise the runtime-wide one, whose permit target is grown for the
    ///    duration of this download and shrunk again by a drop guard.
    /// 3. Repeatedly fetch blocks of file terms; for each term acquire buffer
    ///    permits for its size, obtain its data future, and hand it to
    ///    `data_writer` with a byte range relative to the requested range start.
    /// 4. Finish the writer and return the total number of bytes scheduled.
    ///
    /// If `run_state` signals cancellation while waiting for terms or permits,
    /// the function returns early with the result of `check_run_state()`,
    /// mapped to `0` bytes on success.
    ///
    /// `_is_streaming` is only read in builds with debug assertions enabled,
    /// where it disables the final progress-completeness checks.
    async fn run_impl(
        self,
        mut data_writer: Box<dyn DataWriter>,
        run_state: &RunState,
        _is_streaming: bool,
    ) -> std::result::Result<u64, RunError> {
        // The remaining fields are not needed here; the file hash and progress
        // updater are read back from `run_state` below.
        let Self {
            client,
            byte_range,
            config,
            chunk_cache,
            custom_buffer_semaphore,
            ..
        } = self;

        // Check the run state before doing any work.
        run_state.check_run_state()?;

        let file_hash = *run_state.file_hash();
        let requested_range = byte_range.unwrap_or_else(FileRange::full);

        let mut term_manager = ReconstructionTermManager::new(
            config.clone(),
            client.clone(),
            file_hash,
            requested_range,
            run_state.progress_updater().cloned(),
        )
        .await?;

        // Without a custom semaphore, the runtime-wide download buffer semaphore is used.
        let using_global_memory_limit = custom_buffer_semaphore.is_none();
        let download_buffer_semaphore = custom_buffer_semaphore
            .unwrap_or_else(|| XetRuntime::current().common().reconstruction_download_buffer.clone());

        // Dynamic buffer scaling: the target buffer size grows with the number of active
        // downloads: target = (base + n * perfile).min(limit). On start we increment to
        // the new target, possibly getting back a virtual permit that lets this download begin
        // immediately without queuing behind existing acquires. On exit, the ClosureGuard
        // recomputes the target for the reduced download count and shrinks back if needed.
        //
        // Both bindings are declared up front so they stay alive until this function returns.
        let mut seed_buffer_permit;
        let _download_count_decrement_guard;

        if using_global_memory_limit {
            let active_downloads = XetRuntime::current().common().active_downloads.clone();
            // `fetch_add` returns the previous value, so `n` counts this download too.
            let n = active_downloads.fetch_add(1, Ordering::Relaxed) + 1;

            let base = config.download_buffer_size.as_u64();
            let perfile = config.download_buffer_perfile_size.as_u64();
            let limit = config.download_buffer_limit.as_u64();

            let target = base.saturating_add(n.saturating_mul(perfile)).min(limit);
            seed_buffer_permit = download_buffer_semaphore.increment_permits_to_target(target);

            let buffer_sem = download_buffer_semaphore.clone();
            _download_count_decrement_guard = Some(ClosureGuard::new(move || {
                // `fetch_sub` also returns the previous value; subtract one to get the new count.
                let n = active_downloads.fetch_sub(1, Ordering::Relaxed).saturating_sub(1);
                let target = base.saturating_add(n.saturating_mul(perfile)).min(limit);
                buffer_sem.decrement_permits_to_target(target);
            }));
        } else {
            seed_buffer_permit = None;
            _download_count_decrement_guard = None;
        }

        // The range start offset - we need to adjust byte ranges to be relative to this.
        let range_start_offset = requested_range.start;

        // Outer loop: retrieve blocks of file terms.
        // Use select! so a background error (which cancels the token) wakes this
        // up immediately rather than waiting for the network round-trip to finish.
        loop {
            let maybe_file_terms = tokio::select! {
                biased;
                _ = run_state.cancelled() => {
                    return run_state.check_run_state().map(|_| 0);
                }
                result = term_manager.next_file_terms() => result?
            };

            // `None` means the term manager has no further blocks to hand out.
            let Some(file_terms) = maybe_file_terms else {
                break;
            };

            run_state.check_run_state()?;

            run_state.record_new_block();

            // Inner loop: process each file term in the block.
            for file_term in file_terms {
                run_state.check_run_state()?;

                let term_size = file_term.byte_range.end - file_term.byte_range.start;

                debug!(
                    file_hash = %file_hash,
                    xorb_hash = %file_term.xorb_block.xorb_hash,
                    term_byte_range = ?(file_term.byte_range.start, file_term.byte_range.end),
                    term_size,
                    "Processing file term"
                );

                // Try to split from the reserved (virtual) permit first, giving this
                // download immediate access without waiting in the FIFO queue.
                // Fall back to the shared semaphore if the seed permit has been exhausted.
                let buffer_permit = match seed_buffer_permit.as_mut().and_then(|rp| rp.split(term_size)) {
                    Some(split) => split,
                    None => {
                        // Drop whatever is left of the seed permit; from here on every
                        // term goes through the semaphore.
                        seed_buffer_permit = None;

                        // Use tokio::select! to abort promptly if the run state fires.
                        tokio::select! {
                            biased;
                            _ = run_state.cancelled() => {
                                return run_state.check_run_state().map(|_| 0);
                            }
                            result = download_buffer_semaphore.acquire_many(term_size) => {
                                result.map_err(|e| {
                                    FileReconstructionError::InternalError(format!(
                                        "Error acquiring download buffer permit: {e}"
                                    ))
                                })?
                            }
                        }
                    },
                };

                // Obtain the future that will produce this term's data for the writer.
                let data_future = file_term
                    .get_data_task(client.clone(), run_state.progress_updater().cloned(), chunk_cache.clone())
                    .await?;

                // Debug-only sanity check: the term's chunk range must appear among
                // the references of its xorb block.
                #[cfg(debug_assertions)]
                {
                    let refs = &file_term.xorb_block.references;
                    assert!(refs.iter().any(|r| r.term_chunks == file_term.xorb_chunk_range));
                }

                // Adjust byte range to be relative to the requested range start (writer expects 0-based ranges).
                let relative_byte_range = FileRange::new(
                    file_term.byte_range.start - range_start_offset,
                    file_term.byte_range.end - range_start_offset,
                );

                // The buffer permit travels with the term into the writer.
                data_writer
                    .set_next_term_data_source(relative_byte_range, Some(buffer_permit), data_future)
                    .await?;

                run_state.record_new_term(term_size);
            }
        }

        run_state.log_progress("All term blocks received and scheduled for writing");

        // Finish the data writer and wait for all data to be written.
        let bytes_written = data_writer.finish().await?;
        let total_bytes_scheduled = run_state.total_bytes_scheduled();

        // Checked in debug builds only.
        debug_assert_eq!(
            bytes_written, total_bytes_scheduled,
            "Bytes written ({bytes_written}) should match total bytes scheduled ({total_bytes_scheduled})"
        );

        run_state.log_progress("File reconstruction completed successfully");

        // Debug-only: for non-streaming runs, verify that the progress updater is
        // complete and, for a bounded byte range, that it counted exactly that many bytes.
        #[cfg(debug_assertions)]
        if !_is_streaming && let Some(updater) = run_state.progress_updater() {
            updater.assert_complete();
            if let Some(byte_range) = byte_range
                && byte_range.end < u64::MAX
            {
                assert_eq!(updater.total_bytes_completed(), byte_range.end - byte_range.start);
            }
        }

        Ok(total_bytes_scheduled)
    }
399}
400
401#[cfg(test)]
402fn default_progress_updater() -> Option<Arc<ItemProgressUpdater>> {
403    Some(ItemProgressUpdater::new_standalone("test"))
404}
405
406#[cfg(not(test))]
407fn default_progress_updater() -> Option<Arc<ItemProgressUpdater>> {
408    None
409}
410
411#[cfg(test)]
412mod tests {
413    use std::io::{Cursor, Write};
414    use std::sync::Arc;
415    use std::sync::atomic::{AtomicUsize, Ordering};
416    use std::time::Duration;
417
418    use xet_client::cas_client::{ClientTestingUtils, DirectAccessClient, LocalClient, RandomFileContents};
419    use xet_client::cas_types::FileRange;
420    use xet_runtime::core::XetRuntime;
421
422    use super::*;
423    use crate::progress_tracking::ItemProgressUpdater;
424
425    const TEST_CHUNK_SIZE: usize = 101;
426
427    /// Creates a test config with small fetch sizes to force multiple iterations.
428    fn test_config() -> ReconstructionConfig {
429        let mut config = ReconstructionConfig::default();
430        // Use small fetch sizes to force multiple prefetch iterations
431        config.min_reconstruction_fetch_size = xet_runtime::utils::ByteSize::from("100");
432        config.max_reconstruction_fetch_size = xet_runtime::utils::ByteSize::from("400");
433        config.min_prefetch_buffer = xet_runtime::utils::ByteSize::from("800");
434        config
435    }
436
437    /// Creates a test client and uploads a random file with the given term specification.
438    async fn setup_test_file(term_spec: &[(u64, (u64, u64))]) -> (Arc<LocalClient>, RandomFileContents) {
439        let client = LocalClient::temporary().await.unwrap();
440        let file_contents = client.upload_random_file(term_spec, TEST_CHUNK_SIZE).await.unwrap();
441        (client, file_contents)
442    }
443
444    /// Reconstructs a file (or byte range) using a writer and returns the reconstructed data.
445    async fn reconstruct_to_vec(
446        client: &Arc<LocalClient>,
447        file_hash: MerkleHash,
448        byte_range: Option<FileRange>,
449        config: &ReconstructionConfig,
450        semaphore: Option<Arc<AdjustableSemaphore>>,
451    ) -> Result<Vec<u8>> {
452        let buffer = Arc::new(std::sync::Mutex::new(Cursor::new(Vec::new())));
453        let writer = StaticCursorWriter(buffer.clone());
454
455        let mut reconstructor =
456            FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_hash).with_config(config);
457
458        if let Some(range) = byte_range {
459            reconstructor = reconstructor.with_byte_range(range);
460        }
461        if let Some(sem) = semaphore {
462            reconstructor = reconstructor.with_buffer_semaphore(sem);
463        }
464
465        reconstructor.reconstruct_to_writer(writer).await?;
466
467        let data = buffer.lock().unwrap().get_ref().clone();
468        Ok(data)
469    }
470
471    /// Reconstructs to a file and returns the reconstructed data.
472    /// Creates a temp file, reconstructs to it, then reads the relevant portion back.
473    async fn reconstruct_to_file(
474        client: &Arc<LocalClient>,
475        file_hash: MerkleHash,
476        byte_range: Option<FileRange>,
477        config: &ReconstructionConfig,
478    ) -> Result<Vec<u8>> {
479        let temp_dir = tempfile::tempdir().unwrap();
480        let file_path = temp_dir.path().join("output.bin");
481
482        let mut reconstructor =
483            FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_hash).with_config(config);
484
485        if let Some(range) = byte_range {
486            reconstructor = reconstructor.with_byte_range(range);
487        }
488
489        reconstructor.reconstruct_to_file(&file_path, None, false).await?;
490
491        // Read back the data from the file at the expected location.
492        let file_data = std::fs::read(&file_path)?;
493        let start = byte_range.map(|r| r.start as usize).unwrap_or(0);
494        Ok(file_data[start..].to_vec())
495    }
496
497    /// Reconstructs to a file at a specific offset and returns the data.
498    async fn reconstruct_to_file_at_specific_offset(
499        client: &Arc<LocalClient>,
500        file_hash: MerkleHash,
501        byte_range: Option<FileRange>,
502        config: &ReconstructionConfig,
503    ) -> Result<Vec<u8>> {
504        let offset = 9u64;
505
506        let temp_dir = tempfile::tempdir().unwrap();
507        let file_path = temp_dir.path().join("output.bin");
508
509        let mut reconstructor =
510            FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_hash).with_config(config);
511
512        if let Some(range) = byte_range {
513            reconstructor = reconstructor.with_byte_range(range);
514        }
515
516        reconstructor.reconstruct_to_file(&file_path, Some(offset), false).await?;
517
518        // Read back all file data.
519        let file_data = std::fs::read(&file_path)?;
520        Ok(file_data[offset as usize..].to_vec())
521    }
522
523    /// Reconstructs to a file at offset 0 and returns the data.
524    /// This tests writing to the beginning of a file regardless of the byte range.
525    async fn reconstruct_to_file_at_offset_zero(
526        client: &Arc<LocalClient>,
527        file_hash: MerkleHash,
528        byte_range: Option<FileRange>,
529        config: &ReconstructionConfig,
530    ) -> Result<Vec<u8>> {
531        let temp_dir = tempfile::tempdir().unwrap();
532        let file_path = temp_dir.path().join("output.bin");
533
534        let mut reconstructor =
535            FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_hash).with_config(config);
536
537        if let Some(range) = byte_range {
538            reconstructor = reconstructor.with_byte_range(range);
539        }
540
541        reconstructor.reconstruct_to_file(&file_path, Some(0), false).await?;
542
543        // Read back all file data (it starts at offset 0).
544        let file_data = std::fs::read(&file_path)?;
545        Ok(file_data)
546    }
547
548    /// A wrapper that allows writing to a shared Vec; needed for testing
549    /// with the 'static cursor writer present in the code.
550    struct StaticCursorWriter(Arc<std::sync::Mutex<Cursor<Vec<u8>>>>);
551
552    impl std::io::Write for StaticCursorWriter {
553        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
554            self.0.lock().unwrap().write(buf)
555        }
556
557        fn flush(&mut self) -> std::io::Result<()> {
558            self.0.lock().unwrap().flush()
559        }
560    }
561
562    /// Reconstructs and verifies the full file using all output methods and vectored/non-vectored writes.
563    async fn reconstruct_and_verify_full(
564        client: &Arc<LocalClient>,
565        file_contents: &RandomFileContents,
566        base_config: ReconstructionConfig,
567    ) {
568        let expected = &file_contents.data;
569        let h = file_contents.file_hash;
570
571        // Test both vectored and non-vectored write paths.
572        for use_vectored in [false, true] {
573            let mut config = base_config.clone();
574            config.use_vectored_write = use_vectored;
575
576            // Test 1: reconstruct_to_writer
577            let vec_result = reconstruct_to_vec(client, h, None, &config, None).await.unwrap();
578            assert_eq!(vec_result, *expected, "vec failed (vectored={use_vectored})");
579
580            // Test 2: reconstruct_to_file
581            let file_result = reconstruct_to_file(client, h, None, &config).await.unwrap();
582            assert_eq!(file_result, *expected, "file failed (vectored={use_vectored})");
583
584            // Test 3: reconstruct_to_file with offset 0
585            let file_offset_result = reconstruct_to_file_at_offset_zero(client, h, None, &config).await.unwrap();
586            assert_eq!(file_offset_result, *expected, "file_at_offset_zero failed (vectored={use_vectored})");
587
588            // Test 4: reconstruct_to_file with specific offset
589            let file_specific_result = reconstruct_to_file_at_specific_offset(client, h, None, &config).await.unwrap();
590            assert_eq!(file_specific_result, *expected, "file_at_specific_offset failed (vectored={use_vectored})");
591        }
592    }
593
594    /// Reconstructs and verifies a byte range using all output methods and vectored/non-vectored writes.
595    async fn reconstruct_and_verify_range(
596        client: &Arc<LocalClient>,
597        file_contents: &RandomFileContents,
598        range: FileRange,
599        base_config: ReconstructionConfig,
600    ) {
601        let expected = &file_contents.data[range.start as usize..range.end as usize];
602
603        // Test both vectored and non-vectored write paths.
604        for use_vectored in [false, true] {
605            let mut config = base_config.clone();
606            config.use_vectored_write = use_vectored;
607
608            // Test 1: reconstruct_to_writer
609            let vec_result = reconstruct_to_vec(client, file_contents.file_hash, Some(range), &config, None)
610                .await
611                .expect("reconstruct_to_vec should succeed");
612            assert_eq!(vec_result, expected, "vec failed (vectored={use_vectored})");
613
614            // Test 2: reconstruct_to_file
615            let file_result = reconstruct_to_file(client, file_contents.file_hash, Some(range), &config)
616                .await
617                .expect("reconstruct_to_file should succeed");
618            assert_eq!(file_result, expected, "file failed (vectored={use_vectored})");
619
620            // Test 3: reconstruct_to_file with offset 0
621            let file_offset_result =
622                reconstruct_to_file_at_offset_zero(client, file_contents.file_hash, Some(range), &config)
623                    .await
624                    .expect("reconstruct_to_file_at_offset_zero should succeed");
625            assert_eq!(file_offset_result, expected, "file_at_offset failed (vectored={use_vectored})");
626        }
627    }
628
629    // ==================== Full File Reconstruction Tests ====================
630
631    #[tokio::test]
632    async fn test_single_term_full_reconstruction() {
633        let (client, file_contents) = setup_test_file(&[(1, (0, 3))]).await;
634        reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
635    }
636
637    #[tokio::test]
638    async fn test_multiple_terms_same_xorb_full_reconstruction() {
639        let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (1, (2, 4)), (1, (4, 6))]).await;
640        reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
641    }
642
643    #[tokio::test]
644    async fn test_multiple_xorbs_full_reconstruction() {
645        let (client, file_contents) = setup_test_file(&[(1, (0, 3)), (2, (0, 2)), (3, (0, 4))]).await;
646        reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
647    }
648
649    #[tokio::test]
650    async fn test_large_file_many_terms_full_reconstruction() {
651        // Create a file large enough to require multiple prefetch iterations
652        let term_spec: Vec<(u64, (u64, u64))> = (1..=10).map(|i| (i, (0, 5))).collect();
653        let (client, file_contents) = setup_test_file(&term_spec).await;
654        reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
655    }
656
657    #[tokio::test]
658    async fn test_interleaved_xorbs_full_reconstruction() {
659        let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (2, (0, 2)), (1, (2, 4)), (2, (2, 4))]).await;
660        reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
661    }
662
663    #[tokio::test]
664    async fn test_single_chunk_file() {
665        let (client, file_contents) = setup_test_file(&[(1, (0, 1))]).await;
666        reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
667    }
668
669    #[tokio::test]
670    async fn test_many_small_terms_different_xorbs() {
671        let term_spec: Vec<(u64, (u64, u64))> = (1..=20).map(|i| (i, (0, 1))).collect();
672        let (client, file_contents) = setup_test_file(&term_spec).await;
673        reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
674    }
675
676    // ==================== Progress tracker tests ====================
677
678    #[tokio::test]
679    async fn test_progress_tracker_records_full_reconstruction_bytes() {
680        let (client, file_contents) = setup_test_file(&[(1, (0, 3)), (2, (0, 2))]).await;
681        let config = test_config();
682        let buffer = Arc::new(std::sync::Mutex::new(Cursor::new(Vec::new())));
683        let writer = StaticCursorWriter(buffer.clone());
684
685        let progress_updater = ItemProgressUpdater::new_standalone("file");
686        let bytes_written = FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
687            .with_config(&config)
688            .with_progress_updater(progress_updater.clone())
689            .reconstruct_to_writer(writer)
690            .await
691            .unwrap();
692
693        assert_eq!(bytes_written, file_contents.data.len() as u64);
694    }
695
696    #[tokio::test]
697    async fn test_progress_tracker_records_partial_range_bytes() {
698        let (client, file_contents) = setup_test_file(&[(1, (0, 10))]).await;
699        let config = test_config();
700        let file_len = file_contents.data.len() as u64;
701        let range = FileRange::new(file_len / 4, file_len * 3 / 4);
702        let expected_bytes = range.end - range.start;
703
704        let buffer = Arc::new(std::sync::Mutex::new(Cursor::new(Vec::new())));
705        let writer = StaticCursorWriter(buffer.clone());
706
707        let progress_updater = ItemProgressUpdater::new_standalone("file");
708        let bytes_written = FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
709            .with_config(&config)
710            .with_byte_range(range)
711            .with_progress_updater(progress_updater.clone())
712            .reconstruct_to_writer(writer)
713            .await
714            .unwrap();
715
716        assert_eq!(bytes_written, expected_bytes);
717    }
718
719    /// Verifies the external progress tracker flow without a known file size:
720    /// totals are discovered incrementally by the ReconstructionTermManager.
721    #[tokio::test]
722    async fn test_external_progress_tracker_incremental_discovery() {
723        let term_spec: Vec<(u64, (u64, u64))> = (1..=5).map(|i| (i, (0, 3))).collect();
724        let (client, file_contents) = setup_test_file(&term_spec).await;
725        let config = test_config();
726
727        let task = ItemProgressUpdater::new_standalone("test_file.bin");
728
729        let buffer = Arc::new(std::sync::Mutex::new(Cursor::new(Vec::new())));
730        let writer = StaticCursorWriter(buffer.clone());
731
732        let bytes_written = FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
733            .with_config(&config)
734            .with_progress_updater(task.clone())
735            .reconstruct_to_writer(writer)
736            .await
737            .unwrap();
738
739        assert_eq!(bytes_written, file_contents.data.len() as u64);
740
741        task.assert_complete();
742        assert_eq!(task.total_bytes_completed(), file_contents.data.len() as u64);
743    }
744
745    /// Verifies the data_client.rs flow: file size is known upfront (is_final=true),
746    /// then the manager discovers transfer sizes and also tries to update_item_size
747    /// (which is ignored since final was already set).
748    #[tokio::test]
749    async fn test_external_progress_tracker_final_size_upfront() {
750        let term_spec: Vec<(u64, (u64, u64))> = (1..=5).map(|i| (i, (0, 3))).collect();
751        let (client, file_contents) = setup_test_file(&term_spec).await;
752        let config = test_config();
753        let file_size = file_contents.data.len() as u64;
754
755        let task = ItemProgressUpdater::new_standalone("test_file.bin");
756
757        task.update_item_size(file_size, true);
758
759        let buffer = Arc::new(std::sync::Mutex::new(Cursor::new(Vec::new())));
760        let writer = StaticCursorWriter(buffer.clone());
761
762        let bytes_written = FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
763            .with_config(&config)
764            .with_progress_updater(task.clone())
765            .reconstruct_to_writer(writer)
766            .await
767            .unwrap();
768
769        assert_eq!(bytes_written, file_size);
770
771        assert_eq!(task.total_bytes_completed(), file_size);
772
773        task.assert_complete();
774    }
775
776    // ==================== Byte Range Reconstruction Tests ====================
777
778    #[tokio::test]
779    async fn test_range_first_half() {
780        let (client, file_contents) = setup_test_file(&[(1, (0, 10))]).await;
781        let file_len = file_contents.data.len() as u64;
782        let range = FileRange::new(0, file_len / 2);
783        reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
784    }
785
786    #[tokio::test]
787    async fn test_range_second_half() {
788        let (client, file_contents) = setup_test_file(&[(1, (0, 10))]).await;
789        let file_len = file_contents.data.len() as u64;
790        let range = FileRange::new(file_len / 2, file_len);
791        reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
792    }
793
794    #[tokio::test]
795    async fn test_range_middle() {
796        let (client, file_contents) = setup_test_file(&[(1, (0, 10))]).await;
797        let file_len = file_contents.data.len() as u64;
798        let range = FileRange::new(file_len / 4, file_len * 3 / 4);
799        reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
800    }
801
802    #[tokio::test]
803    async fn test_range_single_byte_start() {
804        let (client, file_contents) = setup_test_file(&[(1, (0, 5))]).await;
805        let range = FileRange::new(0, 1);
806        reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
807    }
808
809    #[tokio::test]
810    async fn test_range_single_byte_end() {
811        let (client, file_contents) = setup_test_file(&[(1, (0, 5))]).await;
812        let file_len = file_contents.data.len() as u64;
813        let range = FileRange::new(file_len - 1, file_len);
814        reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
815    }
816
817    #[tokio::test]
818    async fn test_range_single_byte_middle() {
819        let (client, file_contents) = setup_test_file(&[(1, (0, 5))]).await;
820        let file_len = file_contents.data.len() as u64;
821        let mid = file_len / 2;
822        let range = FileRange::new(mid, mid + 1);
823        reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
824    }
825
826    #[tokio::test]
827    async fn test_range_few_bytes_from_start() {
828        let (client, file_contents) = setup_test_file(&[(1, (0, 5))]).await;
829        let file_len = file_contents.data.len() as u64;
830        let range = FileRange::new(3, file_len);
831        reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
832    }
833
834    #[tokio::test]
835    async fn test_range_few_bytes_before_end() {
836        let (client, file_contents) = setup_test_file(&[(1, (0, 5))]).await;
837        let file_len = file_contents.data.len() as u64;
838        let range = FileRange::new(0, file_len - 3);
839        reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
840    }
841
842    #[tokio::test]
843    async fn test_range_small_slice_in_middle() {
844        let (client, file_contents) = setup_test_file(&[(1, (0, 10))]).await;
845        let file_len = file_contents.data.len() as u64;
846        let range = FileRange::new(file_len / 3, file_len / 3 + 10);
847        reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
848    }
849
850    // ==================== Multi-term Range Tests ====================
851
852    #[tokio::test]
853    async fn test_range_spanning_multiple_terms() {
854        let (client, file_contents) = setup_test_file(&[(1, (0, 3)), (2, (0, 3)), (3, (0, 3))]).await;
855        let file_len = file_contents.data.len() as u64;
856        // Range that spans all three terms but not full file
857        let range = FileRange::new(10, file_len - 10);
858        reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
859    }
860
861    #[tokio::test]
862    async fn test_range_within_single_term() {
863        let (client, file_contents) = setup_test_file(&[(1, (0, 10)), (2, (0, 10))]).await;
864        // First term size
865        let first_term_size = file_contents.terms[0].data.len() as u64;
866        // Range within the first term only
867        let range = FileRange::new(5, first_term_size - 5);
868        reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
869    }
870
871    #[tokio::test]
872    async fn test_range_crossing_term_boundary() {
873        let (client, file_contents) = setup_test_file(&[(1, (0, 5)), (2, (0, 5))]).await;
874        let first_term_size = file_contents.terms[0].data.len() as u64;
875        // Range that straddles the boundary between terms
876        let range = FileRange::new(first_term_size - 10, first_term_size + 10);
877        reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
878    }
879
880    // ==================== Edge Cases with Multiple Prefetch Iterations ====================
881
882    #[tokio::test]
883    async fn test_large_file_range_first_portion() {
884        // Large file to ensure multiple prefetch iterations
885        let term_spec: Vec<(u64, (u64, u64))> = (1..=15).map(|i| (i, (0, 4))).collect();
886        let (client, file_contents) = setup_test_file(&term_spec).await;
887        let file_len = file_contents.data.len() as u64;
888        let range = FileRange::new(0, file_len / 3);
889        reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
890    }
891
892    #[tokio::test]
893    async fn test_large_file_range_last_portion() {
894        let term_spec: Vec<(u64, (u64, u64))> = (1..=15).map(|i| (i, (0, 4))).collect();
895        let (client, file_contents) = setup_test_file(&term_spec).await;
896        let file_len = file_contents.data.len() as u64;
897        let range = FileRange::new(file_len * 2 / 3, file_len);
898        reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
899    }
900
901    #[tokio::test]
902    async fn test_large_file_range_middle_portion() {
903        let term_spec: Vec<(u64, (u64, u64))> = (1..=15).map(|i| (i, (0, 4))).collect();
904        let (client, file_contents) = setup_test_file(&term_spec).await;
905        let file_len = file_contents.data.len() as u64;
906        let range = FileRange::new(file_len / 3, file_len * 2 / 3);
907        reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
908    }
909
910    // ==================== Complex File Structures ====================
911
912    #[tokio::test]
913    async fn test_complex_mixed_pattern_full() {
914        let term_spec = &[
915            (1, (0, 3)),
916            (2, (0, 2)),
917            (1, (3, 5)),
918            (3, (1, 4)),
919            (2, (4, 6)),
920            (1, (0, 2)),
921        ];
922        let (client, file_contents) = setup_test_file(term_spec).await;
923        reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
924    }
925
926    #[tokio::test]
927    async fn test_complex_mixed_pattern_partial_range() {
928        let term_spec = &[
929            (1, (0, 3)),
930            (2, (0, 2)),
931            (1, (3, 5)),
932            (3, (1, 4)),
933            (2, (4, 6)),
934            (1, (0, 2)),
935        ];
936        let (client, file_contents) = setup_test_file(term_spec).await;
937        let file_len = file_contents.data.len() as u64;
938        let range = FileRange::new(file_len / 4, file_len * 3 / 4);
939        reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
940    }
941
942    #[tokio::test]
943    async fn test_overlapping_chunk_ranges() {
944        let (client, file_contents) = setup_test_file(&[(1, (0, 5)), (1, (1, 3)), (1, (2, 4))]).await;
945        reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
946    }
947
948    #[tokio::test]
949    async fn test_non_contiguous_chunks() {
950        let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (1, (4, 6))]).await;
951        let config = test_config();
952        let result = reconstruct_to_vec(&client, file_contents.file_hash, None, &config, None)
953            .await
954            .unwrap();
955        assert_eq!(result, file_contents.data);
956    }
957
958    // ==================== Default Config Tests ====================
959
960    #[tokio::test]
961    async fn test_default_config_full_reconstruction() {
962        let (client, file_contents) = setup_test_file(&[(1, (0, 5)), (2, (0, 3))]).await;
963        // Use default config (larger fetch sizes)
964        reconstruct_and_verify_full(&client, &file_contents, ReconstructionConfig::default()).await;
965    }
966
967    #[tokio::test]
968    async fn test_default_config_partial_range() {
969        let (client, file_contents) = setup_test_file(&[(1, (0, 5)), (2, (0, 3))]).await;
970        let file_len = file_contents.data.len() as u64;
971        let range = FileRange::new(file_len / 4, file_len * 3 / 4);
972        reconstruct_and_verify_range(&client, &file_contents, range, ReconstructionConfig::default()).await;
973    }
974
975    // ==================== URL Refresh Tests ====================
976    //
977    // These tests verify that URL refresh logic works correctly when URLs expire.
978    // We use tokio's time advancement (start_paused = true) to control time precisely.
979
980    /// A writer that advances tokio time after each write, causing URL expiration.
981    /// This forces the reconstruction logic to refresh URLs for subsequent fetches.
982    struct TimeAdvancingWriter {
983        buffer: Arc<std::sync::Mutex<Vec<u8>>>,
984        advance_duration: Duration,
985        write_count: Arc<AtomicUsize>,
986    }
987
988    impl TimeAdvancingWriter {
989        fn new(advance_duration: Duration) -> Self {
990            Self {
991                buffer: Arc::new(std::sync::Mutex::new(Vec::new())),
992                advance_duration,
993                write_count: Arc::new(AtomicUsize::new(0)),
994            }
995        }
996    }
997
998    impl Write for TimeAdvancingWriter {
999        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1000            let bytes_written = self.buffer.lock().unwrap().write(buf)?;
1001
1002            // Increment write count
1003            self.write_count.fetch_add(1, Ordering::Relaxed);
1004
1005            // Advance tokio time to cause URL expiration for next fetch.
1006            // Use Handle::block_on directly since we're in a spawn_blocking context
1007            // (block_in_place is not allowed from blocking threads).
1008            let advance_duration = self.advance_duration;
1009            tokio::runtime::Handle::current().block_on(async {
1010                tokio::time::advance(advance_duration).await;
1011            });
1012
1013            Ok(bytes_written)
1014        }
1015
1016        fn flush(&mut self) -> std::io::Result<()> {
1017            Ok(())
1018        }
1019    }
1020
1021    /// Creates a config with very small fetch sizes to ensure we get multiple terms.
1022    fn url_refresh_test_config() -> ReconstructionConfig {
1023        let mut config = ReconstructionConfig::default();
1024        // Very small fetch sizes to force multiple term blocks
1025        config.min_reconstruction_fetch_size = xet_runtime::utils::ByteSize::from("50");
1026        config.max_reconstruction_fetch_size = xet_runtime::utils::ByteSize::from("100");
1027        config.min_prefetch_buffer = xet_runtime::utils::ByteSize::from("50");
1028        config
1029    }
1030
1031    /// Test that URL refresh works correctly when URLs expire between term fetches.
1032    /// Uses a tiny buffer semaphore (1 byte) to force sequential term processing,
1033    /// and advances time after each write to cause URL expiration.
1034    #[tokio::test(start_paused = true)]
1035    async fn test_url_refresh_on_expiration() {
1036        // Create a file with multiple terms from multiple xorbs
1037        let term_spec = &[(1, (0, 2)), (2, (0, 2)), (3, (0, 2))];
1038        let (client, file_contents) = setup_test_file(term_spec).await;
1039
1040        // Set a short URL expiration (1 second)
1041        let url_expiration = Duration::from_secs(1);
1042        client.set_fetch_term_url_expiration(url_expiration);
1043
1044        // Create a writer that advances time by more than the expiration after each write
1045        let time_advance = Duration::from_secs(2);
1046        let writer = TimeAdvancingWriter::new(time_advance);
1047        let writer_buffer = writer.buffer.clone();
1048        let write_count = writer.write_count.clone();
1049
1050        // Create a tiny semaphore (1 permit) to force sequential processing
1051        // This ensures each term is fully written before the next is fetched
1052        let tiny_semaphore = AdjustableSemaphore::new(1, (1, 1));
1053
1054        FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
1055            .with_config(url_refresh_test_config())
1056            .with_buffer_semaphore(tiny_semaphore)
1057            .reconstruct_to_writer(writer)
1058            .await
1059            .expect("Reconstruction should succeed with URL refresh");
1060
1061        // Verify the reconstructed data is correct
1062        let reconstructed = writer_buffer.lock().unwrap().clone();
1063        assert_eq!(reconstructed.len(), file_contents.data.len());
1064        assert_eq!(reconstructed, file_contents.data);
1065
1066        // Verify we had multiple writes (one per term at minimum)
1067        assert!(write_count.load(Ordering::Relaxed) >= term_spec.len());
1068    }
1069
1070    /// Test URL refresh with a single xorb but multiple terms.
1071    /// This tests the case where the cached xorb data should still be valid
1072    /// but the URL needs refreshing.
1073    #[tokio::test(start_paused = true)]
1074    async fn test_url_refresh_same_xorb_multiple_terms() {
1075        // Create multiple terms from the same xorb
1076        let term_spec = &[(1, (0, 2)), (1, (2, 4)), (1, (4, 6))];
1077        let (client, file_contents) = setup_test_file(term_spec).await;
1078
1079        // Set a short URL expiration
1080        client.set_fetch_term_url_expiration(Duration::from_secs(1));
1081
1082        // Create a writer that advances time
1083        let writer = TimeAdvancingWriter::new(Duration::from_secs(2));
1084        let writer_buffer = writer.buffer.clone();
1085
1086        let tiny_semaphore = AdjustableSemaphore::new(1, (1, 1));
1087
1088        FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
1089            .with_config(url_refresh_test_config())
1090            .with_buffer_semaphore(tiny_semaphore)
1091            .reconstruct_to_writer(writer)
1092            .await
1093            .expect("Reconstruction should succeed");
1094
1095        let reconstructed = writer_buffer.lock().unwrap().clone();
1096        assert_eq!(reconstructed, file_contents.data);
1097    }
1098
1099    /// Test URL refresh with a larger file that requires multiple prefetch blocks.
1100    #[tokio::test(start_paused = true)]
1101    async fn test_url_refresh_large_file_multiple_blocks() {
1102        // Create a larger file with many terms
1103        let term_spec: Vec<(u64, (u64, u64))> = (1..=5).map(|i| (i, (0, 3))).collect();
1104        let (client, file_contents) = setup_test_file(&term_spec).await;
1105
1106        // Set a short URL expiration
1107        client.set_fetch_term_url_expiration(Duration::from_secs(1));
1108
1109        let writer = TimeAdvancingWriter::new(Duration::from_secs(2));
1110        let writer_buffer = writer.buffer.clone();
1111
1112        let tiny_semaphore = AdjustableSemaphore::new(1, (1, 1));
1113
1114        FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
1115            .with_config(url_refresh_test_config())
1116            .with_buffer_semaphore(tiny_semaphore)
1117            .reconstruct_to_writer(writer)
1118            .await
1119            .expect("Reconstruction should succeed");
1120
1121        let reconstructed = writer_buffer.lock().unwrap().clone();
1122        assert_eq!(reconstructed, file_contents.data);
1123    }
1124
1125    /// Test that reconstruction works when URLs don't expire (control test).
1126    #[tokio::test(start_paused = true)]
1127    async fn test_no_url_expiration_control() {
1128        let term_spec = &[(1, (0, 2)), (2, (0, 2)), (3, (0, 2))];
1129        let (client, file_contents) = setup_test_file(term_spec).await;
1130
1131        // Set a long URL expiration that won't trigger
1132        client.set_fetch_term_url_expiration(Duration::from_secs(3600));
1133
1134        // Advance time only slightly (less than expiration)
1135        let writer = TimeAdvancingWriter::new(Duration::from_millis(100));
1136        let writer_buffer = writer.buffer.clone();
1137
1138        let tiny_semaphore = AdjustableSemaphore::new(1, (1, 1));
1139
1140        FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
1141            .with_config(url_refresh_test_config())
1142            .with_buffer_semaphore(tiny_semaphore)
1143            .reconstruct_to_writer(writer)
1144            .await
1145            .expect("Reconstruction should succeed");
1146
1147        let reconstructed = writer_buffer.lock().unwrap().clone();
1148        assert_eq!(reconstructed, file_contents.data);
1149    }
1150
1151    /// Test partial range reconstruction with URL refresh.
1152    #[tokio::test(start_paused = true)]
1153    async fn test_url_refresh_partial_range() {
1154        let term_spec = &[(1, (0, 5)), (2, (0, 5))];
1155        let (client, file_contents) = setup_test_file(term_spec).await;
1156        let file_len = file_contents.data.len() as u64;
1157
1158        client.set_fetch_term_url_expiration(Duration::from_secs(1));
1159
1160        let writer = TimeAdvancingWriter::new(Duration::from_secs(2));
1161        let writer_buffer = writer.buffer.clone();
1162
1163        let tiny_semaphore = AdjustableSemaphore::new(1, (0, 1));
1164
1165        let range = FileRange::new(file_len / 4, file_len * 3 / 4);
1166
1167        FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
1168            .with_byte_range(range)
1169            .with_config(url_refresh_test_config())
1170            .with_buffer_semaphore(tiny_semaphore)
1171            .reconstruct_to_writer(writer)
1172            .await
1173            .expect("Reconstruction should succeed");
1174
1175        let reconstructed = writer_buffer.lock().unwrap().clone();
1176        let expected = &file_contents.data[range.start as usize..range.end as usize];
1177        assert_eq!(reconstructed, expected);
1178    }
1179
1180    #[test]
1181    fn test_dynamic_buffer_scaling_noop_increment_preserves_total_permits() {
1182        let mut runtime_config = xet_runtime::config::XetConfig::new();
1183        runtime_config.reconstruction.download_buffer_size = xet_runtime::utils::ByteSize::from("1kb");
1184        runtime_config.reconstruction.download_buffer_limit = xet_runtime::utils::ByteSize::from("4kb");
1185        let expected_total = runtime_config.reconstruction.download_buffer_limit.as_u64();
1186
1187        let rt = XetRuntime::new_with_config(runtime_config).unwrap();
1188
1189        rt.bridge_sync(async move {
1190            let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (2, (0, 2)), (3, (0, 2))]).await;
1191            let sem = XetRuntime::current().common().reconstruction_download_buffer.clone();
1192
1193            // Pre-grow to max so the run's increment request is a no-op.
1194            let p = sem.increment_total_permits(u64::MAX).unwrap();
1195            drop(p);
1196            assert_eq!(sem.total_permits(), expected_total);
1197
1198            let mut config = test_config();
1199            config.download_buffer_perfile_size = xet_runtime::utils::ByteSize::from("8kb");
1200
1201            let reconstructed = reconstruct_to_vec(&client, file_contents.file_hash, None, &config, None)
1202                .await
1203                .unwrap();
1204            assert_eq!(reconstructed, file_contents.data);
1205
1206            assert_eq!(sem.total_permits(), expected_total);
1207            assert_eq!(XetRuntime::current().common().active_downloads.load(Ordering::Relaxed), 0);
1208        })
1209        .unwrap();
1210    }
1211
1212    // ==================== File Output Specific Tests ====================
1213    // Note: Basic file output is tested via reconstruct_and_verify_full/range.
1214    // These tests cover file-specific scenarios like multiple writes to the same file.
1215
1216    /// Helper to reconstruct to a specific file path (for multi-write tests).
1217    async fn reconstruct_range_to_file_path(
1218        client: &Arc<LocalClient>,
1219        file_hash: MerkleHash,
1220        file_path: &std::path::Path,
1221        range: FileRange,
1222        config: ReconstructionConfig,
1223    ) -> Result<u64> {
1224        FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_hash)
1225            .with_byte_range(range)
1226            .with_config(config)
1227            .reconstruct_to_file(file_path, None, false)
1228            .await
1229    }
1230
1231    #[tokio::test]
1232    async fn test_file_concurrent_non_overlapping_range_writes() {
1233        // Test 16 concurrent writers writing non-overlapping ranges to a ~1MB file.
1234        const NUM_WRITERS: usize = 16;
1235        const LARGE_CHUNK_SIZE: usize = 4096;
1236
1237        // Create a large file (~1MB) with many xorbs.
1238        // Each xorb has ~64KB of data (16 chunks * 4KB), giving us ~1MB total with 16 xorbs.
1239        let term_spec: Vec<(u64, (u64, u64))> = (1..=16).map(|i| (i, (0, 16))).collect();
1240
1241        let client = LocalClient::temporary().await.unwrap();
1242        let file_contents = client.upload_random_file(&term_spec, LARGE_CHUNK_SIZE).await.unwrap();
1243        let file_len = file_contents.data.len() as u64;
1244
1245        let temp_dir = tempfile::tempdir().unwrap();
1246        let file_path = temp_dir.path().join("output.bin");
1247
1248        // Pre-create the file with zeros.
1249        std::fs::write(&file_path, vec![0u8; file_len as usize]).unwrap();
1250
1251        // Use a config with larger fetch sizes for the concurrent test.
1252        let mut config = ReconstructionConfig::default();
1253        config.min_reconstruction_fetch_size = xet_runtime::utils::ByteSize::from("32kb");
1254        config.max_reconstruction_fetch_size = xet_runtime::utils::ByteSize::from("128kb");
1255
1256        // Create 16 non-overlapping ranges.
1257        let chunk_size = file_len / NUM_WRITERS as u64;
1258        let ranges: Vec<FileRange> = (0..NUM_WRITERS)
1259            .map(|i| {
1260                let start = i as u64 * chunk_size;
1261                let end = if i == NUM_WRITERS - 1 {
1262                    file_len
1263                } else {
1264                    (i as u64 + 1) * chunk_size
1265                };
1266                FileRange::new(start, end)
1267            })
1268            .collect();
1269
1270        // Spawn all writers concurrently using a JoinSet.
1271        let mut join_set = tokio::task::JoinSet::new();
1272
1273        for range in ranges {
1274            let client = client.clone();
1275            let file_hash = file_contents.file_hash;
1276            let file_path = file_path.clone();
1277            let config = config.clone();
1278
1279            join_set.spawn(async move {
1280                FileReconstructor::new(&(client as Arc<dyn Client>), file_hash)
1281                    .with_byte_range(range)
1282                    .with_config(config)
1283                    .reconstruct_to_file(&file_path, None, false)
1284                    .await
1285            });
1286        }
1287
1288        // Wait for all writers to complete.
1289        while let Some(result) = join_set.join_next().await {
1290            result.unwrap().unwrap();
1291        }
1292
1293        // Verify the complete file.
1294        let reconstructed = std::fs::read(&file_path).unwrap();
1295        assert_eq!(reconstructed.len(), file_contents.data.len());
1296        assert_eq!(reconstructed, file_contents.data);
1297    }
1298
1299    #[tokio::test]
1300    async fn test_file_writes_preserve_existing_content() {
1301        // Test that writing a range doesn't affect content outside that range.
1302        let (client, file_contents) = setup_test_file(&[(1, (0, 10))]).await;
1303        let file_len = file_contents.data.len() as u64;
1304
1305        let temp_dir = tempfile::tempdir().unwrap();
1306        let file_path = temp_dir.path().join("output.bin");
1307
1308        // Pre-create the file with a specific pattern.
1309        let pattern: Vec<u8> = (0..file_len).map(|i| (i % 251) as u8).collect();
1310        std::fs::write(&file_path, &pattern).unwrap();
1311
1312        // Write only the middle third.
1313        let start = file_len / 3;
1314        let end = 2 * file_len / 3;
1315        let range = FileRange::new(start, end);
1316
1317        reconstruct_range_to_file_path(&client, file_contents.file_hash, &file_path, range, test_config())
1318            .await
1319            .unwrap();
1320
1321        let result = std::fs::read(&file_path).unwrap();
1322
1323        // First and last thirds should still have the pattern.
1324        assert_eq!(&result[..start as usize], &pattern[..start as usize]);
1325        assert_eq!(&result[end as usize..], &pattern[end as usize..]);
1326
1327        // Middle third should have reconstructed data.
1328        assert_eq!(&result[start as usize..end as usize], &file_contents.data[start as usize..end as usize]);
1329    }
1330
1331    // ==================== Multi-Disjoint Range Tests (LocalClient) ====================
1332    //
1333    // These tests exercise complex disjoint range patterns through the LocalClient path
1334    // (no HTTP server), ensuring the reconstruction logic handles V2 multi-range
1335    // XorbBlocks correctly.
1336
1337    /// Single xorb with three disjoint chunk ranges.
1338    #[tokio::test]
1339    async fn test_triple_disjoint_ranges_full() {
1340        let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (1, (4, 6)), (1, (8, 10))]).await;
1341        reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
1342    }
1343
1344    /// Single xorb with three disjoint chunk ranges, partial byte range.
1345    #[tokio::test]
1346    async fn test_triple_disjoint_ranges_partial() {
1347        let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (1, (4, 6)), (1, (8, 10))]).await;
1348        let file_len = file_contents.data.len() as u64;
1349        let range = FileRange::new(file_len / 4, file_len * 3 / 4);
1350        reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
1351    }
1352
1353    /// Multiple xorbs, each with multiple disjoint ranges, interleaved.
1354    #[tokio::test]
1355    async fn test_multi_xorb_interleaved_disjoint() {
1356        let term_spec = &[
1357            (1, (0, 2)),
1358            (2, (0, 2)),
1359            (1, (4, 6)),
1360            (2, (4, 6)),
1361            (1, (8, 10)),
1362            (2, (8, 10)),
1363        ];
1364        let (client, file_contents) = setup_test_file(term_spec).await;
1365        reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
1366    }
1367
1368    /// Multiple xorbs with interleaved disjoint ranges, partial byte range.
1369    #[tokio::test]
1370    async fn test_multi_xorb_interleaved_disjoint_partial() {
1371        let term_spec = &[
1372            (1, (0, 2)),
1373            (2, (0, 2)),
1374            (1, (4, 6)),
1375            (2, (4, 6)),
1376            (1, (8, 10)),
1377            (2, (8, 10)),
1378        ];
1379        let (client, file_contents) = setup_test_file(term_spec).await;
1380        let file_len = file_contents.data.len() as u64;
1381        let range = FileRange::new(file_len / 3, file_len * 2 / 3);
1382        reconstruct_and_verify_range(&client, &file_contents, range, test_config()).await;
1383    }
1384
1385    /// Single xorb with four disjoint ranges (many gaps).
1386    #[tokio::test]
1387    async fn test_four_disjoint_ranges() {
1388        let term_spec = &[(1, (0, 2)), (1, (4, 6)), (1, (8, 10)), (1, (12, 14))];
1389        let (client, file_contents) = setup_test_file(term_spec).await;
1390        reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
1391    }
1392
1393    /// Mix of contiguous and disjoint ranges from the same xorb.
1394    #[tokio::test]
1395    async fn test_mixed_contiguous_and_disjoint() {
1396        let term_spec = &[
1397            (1, (0, 3)),  // contiguous block
1398            (1, (3, 5)),  // continues contiguously
1399            (1, (8, 10)), // gap, then disjoint
1400        ];
1401        let (client, file_contents) = setup_test_file(term_spec).await;
1402        reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
1403    }
1404
1405    /// Disjoint ranges across three xorbs with a complex access pattern.
1406    #[tokio::test]
1407    async fn test_complex_three_xorb_disjoint() {
1408        let term_spec = &[
1409            (1, (0, 2)),
1410            (2, (0, 3)),
1411            (3, (2, 5)),
1412            (1, (5, 8)),
1413            (2, (6, 8)),
1414            (3, (0, 2)),
1415        ];
1416        let (client, file_contents) = setup_test_file(term_spec).await;
1417        reconstruct_and_verify_full(&client, &file_contents, test_config()).await;
1418    }
1419
1420    /// LocalClient with max_ranges_per_fetch=2 (tests V2 response splitting without HTTP).
1421    #[tokio::test]
1422    async fn test_local_client_max_ranges_2_disjoint() {
1423        let client = LocalClient::temporary().await.unwrap();
1424        client.set_max_ranges_per_fetch(2);
1425
1426        let term_spec = &[(1, (0, 2)), (1, (4, 6)), (1, (8, 10)), (1, (12, 14))];
1427        let file_contents = client.upload_random_file(term_spec, TEST_CHUNK_SIZE).await.unwrap();
1428
1429        let config = test_config();
1430        let result = reconstruct_to_vec(&client, file_contents.file_hash, None, &config, None)
1431            .await
1432            .unwrap();
1433        assert_eq!(result, file_contents.data.as_ref());
1434    }
1435
1436    /// LocalClient with max_ranges_per_fetch=1 (every range gets its own fetch entry).
1437    #[tokio::test]
1438    async fn test_local_client_max_ranges_1_multi_xorb() {
1439        let client = LocalClient::temporary().await.unwrap();
1440        client.set_max_ranges_per_fetch(1);
1441
1442        let term_spec = &[(1, (0, 2)), (2, (0, 2)), (1, (4, 6)), (2, (4, 6))];
1443        let file_contents = client.upload_random_file(term_spec, TEST_CHUNK_SIZE).await.unwrap();
1444
1445        let config = test_config();
1446        let result = reconstruct_to_vec(&client, file_contents.file_hash, None, &config, None)
1447            .await
1448            .unwrap();
1449        assert_eq!(result, file_contents.data.as_ref());
1450    }
1451
    // ==================== Cancellation Token Tests ====================
1453
1454    mod cancellation_tests {
1455        use tokio_util::sync::CancellationToken;
1456
1457        use super::*;
1458
1459        #[tokio::test]
1460        async fn test_cancellation_token_before_start() {
1461            let (client, file_contents) = setup_test_file(&[(1, (0, 3))]).await;
1462            let config = test_config();
1463
1464            let token = CancellationToken::new();
1465            token.cancel();
1466            let buffer = Arc::new(std::sync::Mutex::new(Cursor::new(Vec::new())));
1467            let writer = StaticCursorWriter(buffer.clone());
1468
1469            let bytes_written = FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
1470                .with_config(&config)
1471                .with_cancellation_token(token)
1472                .reconstruct_to_writer(writer)
1473                .await
1474                .unwrap();
1475
1476            assert_eq!(bytes_written, 0);
1477        }
1478
1479        /// A writer that cancels a token after a certain number of writes,
1480        /// used to deterministically test mid-reconstruction cancellation.
1481        struct CancellingWriter {
1482            buffer: Arc<std::sync::Mutex<Vec<u8>>>,
1483            cancel_token: CancellationToken,
1484            write_count: AtomicUsize,
1485            cancel_after_writes: usize,
1486        }
1487
1488        impl Write for CancellingWriter {
1489            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1490                let n = self.buffer.lock().unwrap().write(buf)?;
1491                let count = self.write_count.fetch_add(1, Ordering::Relaxed) + 1;
1492                if count >= self.cancel_after_writes {
1493                    self.cancel_token.cancel();
1494                }
1495                Ok(n)
1496            }
1497
1498            fn flush(&mut self) -> std::io::Result<()> {
1499                Ok(())
1500            }
1501        }
1502
1503        #[tokio::test]
1504        async fn test_cancellation_token_during_reconstruction() {
1505            let term_spec: Vec<(u64, (u64, u64))> = (1..=10).map(|i| (i, (0, 5))).collect();
1506            let (client, file_contents) = setup_test_file(&term_spec).await;
1507            let config = test_config();
1508
1509            let token = CancellationToken::new();
1510            let buffer = Arc::new(std::sync::Mutex::new(Vec::new()));
1511
1512            let writer = CancellingWriter {
1513                buffer: buffer.clone(),
1514                cancel_token: token.clone(),
1515                write_count: AtomicUsize::new(0),
1516                cancel_after_writes: 1,
1517            };
1518
1519            // Use a tiny semaphore to force sequential term processing.
1520            let tiny_semaphore = AdjustableSemaphore::new(1, (1, 1));
1521
1522            let bytes_written = FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
1523                .with_config(&config)
1524                .with_cancellation_token(token)
1525                .with_buffer_semaphore(tiny_semaphore)
1526                .reconstruct_to_writer(writer)
1527                .await
1528                .unwrap();
1529
1530            // Verify cancellation returned Ok(0) and only partial data was written.
1531            assert_eq!(bytes_written, 0);
1532            let written = buffer.lock().unwrap().len();
1533            assert!(written < file_contents.data.len());
1534        }
1535
1536        #[tokio::test]
1537        async fn test_cancellation_token_not_set_completes_normally() {
1538            let (client, file_contents) = setup_test_file(&[(1, (0, 3)), (2, (0, 2))]).await;
1539            let config = test_config();
1540
1541            let token = CancellationToken::new();
1542            let buffer = Arc::new(std::sync::Mutex::new(Cursor::new(Vec::new())));
1543            let writer = StaticCursorWriter(buffer.clone());
1544
1545            let bytes_written = FileReconstructor::new(&(client.clone() as Arc<dyn Client>), file_contents.file_hash)
1546                .with_config(&config)
1547                .with_cancellation_token(token)
1548                .reconstruct_to_writer(writer)
1549                .await
1550                .unwrap();
1551
1552            assert_eq!(bytes_written, file_contents.data.len() as u64);
1553            assert_eq!(buffer.lock().unwrap().get_ref().clone(), file_contents.data);
1554        }
1555    }
1556
1557    // ==================== Multirange Fetching Tests (LocalClient) ====================
1558
1559    mod multirange_tests {
1560        use super::*;
1561
1562        fn with_multirange_config(enable: bool) -> Arc<XetRuntime> {
1563            let mut config = xet_runtime::config::XetConfig::new();
1564            config.client.enable_multirange_fetching = enable;
1565            XetRuntime::new_with_config(config).unwrap()
1566        }
1567
1568        /// Exercises multiple disjoint-range scenarios through LocalClient with both
1569        /// enable_multirange_fetching=true and =false.
1570        #[test]
1571        fn test_multirange_local_client() {
1572            for enable in [false, true] {
1573                let rt = with_multirange_config(enable);
1574                rt.bridge_sync(async move {
1575                    let scenarios: Vec<Vec<(u64, (u64, u64))>> = vec![
1576                        vec![(1, (0, 2)), (1, (4, 6)), (1, (8, 10))],
1577                        vec![
1578                            (1, (0, 2)),
1579                            (2, (0, 2)),
1580                            (1, (4, 6)),
1581                            (2, (4, 6)),
1582                            (1, (8, 10)),
1583                            (2, (8, 10)),
1584                        ],
1585                        vec![
1586                            (1, (0, 2)),
1587                            (2, (0, 3)),
1588                            (3, (2, 5)),
1589                            (1, (5, 8)),
1590                            (2, (6, 8)),
1591                            (3, (0, 2)),
1592                        ],
1593                    ];
1594                    let config = test_config();
1595                    for term_spec in &scenarios {
1596                        let (client, fc) = setup_test_file(term_spec).await;
1597                        reconstruct_and_verify_full(&client, &fc, config.clone()).await;
1598
1599                        let file_len = fc.data.len() as u64;
1600                        let range = FileRange::new(file_len / 4, file_len * 3 / 4);
1601                        reconstruct_and_verify_range(&client, &fc, range, config.clone()).await;
1602                    }
1603                })
1604                .unwrap();
1605            }
1606        }
1607
1608        /// LocalClient with max_ranges_per_fetch constraint, both enable settings.
1609        #[test]
1610        fn test_multirange_max_ranges() {
1611            for enable in [false, true] {
1612                let rt = with_multirange_config(enable);
1613                rt.bridge_sync(async {
1614                    let client = LocalClient::temporary().await.unwrap();
1615                    client.set_max_ranges_per_fetch(2);
1616
1617                    let term_spec = &[(1, (0, 2)), (1, (4, 6)), (1, (8, 10)), (1, (12, 14))];
1618                    let fc = client.upload_random_file(term_spec, TEST_CHUNK_SIZE).await.unwrap();
1619
1620                    let config = test_config();
1621                    let result = reconstruct_to_vec(&client, fc.file_hash, None, &config, None).await.unwrap();
1622                    assert_eq!(result, fc.data.as_ref());
1623                })
1624                .unwrap();
1625            }
1626        }
1627    }
1628
1629    // ==================== Server-dependent tests (require simulation feature) ====================
1630    #[cfg(feature = "simulation")]
1631    mod server_tests {
1632        use super::*;
1633
1634        // ==================== V1 Fallback Tests ====================
1635        //
1636        // These tests use LocalTestServer with V2 disabled to verify that
1637        // reconstruction works correctly when the client falls back from V2 to V1.
1638
1639        /// Helper to reconstruct through a LocalTestServer (RemoteClient HTTP path).
1640        async fn reconstruct_via_server(
1641            server: &xet_client::cas_client::LocalTestServer,
1642            file_hash: MerkleHash,
1643            byte_range: Option<FileRange>,
1644            config: &ReconstructionConfig,
1645        ) -> Result<Vec<u8>> {
1646            let buffer = Arc::new(std::sync::Mutex::new(Cursor::new(Vec::new())));
1647            let writer = StaticCursorWriter(buffer.clone());
1648
1649            let client: Arc<dyn Client> = server.remote_client().clone();
1650            let mut reconstructor = FileReconstructor::new(&client, file_hash).with_config(config);
1651
1652            if let Some(range) = byte_range {
1653                reconstructor = reconstructor.with_byte_range(range);
1654            }
1655
1656            reconstructor.reconstruct_to_writer(writer).await?;
1657
1658            let data = buffer.lock().unwrap().get_ref().clone();
1659            Ok(data)
1660        }
1661
1662        #[tokio::test]
1663        async fn test_v1_fallback_full_reconstruction() {
1664            let server = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
1665            let file_contents = server
1666                .remote_client()
1667                .upload_random_file(&[(1, (0, 3)), (2, (0, 2))], TEST_CHUNK_SIZE)
1668                .await
1669                .unwrap();
1670
1671            server.disable_v2_reconstruction(404);
1672
1673            let config = test_config();
1674            let result = reconstruct_via_server(&server, file_contents.file_hash, None, &config)
1675                .await
1676                .unwrap();
1677            assert_eq!(result, file_contents.data.as_ref());
1678        }
1679
1680        #[tokio::test]
1681        async fn test_v1_fallback_partial_range() {
1682            let server = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
1683            let file_contents = server
1684                .remote_client()
1685                .upload_random_file(&[(1, (0, 5)), (2, (0, 3))], TEST_CHUNK_SIZE)
1686                .await
1687                .unwrap();
1688
1689            server.disable_v2_reconstruction(404);
1690
1691            let file_len = file_contents.data.len() as u64;
1692            let range = FileRange::new(file_len / 4, file_len * 3 / 4);
1693
1694            let config = test_config();
1695            let result = reconstruct_via_server(&server, file_contents.file_hash, Some(range), &config)
1696                .await
1697                .unwrap();
1698            assert_eq!(result, &file_contents.data[range.start as usize..range.end as usize]);
1699        }
1700
1701        #[tokio::test]
1702        async fn test_v1_fallback_non_contiguous_chunks() {
1703            let server = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
1704            let file_contents = server
1705                .remote_client()
1706                .upload_random_file(&[(1, (0, 2)), (1, (4, 6))], TEST_CHUNK_SIZE)
1707                .await
1708                .unwrap();
1709
1710            server.disable_v2_reconstruction(404);
1711
1712            let config = test_config();
1713            let result = reconstruct_via_server(&server, file_contents.file_hash, None, &config)
1714                .await
1715                .unwrap();
1716            assert_eq!(result, file_contents.data.as_ref());
1717        }
1718
1719        #[tokio::test]
1720        async fn test_v1_fallback_multiple_xorbs() {
1721            let server = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
1722            let file_contents = server
1723                .remote_client()
1724                .upload_random_file(&[(1, (0, 2)), (2, (0, 3)), (3, (0, 2)), (1, (2, 4))], TEST_CHUNK_SIZE)
1725                .await
1726                .unwrap();
1727
1728            server.disable_v2_reconstruction(404);
1729
1730            let config = test_config();
1731            let result = reconstruct_via_server(&server, file_contents.file_hash, None, &config)
1732                .await
1733                .unwrap();
1734            assert_eq!(result, file_contents.data.as_ref());
1735        }
1736
1737        /// V1 fallback with three disjoint ranges from the same xorb.
1738        #[tokio::test]
1739        async fn test_v1_fallback_triple_disjoint_ranges() {
1740            let server = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
1741            let file_contents = server
1742                .remote_client()
1743                .upload_random_file(&[(1, (0, 2)), (1, (4, 6)), (1, (8, 10))], TEST_CHUNK_SIZE)
1744                .await
1745                .unwrap();
1746
1747            server.disable_v2_reconstruction(404);
1748
1749            let config = test_config();
1750            let result = reconstruct_via_server(&server, file_contents.file_hash, None, &config)
1751                .await
1752                .unwrap();
1753            assert_eq!(result, file_contents.data.as_ref());
1754        }
1755
1756        // ==================== Max Ranges Tests (via server) ====================
1757
1758        /// Helper to set up a server with max_ranges_per_fetch and reconstruct.
1759        async fn reconstruct_via_server_with_max_ranges(
1760            term_spec: &[(u64, (u64, u64))],
1761            max_ranges: usize,
1762            byte_range: Option<FileRange>,
1763        ) -> (Vec<u8>, RandomFileContents) {
1764            let server = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
1765            let file_contents = server
1766                .remote_client()
1767                .upload_random_file(term_spec, TEST_CHUNK_SIZE)
1768                .await
1769                .unwrap();
1770
1771            server.set_max_ranges_per_fetch(max_ranges);
1772
1773            let config = test_config();
1774            let result = reconstruct_via_server(&server, file_contents.file_hash, byte_range, &config)
1775                .await
1776                .unwrap();
1777            (result, file_contents)
1778        }
1779
1780        #[tokio::test]
1781        async fn test_max_ranges_simple() {
1782            let (result, file_contents) =
1783                reconstruct_via_server_with_max_ranges(&[(1, (0, 3)), (2, (0, 2))], 2, None).await;
1784            assert_eq!(result, file_contents.data.as_ref());
1785        }
1786
1787        #[tokio::test]
1788        async fn test_max_ranges_1_disjoint() {
1789            let (result, file_contents) =
1790                reconstruct_via_server_with_max_ranges(&[(1, (0, 2)), (1, (4, 6))], 1, None).await;
1791            assert_eq!(result, file_contents.data.as_ref());
1792        }
1793
1794        #[tokio::test]
1795        async fn test_max_ranges_2_triple_disjoint() {
1796            let (result, file_contents) =
1797                reconstruct_via_server_with_max_ranges(&[(1, (0, 2)), (1, (4, 6)), (1, (8, 10))], 2, None).await;
1798            assert_eq!(result, file_contents.data.as_ref());
1799        }
1800
1801        #[tokio::test]
1802        async fn test_max_ranges_2_multi_xorb_disjoint() {
1803            let term_spec = &[
1804                (1, (0, 2)),
1805                (2, (0, 2)),
1806                (1, (4, 6)),
1807                (2, (4, 6)),
1808                (1, (8, 10)),
1809                (2, (8, 10)),
1810            ];
1811            let (result, file_contents) = reconstruct_via_server_with_max_ranges(term_spec, 2, None).await;
1812            assert_eq!(result, file_contents.data.as_ref());
1813        }
1814
1815        #[tokio::test]
1816        async fn test_max_ranges_2_partial_range() {
1817            let term_spec = &[
1818                (1, (0, 3)),
1819                (2, (0, 2)),
1820                (1, (3, 5)),
1821                (3, (1, 4)),
1822                (2, (4, 6)),
1823                (1, (0, 2)),
1824            ];
1825            let server = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
1826            let file_contents = server
1827                .remote_client()
1828                .upload_random_file(term_spec, TEST_CHUNK_SIZE)
1829                .await
1830                .unwrap();
1831
1832            server.set_max_ranges_per_fetch(2);
1833
1834            let file_len = file_contents.data.len() as u64;
1835            let range = FileRange::new(file_len / 4, file_len * 3 / 4);
1836
1837            let config = test_config();
1838            let result = reconstruct_via_server(&server, file_contents.file_hash, Some(range), &config)
1839                .await
1840                .unwrap();
1841            assert_eq!(result, &file_contents.data[range.start as usize..range.end as usize]);
1842        }
1843
1844        // ==================== Multirange via Server ====================
1845
1846        fn with_multirange_config(enable: bool) -> Arc<XetRuntime> {
1847            let mut config = xet_runtime::config::XetConfig::new();
1848            config.client.enable_multirange_fetching = enable;
1849            XetRuntime::new_with_config(config).unwrap()
1850        }
1851
1852        /// Exercises HTTP server path with full, max-ranges-split, and partial-range
1853        /// reconstruction, both enable_multirange_fetching values.
1854        #[test]
1855        fn test_multirange_via_server() {
1856            for enable in [false, true] {
1857                let rt = with_multirange_config(enable);
1858                rt.bridge_sync(async {
1859                    let config = test_config();
1860
1861                    // Full reconstruction with disjoint ranges
1862                    let server = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
1863                    let fc = server
1864                        .remote_client()
1865                        .upload_random_file(&[(1, (0, 2)), (1, (4, 6)), (1, (8, 10))], TEST_CHUNK_SIZE)
1866                        .await
1867                        .unwrap();
1868                    let result = reconstruct_via_server(&server, fc.file_hash, None, &config).await.unwrap();
1869                    assert_eq!(result, fc.data.as_ref());
1870
1871                    // Multi-xorb with max_ranges_per_fetch=2
1872                    let server = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
1873                    let fc = server
1874                        .remote_client()
1875                        .upload_random_file(
1876                            &[(1, (0, 2)), (2, (0, 2)), (1, (4, 6)), (2, (4, 6)), (1, (8, 10))],
1877                            TEST_CHUNK_SIZE,
1878                        )
1879                        .await
1880                        .unwrap();
1881                    server.set_max_ranges_per_fetch(2);
1882                    let result = reconstruct_via_server(&server, fc.file_hash, None, &config).await.unwrap();
1883                    assert_eq!(result, fc.data.as_ref());
1884
1885                    // Partial byte range
1886                    let server = xet_client::cas_client::LocalTestServerBuilder::new().start().await;
1887                    let fc = server
1888                        .remote_client()
1889                        .upload_random_file(&[(1, (0, 3)), (2, (0, 2)), (1, (3, 5)), (2, (4, 6))], TEST_CHUNK_SIZE)
1890                        .await
1891                        .unwrap();
1892                    let file_len = fc.data.len() as u64;
1893                    let range = FileRange::new(file_len / 4, file_len * 3 / 4);
1894                    let result = reconstruct_via_server(&server, fc.file_hash, Some(range), &config)
1895                        .await
1896                        .unwrap();
1897                    assert_eq!(result, &fc.data[range.start as usize..range.end as usize]);
1898                })
1899                .unwrap();
1900            }
1901        }
1902    } // mod server_tests
1903}