// xet_data/file_reconstruction/reconstruction_terms/file_term.rs

1use std::collections::HashMap;
2use std::collections::hash_map::Entry;
3use std::sync::Arc;
4
5use bytes::Bytes;
6use tokio::sync::OnceCell;
7use xet_client::cas_client::Client;
8use xet_client::cas_types::{ChunkRange, FileRange, HttpRange};
9use xet_client::chunk_cache::ChunkCache;
10use xet_core_structures::merklehash::MerkleHash;
11use xet_runtime::core::xet_config;
12use xet_runtime::utils::UniqueId;
13
14use super::super::FileReconstructionError;
15use super::super::data_writer::DataFuture;
16use super::super::error::Result;
17use super::retrieval_urls::TermBlockRetrievalURLs;
18use super::xorb_block::{XorbBlock, XorbBlockData, XorbReference};
19use crate::progress_tracking::ItemProgressUpdater;
20/// A single term in a file reconstruction, representing a contiguous byte range
21/// in the output file that maps to a chunk range within a xorb block.
22#[derive(Clone)]
23pub struct FileTerm {
24    // The byte range in the file of this term.
25    pub byte_range: FileRange,
26
27    // Absolute chunk range within the full xorb.  Doesn't account for only a partial xorb being downloaded.
28    pub xorb_chunk_range: ChunkRange,
29
30    // The index of the (chunk index, byte offset) pair in the xorb block that starts this file term.
31    pub xorb_block_start_index: usize,
32
33    // The byte offset into the first range of the xorb block should this term not start on a chunk boundary.
34    pub offset_into_first_range: u64,
35
36    // The xorb block that sourced this file term.
37    pub xorb_block: Arc<XorbBlock>,
38
39    // The retrieval URL information for this file term.
40    pub url_info: Arc<TermBlockRetrievalURLs>,
41}
42
43impl FileTerm {
44    pub fn extract_bytes(&self, xorb_block_data: &XorbBlockData) -> Bytes {
45        let (_, start_byte_offset) = xorb_block_data.chunk_offsets[self.xorb_block_start_index];
46        let start_byte_offset = start_byte_offset + self.offset_into_first_range as usize;
47        let expected_size = (self.byte_range.end - self.byte_range.start) as usize;
48        let end_byte_offset = start_byte_offset + expected_size;
49
50        xorb_block_data.data.slice(start_byte_offset..end_byte_offset)
51    }
52
53    /// Get a future that will retrieve and extract the data bytes for this file term.
54    ///
55    /// If the xorb data is already cached, returns a future that immediately resolves (no progress
56    /// report, since the block was already reported by the term that triggered the download).
57    /// Otherwise, spawns a task that calls retrieve_data, which uses single-flight to ensure
58    /// only one download per xorb block (other callers wait without acquiring CAS permits).
59    pub async fn get_data_task(
60        &self,
61        client: Arc<dyn Client>,
62        progress_updater: Option<Arc<ItemProgressUpdater>>,
63        chunk_cache: Option<Arc<dyn ChunkCache>>,
64    ) -> Result<DataFuture> {
65        // Fast path: data already cached, no need to spawn a task.
66        if let Some(xorb_block_data) = self.xorb_block.data.get() {
67            let bytes = self.extract_bytes(xorb_block_data);
68            return Ok(Box::pin(async move { Ok(bytes) }));
69        }
70
71        let file_term = self.clone();
72        let url_info = self.url_info.clone();
73        let xorb_block = self.xorb_block.clone();
74
75        let task = tokio::task::spawn(async move {
76            let xorb_block_data = xorb_block
77                .retrieve_data(client, url_info, progress_updater, chunk_cache)
78                .await?;
79            Ok(file_term.extract_bytes(&xorb_block_data))
80        });
81
82        Ok(Box::pin(async move { task.await? }))
83    }
84}
85
86/// Intermediate data for a single file term, collected during the first pass of
87/// `retrieve_file_term_block` before the final `FileTerm` structs are built.
88///
89/// We need this because `FileTerm` requires `Arc<XorbBlock>` and `Arc<TermBlockRetrievalURLs>`,
90/// which can't be constructed until all terms have been processed.
91struct FileTermEntry {
92    /// The byte range in the output file that this term covers.
93    byte_range: FileRange,
94    /// The chunk range within the xorb that sources this term's data.
95    xorb_chunk_range: ChunkRange,
96    /// Byte offset into the first chunk's data, non-zero only for the first term
97    /// when the query range starts mid-chunk.
98    offset_into_first_range: u64,
99    /// Index into the `xorb_blocks` / `xorb_block_retrieval_urls` vectors.
100    xorb_block_index: usize,
101    /// Flattened index into the xorb block's `chunk_offsets` for this term's start chunk.
102    xorb_block_start_index: usize,
103}
104
105/// Retrieve file terms from the client for a given file hash and byte range.
106/// Returns None if the requested byte range is past the end of the file.
107/// Returns the actual retrieved range and the number of bytes required for the
108/// download (with dedup and compression enabled)
109/// along with the Vec<FileTerm>.
110pub async fn retrieve_file_term_block(
111    client: Arc<dyn Client>,
112    file_hash: MerkleHash,
113    query_file_byte_range: FileRange,
114) -> Result<Option<(FileRange, u64, Vec<FileTerm>)>> {
115    // get_reconstruction always returns V2 format (the client converts V1 internally).
116    let Some(raw_reconstruction) = client.get_reconstruction(&file_hash, Some(query_file_byte_range)).await? else {
117        // None means we've requested a byte range beyond the end of the file.
118        return Ok(None);
119    };
120
121    // Each acquisition gets a unique ID used for single-flight URL refresh dedup.
122    let acquisition_id = UniqueId::new();
123
124    // First pass: iterate through the reconstruction terms and build up intermediate
125    // FileTermEntry data, XorbBlock objects, and retrieval URL info.  We can't construct
126    // the final FileTerm structs yet because they need Arc<XorbBlock> and Arc<TermBlockRetrievalURLs>,
127    // which require all terms to be processed first.
128    let mut file_term_data = Vec::<FileTermEntry>::with_capacity(raw_reconstruction.terms.len());
129
130    // Parallel vectors indexed by xorb_block_index:
131    // - xorb_blocks: the block metadata (hash, chunk ranges, references)
132    // - xorb_block_retrieval_urls: the download URL and byte ranges for each block
133    let mut xorb_blocks: Vec<XorbBlock> = Vec::new();
134    let mut xorb_block_retrieval_urls = Vec::<(String, Vec<HttpRange>)>::new();
135
136    // Dedup map: (xorb_hash, first_range_chunk_start) -> xorb_block_index.
137    // Multiple terms may reference the same xorb block; this ensures we create
138    // each block only once and share it across terms.
139    let mut xorb_index_lookup = HashMap::<(MerkleHash, u32), usize>::new();
140
141    // Track the current byte offset in the output file as we process terms sequentially.
142    let mut cur_file_byte_offset = query_file_byte_range.start;
143
144    let enable_multirange = xet_config().client.enable_multirange_fetching;
145
146    for (local_term_index, term) in raw_reconstruction.terms.iter().enumerate() {
147        let xorb_hash: MerkleHash = term.hash.into();
148
149        let Some(xorb_descriptor) = raw_reconstruction.xorbs.get(&term.hash) else {
150            return Err(FileReconstructionError::CorruptedReconstruction(format!(
151                "Xorb info not found for xorb hash {xorb_hash:?}"
152            )));
153        };
154
155        // Find the XorbBlock for this term's chunk range. The behavior depends on the
156        // enable_multirange_fetching config:
157        //
158        // - When true: one XorbBlock per XorbMultiRangeFetch entry, preserving all ranges in a single block
159        //   (multi-range HTTP request).
160        // - When false (default): one XorbBlock per individual XorbRangeDescriptor, so each range is fetched as a
161        //   separate single-range HTTP request in parallel.
162        let xorb_block_index = 'find_xorb_block: {
163            for fetch_entry in xorb_descriptor.iter() {
164                if enable_multirange {
165                    let term_contained = fetch_entry
166                        .ranges
167                        .iter()
168                        .any(|r| r.chunks.start <= term.range.start && term.range.end <= r.chunks.end);
169
170                    if !term_contained {
171                        continue;
172                    }
173
174                    let first_chunk_start = fetch_entry.ranges[0].chunks.start;
175
176                    let index = match xorb_index_lookup.entry((xorb_hash, first_chunk_start)) {
177                        Entry::Occupied(entry) => *entry.get(),
178                        Entry::Vacant(entry) => {
179                            let new_index = xorb_blocks.len();
180
181                            let chunk_ranges: Vec<ChunkRange> = fetch_entry.ranges.iter().map(|r| r.chunks).collect();
182                            let http_ranges: Vec<HttpRange> = fetch_entry.ranges.iter().map(|r| r.bytes).collect();
183
184                            xorb_blocks.push(XorbBlock {
185                                xorb_hash,
186                                chunk_ranges,
187                                xorb_block_index: new_index,
188                                references: vec![],
189                                uncompressed_size_if_known: None,
190                                data: OnceCell::new(),
191                            });
192
193                            xorb_block_retrieval_urls.push((fetch_entry.url.clone(), http_ranges));
194
195                            entry.insert(new_index);
196                            new_index
197                        },
198                    };
199
200                    break 'find_xorb_block index;
201                } else {
202                    for range in &fetch_entry.ranges {
203                        if range.chunks.start <= term.range.start && term.range.end <= range.chunks.end {
204                            let index = match xorb_index_lookup.entry((xorb_hash, range.chunks.start)) {
205                                Entry::Occupied(entry) => *entry.get(),
206                                Entry::Vacant(entry) => {
207                                    let new_index = xorb_blocks.len();
208
209                                    xorb_blocks.push(XorbBlock {
210                                        xorb_hash,
211                                        chunk_ranges: vec![range.chunks],
212                                        xorb_block_index: new_index,
213                                        references: vec![],
214                                        uncompressed_size_if_known: None,
215                                        data: OnceCell::new(),
216                                    });
217
218                                    xorb_block_retrieval_urls.push((fetch_entry.url.clone(), vec![range.bytes]));
219
220                                    entry.insert(new_index);
221                                    new_index
222                                },
223                            };
224
225                            break 'find_xorb_block index;
226                        }
227                    }
228                }
229            }
230            return Err(FileReconstructionError::CorruptedReconstruction(format!(
231                "No xorb fetch entry found for file term {local_term_index:?} in xorb info for xorb hash {xorb_hash:?}"
232            )));
233        };
234
235        // Only the first term can have a non-zero offset into its first chunk,
236        // which happens when the query byte range starts mid-chunk.
237        let offset_into_first_range = if local_term_index == 0 {
238            raw_reconstruction.offset_into_first_range
239        } else {
240            0
241        };
242
243        // The term's contribution to the output file is its full uncompressed size
244        // minus any offset into the first chunk.
245        let term_byte_size = term.unpacked_length as u64 - offset_into_first_range;
246
247        // Record this term as a reference on its xorb block (used later to determine
248        // whether the block's total uncompressed size can be inferred).
249        xorb_blocks[xorb_block_index].references.push(XorbReference {
250            term_chunks: term.range,
251            uncompressed_size: term.unpacked_length as usize,
252        });
253
254        // Compute the flattened index into the block's chunk_offsets for this term's
255        // starting chunk. This accounts for disjoint chunk ranges in multi-range blocks.
256        //
257        // The term_contained check above guarantees term.range.start falls within one of
258        // the block's chunk_ranges, so this loop always finds a match.
259        let xorb_block_start_index = {
260            let chunk_start = term.range.start;
261            let chunk_ranges = &xorb_blocks[xorb_block_index].chunk_ranges;
262            let mut idx = 0;
263            let mut found = false;
264            for range in chunk_ranges {
265                if chunk_start >= range.start && chunk_start < range.end {
266                    idx += (chunk_start - range.start) as usize;
267                    found = true;
268                    break;
269                }
270                idx += (range.end - range.start) as usize;
271            }
272            if !found {
273                return Err(FileReconstructionError::CorruptedReconstruction(format!(
274                    "chunk_start {chunk_start} not found in chunk_ranges {chunk_ranges:?} for file term {local_term_index}"
275                )));
276            }
277            idx
278        };
279
280        file_term_data.push(FileTermEntry {
281            byte_range: FileRange::new(cur_file_byte_offset, cur_file_byte_offset + term_byte_size),
282            xorb_chunk_range: term.range,
283            offset_into_first_range,
284            xorb_block_index,
285            xorb_block_start_index,
286        });
287
288        cur_file_byte_offset += term_byte_size;
289    }
290
291    // Sort each block's references by chunk start so that determine_size_if_possible
292    // can use its forward-chaining DP to check coverage.
293    for block in &mut xorb_blocks {
294        block.references.sort_by_key(|r| r.term_chunks.start);
295        block.uncompressed_size_if_known =
296            XorbBlock::determine_size_if_possible(&block.chunk_ranges, &block.references);
297    }
298
299    // The last term in the reconstruction may extend beyond the requested range
300    // (e.g. when the query ends mid-chunk). Trim it to the query boundary.
301    if cur_file_byte_offset > query_file_byte_range.end {
302        let last_term_shrinkage = cur_file_byte_offset - query_file_byte_range.end;
303
304        debug_assert!(!file_term_data.is_empty());
305
306        if let Some(entry) = file_term_data.last_mut() {
307            entry.byte_range.end -= last_term_shrinkage;
308        }
309    }
310
311    // The actual range covered, which may be smaller than requested if the file
312    // ends before the requested range.
313    let actual_range = FileRange::new(
314        file_term_data.first().map(|e| e.byte_range.start).unwrap_or(0),
315        file_term_data.last().map(|e| e.byte_range.end).unwrap_or(0),
316    );
317
318    // Total compressed bytes that will be transferred across all xorb block downloads.
319    let total_transfer_bytes: u64 = xorb_block_retrieval_urls
320        .iter()
321        .flat_map(|(_, ranges)| ranges)
322        .map(|r| r.length())
323        .sum();
324
325    // Wrap the retrieval URLs in a shared struct so all file terms can share them
326    // and coordinate URL refreshes through a single lock.
327    let url_info =
328        Arc::new(TermBlockRetrievalURLs::new(file_hash, actual_range, acquisition_id, xorb_block_retrieval_urls));
329
330    // Second pass: convert the intermediate FileTermEntry data into final FileTerm
331    // structs, now that we can wrap xorb blocks in Arc and share the url_info.
332    let xorb_blocks_arc: Vec<Arc<XorbBlock>> = xorb_blocks.into_iter().map(Arc::new).collect();
333
334    let file_terms: Vec<FileTerm> = file_term_data
335        .into_iter()
336        .map(|entry| FileTerm {
337            byte_range: entry.byte_range,
338            xorb_chunk_range: entry.xorb_chunk_range,
339            xorb_block_start_index: entry.xorb_block_start_index,
340            offset_into_first_range: entry.offset_into_first_range,
341            xorb_block: xorb_blocks_arc[entry.xorb_block_index].clone(),
342            url_info: url_info.clone(),
343        })
344        .collect();
345
346    Ok(Some((actual_range, total_transfer_bytes, file_terms)))
347}
348
349#[cfg(test)]
350mod tests {
351    use std::sync::Arc;
352
353    use more_asserts::assert_le;
354    use xet_client::cas_client::{ClientTestingUtils, LocalClient, RandomFileContents};
355    use xet_client::cas_types::{ChunkRange, FileRange};
356    use xet_runtime::utils::UniqueId;
357
358    use super::*;
359
360    const TEST_CHUNK_SIZE: usize = 101;
361
362    fn verify_xorb_block_references(file_terms: &[FileTerm]) {
363        for file_term in file_terms {
364            let refs = &file_term.xorb_block.references;
365            assert!(
366                refs.iter().any(|r| r.term_chunks == file_term.xorb_chunk_range),
367                "xorb_chunk_range {:?} must be in block references {:?}",
368                file_term.xorb_chunk_range,
369                refs.as_slice()
370            );
371        }
372        let mut seen_blocks = std::collections::HashSet::new();
373        for file_term in file_terms {
374            if seen_blocks.insert(file_term.xorb_block.xorb_block_index) {
375                let refs = &file_term.xorb_block.references;
376                for w in refs.windows(2) {
377                    assert_le!(w[0].term_chunks.start, w[1].term_chunks.start);
378                }
379            }
380        }
381    }
382
383    /// Creates a test client and uploads a random file with the given term specification.
384    /// Returns the client and file contents for verification.
385    async fn setup_test_file(term_spec: &[(u64, (u64, u64))]) -> (Arc<LocalClient>, RandomFileContents) {
386        let client = LocalClient::temporary().await.unwrap();
387        let file_contents = client.upload_random_file(term_spec, TEST_CHUNK_SIZE).await.unwrap();
388        (client, file_contents)
389    }
390
391    /// Retrieves file terms and thoroughly verifies their correctness.
392    ///
393    /// If `requested_range` is None, retrieves the full file range.
394    ///
395    /// This function:
396    /// - Retrieves file terms from the client for the given range
397    /// - Verifies file terms are contiguous and cover the range
398    /// - Verifies each file term's xorb block references are valid
399    /// - Verifies chunk ranges are within xorb block boundaries
400    /// - Cross-references with the known file contents for correctness
401    /// - Verifies number of file terms matches expected from term_spec
402    async fn retrieve_and_verify(
403        client: &Arc<LocalClient>,
404        file_contents: &RandomFileContents,
405        requested_range: Option<FileRange>,
406    ) {
407        let requested_range = requested_range.unwrap_or_else(|| FileRange::new(0, file_contents.data.len() as u64));
408        let dyn_client: Arc<dyn Client> = client.clone();
409
410        let (returned_range, _, file_terms) =
411            retrieve_file_term_block(dyn_client.clone(), file_contents.file_hash, requested_range)
412                .await
413                .expect("retrieve_file_term_block should succeed")
414                .expect("file_terms should not be None for valid range");
415
416        // Verify the returned range matches the requested range.
417        assert_eq!(returned_range, requested_range);
418
419        // Track position within the requested range.
420        let mut current_pos = requested_range.start;
421        let mut file_term_data_offset = 0usize;
422
423        // Find the starting term index in file_contents based on requested_range.start.
424        let mut expected_term_idx = 0;
425        let mut byte_offset = 0u64;
426        for (idx, term) in file_contents.terms.iter().enumerate() {
427            let term_end = byte_offset + term.data.len() as u64;
428            if term_end > requested_range.start {
429                expected_term_idx = idx;
430                file_term_data_offset = (requested_range.start - byte_offset) as usize;
431                break;
432            }
433            byte_offset = term_end;
434        }
435
436        // Collect unique xorb block indices to verify count
437        let mut seen_xorb_indices = std::collections::HashSet::new();
438
439        // Now verify actual data reconstruction by fetching all file terms.
440        let mut reconstructed_data = Vec::with_capacity((requested_range.end - requested_range.start) as usize);
441        let mut term_count = 0;
442
443        for file_term in &file_terms {
444            // Verify byte range is contiguous.
445            assert_eq!(file_term.byte_range.start, current_pos);
446            assert!(file_term.byte_range.end > file_term.byte_range.start);
447            assert_le!(file_term.byte_range.end, requested_range.end);
448
449            // Track xorb block index
450            seen_xorb_indices.insert(file_term.xorb_block.xorb_block_index);
451
452            // Verify chunk range is within xorb block boundaries: the term's chunk range
453            // must be contained within at least one of the block's chunk ranges.
454            let xorb_block = &file_term.xorb_block;
455            let term_in_some_range = xorb_block
456                .chunk_ranges
457                .iter()
458                .any(|cr| file_term.xorb_chunk_range.start >= cr.start && file_term.xorb_chunk_range.end <= cr.end);
459            assert!(
460                term_in_some_range,
461                "term chunk range {:?} not within any block chunk range {:?}",
462                file_term.xorb_chunk_range, xorb_block.chunk_ranges
463            );
464
465            // Cross-reference with known file contents.
466            if expected_term_idx < file_contents.terms.len() {
467                let expected_term = &file_contents.terms[expected_term_idx];
468
469                // Verify xorb hash matches.
470                assert_eq!(xorb_block.xorb_hash, expected_term.xorb_hash);
471
472                // Verify chunk range matches (accounting for partial first term).
473                if file_term_data_offset == 0 {
474                    assert_eq!(file_term.xorb_chunk_range.start, expected_term.chunk_start);
475                }
476            }
477
478            // Verify all xorb blocks referenced have valid hashes.
479            assert!(file_contents.xorbs.contains_key(&file_term.xorb_block.xorb_hash));
480
481            // Get the data task and await it.
482            let data_future = file_term.get_data_task(dyn_client.clone(), None, None).await.unwrap();
483            let data = data_future.await.unwrap();
484
485            // Verify the data size matches the byte range.
486            let expected_size = (file_term.byte_range.end - file_term.byte_range.start) as usize;
487            assert_eq!(data.len(), expected_size, "Term {term_count} data size mismatch");
488
489            reconstructed_data.extend_from_slice(&data);
490
491            current_pos = file_term.byte_range.end;
492            expected_term_idx += 1;
493            file_term_data_offset = 0;
494            term_count += 1;
495        }
496
497        // Verify we covered the entire requested range.
498        assert_eq!(current_pos, requested_range.end);
499
500        // For full file range, verify we have the expected number of file terms.
501        if requested_range.start == 0 && requested_range.end == file_contents.data.len() as u64 {
502            assert_eq!(term_count, file_contents.terms.len());
503        }
504
505        // Compare reconstructed data with expected file contents.
506        let expected_data = &file_contents.data[requested_range.start as usize..requested_range.end as usize];
507        assert_eq!(reconstructed_data.len(), expected_data.len());
508        assert_eq!(reconstructed_data, expected_data);
509
510        verify_xorb_block_references(&file_terms);
511    }
512
513    // ==================== Test Cases ====================
514
515    #[tokio::test]
516    async fn test_xorb_block_references_exact() {
517        let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (1, (2, 4)), (1, (4, 6))]).await;
518        let file_range = FileRange::new(0, file_contents.data.len() as u64);
519        let dyn_client: Arc<dyn Client> = client.clone();
520        let (_, _, file_terms) = retrieve_file_term_block(dyn_client, file_contents.file_hash, file_range)
521            .await
522            .unwrap()
523            .unwrap();
524        verify_xorb_block_references(&file_terms);
525        assert_eq!(file_terms.len(), 3);
526        let block = &file_terms[0].xorb_block;
527        let ref_ranges: Vec<ChunkRange> = block.references.iter().map(|r| r.term_chunks).collect();
528        let expected = vec![ChunkRange::new(0, 2), ChunkRange::new(2, 4), ChunkRange::new(4, 6)];
529        assert_eq!(ref_ranges, expected);
530
531        let (client2, file_contents2) = setup_test_file(&[(1, (0, 5)), (1, (0, 5))]).await;
532        let file_range2 = FileRange::new(0, file_contents2.data.len() as u64);
533        let dyn_client2: Arc<dyn Client> = client2.clone();
534        let (_, _, file_terms2) = retrieve_file_term_block(dyn_client2, file_contents2.file_hash, file_range2)
535            .await
536            .unwrap()
537            .unwrap();
538        verify_xorb_block_references(&file_terms2);
539        let block2 = &file_terms2[0].xorb_block;
540        let ref_ranges2: Vec<ChunkRange> = block2.references.iter().map(|r| r.term_chunks).collect();
541        let expected2 = vec![ChunkRange::new(0, 5), ChunkRange::new(0, 5)];
542        assert_eq!(ref_ranges2, expected2);
543    }
544
545    #[tokio::test]
546    async fn test_single_xorb_full_range() {
547        let (client, file_contents) = setup_test_file(&[(1, (0, 5))]).await;
548        retrieve_and_verify(&client, &file_contents, None).await;
549    }
550
551    #[tokio::test]
552    async fn test_multiple_terms_same_xorb() {
553        let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (1, (2, 4)), (1, (4, 6))]).await;
554        retrieve_and_verify(&client, &file_contents, None).await;
555    }
556
557    #[tokio::test]
558    async fn test_multiple_xorbs() {
559        let (client, file_contents) = setup_test_file(&[(1, (0, 3)), (2, (0, 2)), (3, (0, 4))]).await;
560        retrieve_and_verify(&client, &file_contents, None).await;
561    }
562
563    #[tokio::test]
564    async fn test_overlapping_chunk_ranges() {
565        let (client, file_contents) = setup_test_file(&[(1, (0, 5)), (1, (1, 3)), (1, (2, 4))]).await;
566        retrieve_and_verify(&client, &file_contents, None).await;
567    }
568
569    #[tokio::test]
570    async fn test_partial_range_middle() {
571        let (client, file_contents) = setup_test_file(&[(1, (0, 10))]).await;
572        let file_len = file_contents.data.len() as u64;
573        retrieve_and_verify(&client, &file_contents, Some(FileRange::new(file_len / 4, file_len * 3 / 4))).await;
574    }
575
576    #[tokio::test]
577    async fn test_partial_range_start() {
578        let (client, file_contents) = setup_test_file(&[(1, (0, 10))]).await;
579        let file_len = file_contents.data.len() as u64;
580        retrieve_and_verify(&client, &file_contents, Some(FileRange::new(0, file_len / 2))).await;
581    }
582
583    #[tokio::test]
584    async fn test_partial_range_end() {
585        let (client, file_contents) = setup_test_file(&[(1, (0, 10))]).await;
586        let file_len = file_contents.data.len() as u64;
587        retrieve_and_verify(&client, &file_contents, Some(FileRange::new(file_len / 2, file_len))).await;
588    }
589
590    #[tokio::test]
591    async fn test_beyond_file_end() {
592        let (client, file_contents) = setup_test_file(&[(1, (0, 3))]).await;
593        let file_len = file_contents.data.len() as u64;
594        let beyond_range = FileRange::new(file_len + 1000, file_len + 2000);
595
596        let dyn_client: Arc<dyn Client> = client.clone();
597        let result = retrieve_file_term_block(dyn_client, file_contents.file_hash, beyond_range).await;
598
599        match result {
600            Ok(None) => {},
601            Ok(Some((_, _, file_terms))) => assert!(file_terms.is_empty()),
602            Err(_) => {},
603        }
604    }
605
606    #[tokio::test]
607    async fn test_interleaved_xorbs() {
608        let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (2, (0, 2)), (1, (2, 4)), (2, (2, 4))]).await;
609        retrieve_and_verify(&client, &file_contents, None).await;
610    }
611
612    #[tokio::test]
613    async fn test_non_contiguous_chunks() {
614        let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (1, (4, 6))]).await;
615        retrieve_and_verify(&client, &file_contents, None).await;
616    }
617
618    #[tokio::test]
619    async fn test_adjacent_chunks() {
620        let (client, file_contents) = setup_test_file(&[(1, (0, 3)), (1, (3, 5))]).await;
621        retrieve_and_verify(&client, &file_contents, None).await;
622    }
623
624    #[tokio::test]
625    async fn test_single_chunk_terms() {
626        let (client, file_contents) =
627            setup_test_file(&[(1, (0, 1)), (1, (1, 2)), (1, (2, 3)), (2, (0, 1)), (2, (1, 2))]).await;
628        retrieve_and_verify(&client, &file_contents, None).await;
629    }
630
631    #[tokio::test]
632    async fn test_large_file_many_xorbs() {
633        let term_spec: Vec<(u64, (u64, u64))> = (1..=10).map(|i| (i, (0, 3))).collect();
634        let (client, file_contents) = setup_test_file(&term_spec).await;
635        retrieve_and_verify(&client, &file_contents, None).await;
636    }
637
638    #[tokio::test]
639    async fn test_xorb_block_deduplication() {
640        let (client, file_contents) = setup_test_file(&[(1, (0, 5)), (1, (0, 5))]).await;
641        retrieve_and_verify(&client, &file_contents, None).await;
642    }
643
644    #[tokio::test]
645    async fn test_retrieval_url_acquisition() {
646        let (client, file_contents) = setup_test_file(&[(1, (0, 5))]).await;
647        let file_range = FileRange::new(0, file_contents.data.len() as u64);
648        let dyn_client: Arc<dyn Client> = client.clone();
649
650        let (_, _, file_terms) = retrieve_file_term_block(dyn_client, file_contents.file_hash, file_range)
651            .await
652            .unwrap()
653            .unwrap();
654
655        // Get the first file term's xorb block to test URL retrieval
656        let file_term = &file_terms[0];
657        let xorb_block_index = file_term.xorb_block.xorb_block_index;
658        let (unique_id, url, http_ranges) = file_term.url_info.get_retrieval_url(xorb_block_index).await;
659
660        assert!(!url.is_empty());
661        assert!(!http_ranges.is_empty());
662        assert!(http_ranges[0].start <= http_ranges[0].end);
663        assert!(unique_id != UniqueId::null());
664    }
665
666    #[tokio::test]
667    async fn test_complex_mixed_pattern() {
668        let term_spec = &[
669            (1, (0, 3)),
670            (2, (0, 2)),
671            (1, (3, 5)),
672            (3, (1, 4)),
673            (2, (4, 6)),
674            (1, (0, 2)),
675        ];
676        let (client, file_contents) = setup_test_file(term_spec).await;
677        retrieve_and_verify(&client, &file_contents, None).await;
678    }
679
680    #[tokio::test]
681    async fn test_repeated_xorb_different_ranges() {
682        let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (1, (3, 5)), (1, (1, 3)), (1, (4, 6))]).await;
683        retrieve_and_verify(&client, &file_contents, None).await;
684    }
685
686    #[tokio::test]
687    async fn test_single_chunk_file() {
688        let (client, file_contents) = setup_test_file(&[(1, (0, 1))]).await;
689        retrieve_and_verify(&client, &file_contents, None).await;
690    }
691
692    #[tokio::test]
693    async fn test_many_small_terms_from_different_xorbs() {
694        let term_spec: Vec<(u64, (u64, u64))> = (1..=20).map(|i| (i, (0, 1))).collect();
695        let (client, file_contents) = setup_test_file(&term_spec).await;
696        retrieve_and_verify(&client, &file_contents, None).await;
697    }
698
699    #[tokio::test]
700    async fn test_range_few_bytes_before_end() {
701        let (client, file_contents) = setup_test_file(&[(1, (0, 5))]).await;
702        let file_len = file_contents.data.len() as u64;
703
704        let range = FileRange::new(0, file_len - 3);
705        retrieve_and_verify(&client, &file_contents, Some(range)).await;
706
707        let range = FileRange::new(0, file_len - 1);
708        retrieve_and_verify(&client, &file_contents, Some(range)).await;
709    }
710
711    #[tokio::test]
712    async fn test_range_few_bytes_after_start() {
713        let (client, file_contents) = setup_test_file(&[(1, (0, 5))]).await;
714        let file_len = file_contents.data.len() as u64;
715
716        let range = FileRange::new(3, file_len);
717        retrieve_and_verify(&client, &file_contents, Some(range)).await;
718
719        let range = FileRange::new(1, file_len);
720        retrieve_and_verify(&client, &file_contents, Some(range)).await;
721    }
722
723    #[tokio::test]
724    async fn test_range_few_bytes_offset_both_ends() {
725        let (client, file_contents) = setup_test_file(&[(1, (0, 5))]).await;
726        let file_len = file_contents.data.len() as u64;
727
728        let range = FileRange::new(2, file_len - 2);
729        retrieve_and_verify(&client, &file_contents, Some(range)).await;
730
731        let range = FileRange::new(file_len / 2 - 1, file_len / 2 + 1);
732        retrieve_and_verify(&client, &file_contents, Some(range)).await;
733    }
734
735    #[tokio::test]
736    async fn test_range_single_byte_at_various_positions() {
737        let (client, file_contents) = setup_test_file(&[(1, (0, 5))]).await;
738        let file_len = file_contents.data.len() as u64;
739
740        retrieve_and_verify(&client, &file_contents, Some(FileRange::new(0, 1))).await;
741
742        retrieve_and_verify(&client, &file_contents, Some(FileRange::new(file_len - 1, file_len))).await;
743
744        let mid = file_len / 2;
745        retrieve_and_verify(&client, &file_contents, Some(FileRange::new(mid, mid + 1))).await;
746    }
747
748    #[tokio::test]
749    async fn test_multi_term_range_ends_mid_chunk() {
750        let (client, file_contents) = setup_test_file(&[(1, (0, 3)), (2, (0, 3)), (3, (0, 3))]).await;
751        let file_len = file_contents.data.len() as u64;
752
753        let range = FileRange::new(0, file_len - 5);
754        retrieve_and_verify(&client, &file_contents, Some(range)).await;
755    }
756
757    #[tokio::test]
758    async fn test_multi_term_range_starts_mid_chunk() {
759        let (client, file_contents) = setup_test_file(&[(1, (0, 3)), (2, (0, 3)), (3, (0, 3))]).await;
760        let file_len = file_contents.data.len() as u64;
761
762        let range = FileRange::new(5, file_len);
763        retrieve_and_verify(&client, &file_contents, Some(range)).await;
764    }
765
766    // ==================== Multi-Disjoint Range Edge Cases ====================
767
768    /// Single xorb with three disjoint chunk ranges.
769    /// This creates one XorbBlock with chunk_ranges = [(0,2), (4,6), (8,10)].
770    #[tokio::test]
771    async fn test_triple_disjoint_same_xorb() {
772        let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (1, (4, 6)), (1, (8, 10))]).await;
773        retrieve_and_verify(&client, &file_contents, None).await;
774    }
775
776    /// Triple disjoint ranges with a partial byte range spanning the gap.
777    #[tokio::test]
778    async fn test_triple_disjoint_partial_range_across_gap() {
779        let (client, file_contents) = setup_test_file(&[(1, (0, 2)), (1, (4, 6)), (1, (8, 10))]).await;
780        let file_len = file_contents.data.len() as u64;
781        let range = FileRange::new(file_len / 4, file_len * 3 / 4);
782        retrieve_and_verify(&client, &file_contents, Some(range)).await;
783    }
784
785    /// Two xorbs, each with two disjoint ranges, interleaved in file order.
786    #[tokio::test]
787    async fn test_two_xorbs_interleaved_disjoint() {
788        let term_spec = &[(1, (0, 2)), (2, (0, 2)), (1, (4, 6)), (2, (4, 6))];
789        let (client, file_contents) = setup_test_file(term_spec).await;
790        retrieve_and_verify(&client, &file_contents, None).await;
791    }
792
793    /// Two xorbs interleaved with disjoint ranges, partial byte range.
794    #[tokio::test]
795    async fn test_two_xorbs_interleaved_disjoint_partial() {
796        let term_spec = &[(1, (0, 2)), (2, (0, 2)), (1, (4, 6)), (2, (4, 6))];
797        let (client, file_contents) = setup_test_file(term_spec).await;
798        let file_len = file_contents.data.len() as u64;
799        retrieve_and_verify(&client, &file_contents, Some(FileRange::new(file_len / 3, file_len * 2 / 3))).await;
800    }
801
802    /// Single xorb with four disjoint ranges, each a single chunk wide.
803    #[tokio::test]
804    async fn test_four_single_chunk_disjoint() {
805        let term_spec = &[(1, (0, 1)), (1, (3, 4)), (1, (6, 7)), (1, (9, 10))];
806        let (client, file_contents) = setup_test_file(term_spec).await;
807        retrieve_and_verify(&client, &file_contents, None).await;
808    }
809
810    /// Mix of contiguous and disjoint ranges from the same xorb.
811    /// Chunks 0-4 are contiguous, then a gap, then chunk 8-10.
812    #[tokio::test]
813    async fn test_contiguous_then_disjoint() {
814        let term_spec = &[(1, (0, 2)), (1, (2, 4)), (1, (8, 10))];
815        let (client, file_contents) = setup_test_file(term_spec).await;
816        retrieve_and_verify(&client, &file_contents, None).await;
817    }
818
819    /// Three xorbs with complex disjoint access patterns.
820    #[tokio::test]
821    async fn test_three_xorbs_complex_disjoint() {
822        let term_spec = &[
823            (1, (0, 2)),
824            (2, (0, 3)),
825            (3, (2, 5)),
826            (1, (5, 8)),
827            (2, (6, 8)),
828            (3, (0, 2)),
829        ];
830        let (client, file_contents) = setup_test_file(term_spec).await;
831        retrieve_and_verify(&client, &file_contents, None).await;
832    }
833}