Skip to main content

xet_data/file_reconstruction/reconstruction_terms/xorb_block.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use bytes::Bytes;
5use tokio::sync::{Mutex, OnceCell};
6use xet_client::cas_client::{Client, ProgressCallback};
7use xet_client::cas_types::{ChunkRange, Key};
8use xet_client::chunk_cache::ChunkCache;
9use xet_core_structures::merklehash::MerkleHash;
10use xet_runtime::core::xet_config;
11use xet_runtime::utils::UniqueId;
12
13use super::super::error::Result;
14use super::retrieval_urls::{TermBlockRetrievalURLs, XorbURLProvider};
15use crate::progress_tracking::ItemProgressUpdater;
16
17/// Downloaded and decompressed data for a xorb block, including chunk boundary offsets.
18///
19/// A single `XorbBlockData` may hold data from multiple disjoint chunk ranges
20/// (V2 multi-range fetch). The chunks are concatenated in range order, and
21/// `chunk_offsets` maps each chunk index to its byte position within `data`.
22pub struct XorbBlockData {
23    /// Pairs of (chunk_index, byte_offset) mapping each chunk to its start position
24    /// within `data`. Because the block can span multiple disjoint chunk ranges,
25    /// storing the chunk index alongside the offset avoids ambiguity.
26    pub chunk_offsets: Vec<(usize, usize)>,
27
28    /// The concatenated decompressed chunk data for all ranges in this block.
29    pub data: Bytes,
30}
31
32/// A reference from a file term back to the xorb block it belongs to.
33/// Used by `determine_size_if_possible` to check whether the block's total
34/// uncompressed size can be inferred from the terms that reference it.
35#[derive(Debug)]
36pub struct XorbReference {
37    /// The chunk range within the xorb that this file term covers.
38    pub term_chunks: ChunkRange,
39    /// The uncompressed byte size of this term's data.
40    pub uncompressed_size: usize,
41}
42
43/// A downloadable xorb block identified by hash and chunk ranges, with cached data.
44///
45/// A block may contain multiple disjoint chunk ranges from the same xorb (V2 multi-range).
46/// Multiple file terms may reference the same block. Downloaded data is cached in `data`
47/// so that the first term to request it triggers the download, and subsequent terms
48/// reuse the cached result.
49pub struct XorbBlock {
50    pub xorb_hash: MerkleHash,
51    /// The chunk ranges fetched for this block. For V1 this is a single range;
52    /// for V2 multi-range fetches this may contain multiple disjoint ranges.
53    pub chunk_ranges: Vec<ChunkRange>,
54    /// Index into the parent `TermBlockRetrievalURLs` for URL lookup.
55    pub xorb_block_index: usize,
56    /// All file-term references covered by this block, sorted by chunk range start.
57    /// Populated during `retrieve_file_term_block` and used to compute `uncompressed_size_if_known`.
58    pub references: Vec<XorbReference>,
59    /// Expected total decompressed size across all chunk ranges, if it can be determined
60    /// from the references. Passed to clients as a debug assertion hint.
61    pub uncompressed_size_if_known: Option<usize>,
62    pub data: OnceCell<Arc<XorbBlockData>>,
63}
64
65impl PartialEq for XorbBlock {
66    fn eq(&self, other: &Self) -> bool {
67        self.xorb_hash == other.xorb_hash
68            && self.chunk_ranges == other.chunk_ranges
69            && self.xorb_block_index == other.xorb_block_index
70    }
71}
72
73impl Eq for XorbBlock {}
74
75/// Builds chunk offset pairs from chunk ranges and a flat byte-offset slice.
76fn build_chunk_offsets(chunk_ranges: &[ChunkRange], byte_offsets: &[u32]) -> Vec<(usize, usize)> {
77    let mut chunk_offsets = Vec::new();
78    let mut offset_idx = 0;
79    for range in chunk_ranges {
80        for chunk_idx in range.start..range.end {
81            chunk_offsets.push((chunk_idx as usize, byte_offsets[offset_idx] as usize));
82            offset_idx += 1;
83        }
84    }
85    chunk_offsets
86}
87
88impl XorbBlock {
89    /// Retrieve the xorb block data from the client, caching it for subsequent calls.
90    ///
91    /// Uses single-flight: the first caller acquires a CAS download permit and downloads
92    /// the data; concurrent callers wait on the same result without acquiring permits or
93    /// duplicating work. If the download fails, the cell remains empty and a later caller
94    /// can retry.
95    pub async fn retrieve_data(
96        self: Arc<Self>,
97        client: Arc<dyn Client>,
98        url_info: Arc<TermBlockRetrievalURLs>,
99        progress_updater: Option<Arc<ItemProgressUpdater>>,
100        chunk_cache: Option<Arc<dyn ChunkCache>>,
101    ) -> Result<Arc<XorbBlockData>> {
102        let xorb_block_index = self.xorb_block_index;
103        let uncompressed_size_if_known = self.uncompressed_size_if_known;
104        let chunk_ranges = self.chunk_ranges.clone();
105
106        self.data
107            .get_or_try_init(|| async {
108                // Try the on-disk chunk cache before hitting the network.
109                // NOTE: cache key uses only the first ChunkRange. This works when each
110                // XorbBlock has a single range, but will need rework if multi-range
111                // blocks (multiple disjoint chunk ranges per block) are cached.
112                if let Some(ref cache) = chunk_cache {
113                    let cache_key = Key {
114                        prefix: xet_config().data.default_prefix.clone(),
115                        hash: self.xorb_hash,
116                    };
117                    let chunk_range = chunk_ranges.first().copied().unwrap_or_default();
118
119                    if let Ok(Some(cache_range)) = cache.get(&cache_key, &chunk_range).await {
120                        // Report cached bytes as completed so progress tracking stays consistent.
121                        if let Some(ref updater) = progress_updater {
122                            let (_, _, http_ranges) = url_info.get_retrieval_url(xorb_block_index).await;
123                            let transfer_bytes: u64 = http_ranges.iter().map(|r| r.length()).sum();
124                            updater.report_transfer_progress(transfer_bytes);
125                        }
126                        let chunk_offsets = build_chunk_offsets(&chunk_ranges, &cache_range.offsets);
127                        let data = Bytes::from(cache_range.data);
128                        return Ok(Arc::new(XorbBlockData { chunk_offsets, data }));
129                    }
130                }
131
132                // Cache miss or no cache configured - download from CAS.
133                let permit = client.acquire_download_permit().await?;
134
135                let url_provider = XorbURLProvider {
136                    client: client.clone(),
137                    url_info,
138                    xorb_block_index,
139                    last_acquisition_id: Mutex::new(UniqueId::null()),
140                };
141
142                // Progress callback reports only transfer (network) bytes during get_file_term_data.
143                // Decompressed bytes are reported by the data writer when written to disk.
144                let progress_callback: Option<ProgressCallback> = progress_updater.as_ref().map(|updater| {
145                    let updater = updater.clone();
146                    Arc::new(move |delta: u64, _completed: u64, _total: u64| {
147                        updater.report_transfer_progress(delta);
148                    }) as ProgressCallback
149                });
150
151                let (data, chunk_byte_offsets) = client
152                    .get_file_term_data(Box::new(url_provider), permit, progress_callback, uncompressed_size_if_known)
153                    .await?;
154
155                // Store in chunk cache (best-effort, non-blocking).
156                if let Some(cache) = chunk_cache {
157                    let cache_key = Key {
158                        prefix: xet_config().data.default_prefix.clone(),
159                        hash: self.xorb_hash,
160                    };
161                    let chunk_range = chunk_ranges.first().copied().unwrap_or_default();
162                    let data = data.clone();
163                    let chunk_byte_offsets = chunk_byte_offsets.clone();
164                    tokio::spawn(async move {
165                        if let Err(err) = cache.put(&cache_key, &chunk_range, &chunk_byte_offsets, &data).await {
166                            tracing::warn!("chunk cache put failed: {err}");
167                        }
168                    });
169                }
170
171                let chunk_offsets = build_chunk_offsets(&chunk_ranges, &chunk_byte_offsets);
172
173                Ok(Arc::new(XorbBlockData { chunk_offsets, data }))
174            })
175            .await
176            .cloned()
177    }
178
179    /// Determines the total uncompressed size of the xorb block from the reference terms,
180    /// if possible.
181    ///
182    /// Uses a forward-chaining DP: starting from the first chunk range's start,
183    /// we track which chunk positions are "reachable" (i.e., fully covered by a
184    /// contiguous chain of terms) along with the accumulated uncompressed size.
185    ///
186    /// For multi-range blocks with disjoint chunk ranges (e.g. `[0,3)` and `[5,8)`),
187    /// the gaps between ranges are inserted as zero-cost bridges. This lets the DP
188    /// traverse the full set of ranges in a single pass — a gap `[3,5)` contributes
189    /// no data but connects the end of one range to the start of the next.
190    ///
191    /// Returns `Some(total_size)` if every range is fully covered, `None` otherwise.
192    ///
193    /// The `terms` slice must be sorted by `term_chunks.start`.
194    pub fn determine_size_if_possible(xorb_ranges: &[ChunkRange], terms: &[XorbReference]) -> Option<usize> {
195        debug_assert!(
196            terms.windows(2).all(|w| w[0].term_chunks.start <= w[1].term_chunks.start),
197            "terms must be sorted by chunk range start"
198        );
199
200        debug_assert!(
201            terms.iter().all(|term| xorb_ranges
202                .iter()
203                .any(|r| term.term_chunks.start >= r.start && term.term_chunks.end <= r.end)),
204            "all terms must fall within one of the xorb ranges"
205        );
206
207        if xorb_ranges.is_empty() {
208            return Some(0);
209        }
210
211        // Build a lookup from range-end -> next-range-start for gap bridging.
212        // E.g. for ranges [0,3) and [5,8), maps 3 -> 5, meaning once chunk 3
213        // is reachable we can bridge to chunk 5 at zero cost.
214        let gap_bridges: BTreeMap<u32, u32> = xorb_ranges
215            .windows(2)
216            .filter(|pair| pair[0].end < pair[1].start)
217            .map(|pair| (pair[0].end, pair[1].start))
218            .collect();
219
220        // DP map: chunk position -> accumulated uncompressed size to reach that position.
221        // Seed with the start of the first range.
222        let mut reachable: BTreeMap<u32, usize> = BTreeMap::new();
223        reachable.insert(xorb_ranges[0].start, 0);
224
225        // Process terms in sorted order, extending reachable positions.
226        for term in terms {
227            if let Some(&accumulated) = reachable.get(&term.term_chunks.start) {
228                let new_end = term.term_chunks.end;
229                let new_size = accumulated + term.uncompressed_size;
230
231                reachable.entry(new_end).or_insert(new_size);
232
233                // If this term reaches the end of a range that has a gap bridge,
234                // make the start of the next range reachable at the same accumulated size.
235                if let Some(&bridge_target) = gap_bridges.get(&new_end) {
236                    reachable.entry(bridge_target).or_insert(new_size);
237                }
238            }
239        }
240
241        // The block is fully covered if we can reach the end of the last range.
242        reachable.get(&xorb_ranges.last().unwrap().end).copied()
243    }
244}
245
#[cfg(test)]
mod tests {
    use xet_client::cas_types::ChunkRange;

    use super::*;

    fn build_refs(pairs: &[(ChunkRange, usize)]) -> Vec<XorbReference> {
        pairs
            .iter()
            .map(|(range, size)| XorbReference {
                term_chunks: *range,
                uncompressed_size: *size,
            })
            .collect()
    }

    #[test]
    fn test_single_term_exact_match() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(0, 5), 1000)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_two_terms_chained() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 600), (ChunkRange::new(3, 5), 400)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_three_terms_chained() {
        let ranges = &[ChunkRange::new(0, 6)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(2, 4), 300),
            (ChunkRange::new(4, 6), 500),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_gap_in_chain() {
        let ranges = &[ChunkRange::new(0, 6)];
        let terms = build_refs(&[(ChunkRange::new(0, 2), 200), (ChunkRange::new(4, 6), 500)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_does_not_start_at_xorb_start() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(1, 5), 800)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_does_not_end_at_xorb_end() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 600)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_empty_terms() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms: Vec<XorbReference> = vec![];
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_overlapping_terms_with_exact_cover() {
        // Terms [0..3, 1..4, 3..5] - the chain 0..3, 3..5 covers 0..5.
        // The overlapping term 1..4 should be skipped.
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 3), 600),
            (ChunkRange::new(1, 4), 700),
            (ChunkRange::new(3, 5), 400),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_duplicate_terms_first_covers() {
        // Two identical terms covering the full range.
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(0, 5), 1000), (ChunkRange::new(0, 5), 1000)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_nonzero_xorb_start() {
        let ranges = &[ChunkRange::new(3, 8)];
        let terms = build_refs(&[(ChunkRange::new(3, 5), 400), (ChunkRange::new(5, 8), 600)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_nonzero_xorb_start_no_match() {
        let ranges = &[ChunkRange::new(3, 8)];
        let terms = build_refs(&[(ChunkRange::new(3, 5), 400)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_single_chunk_range() {
        let ranges = &[ChunkRange::new(0, 1)];
        let terms = build_refs(&[(ChunkRange::new(0, 1), 42)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(42));
    }

    #[test]
    fn test_chain_with_overlapping_inner_terms() {
        let ranges = &[ChunkRange::new(2, 8)];
        // The overlapping term [3,6) is within the range but doesn't form
        // a better chain than [2,5) + [5,8), so it's harmlessly ignored.
        let terms = build_refs(&[
            (ChunkRange::new(2, 5), 500),
            (ChunkRange::new(3, 6), 999),
            (ChunkRange::new(5, 8), 300),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(800));
    }

    #[test]
    fn test_partial_overlap_no_cover() {
        // Terms partially overlap but don't form a contiguous chain covering the full range.
        let ranges = &[ChunkRange::new(0, 10)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 4), 400),
            (ChunkRange::new(3, 7), 400),
            (ChunkRange::new(6, 10), 400),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_same_start_short_then_long_covering_full() {
        // Short range first, then a long range that covers the full xorb.
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 300), (ChunkRange::new(0, 5), 500)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(500));
    }

    #[test]
    fn test_same_start_short_then_long_with_chain() {
        // Short range first, then a longer range, where the short range can also chain.
        // Chain via 0..3 + 3..6 = 600
        let ranges = &[ChunkRange::new(0, 6)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(0, 3), 300),
            (ChunkRange::new(3, 6), 300),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(600));
    }

    #[test]
    fn test_same_start_multiple_duplicates_chain_through_second() {
        // Multiple terms at start 0 with different lengths; only the middle one chains.
        // Chain via 0..4 + 4..6 = 600
        let ranges = &[ChunkRange::new(0, 6)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(0, 4), 400),
            (ChunkRange::new(0, 5), 500),
            (ChunkRange::new(4, 6), 200),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(600));
    }

    #[test]
    fn test_same_start_at_midpoint() {
        // Duplicate starts at a midpoint in the chain, not just at the beginning.
        // Chain via 0..3 + 3..6 + 6..8 = 800
        let ranges = &[ChunkRange::new(0, 8)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 3), 300),
            (ChunkRange::new(3, 5), 200),
            (ChunkRange::new(3, 6), 300),
            (ChunkRange::new(6, 8), 200),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(800));
    }

    #[test]
    fn test_same_start_none_covers() {
        // Multiple terms at start 0, but none chain to cover the full range.
        let ranges = &[ChunkRange::new(0, 10)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(0, 4), 400),
            (ChunkRange::new(0, 6), 600),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_same_start_two_groups_chained() {
        // Two groups of duplicate-start terms that chain together.
        // Chain via 0..3 + 3..6 = 600
        let ranges = &[ChunkRange::new(0, 6)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(0, 3), 300),
            (ChunkRange::new(3, 5), 200),
            (ChunkRange::new(3, 6), 300),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(600));
    }

    #[test]
    fn test_multiple_disjoint_ranges_both_covered() {
        let ranges = &[ChunkRange::new(0, 3), ChunkRange::new(5, 8)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 300), (ChunkRange::new(5, 8), 400)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(700));
    }

    #[test]
    fn test_multiple_disjoint_ranges_one_uncovered() {
        let ranges = &[ChunkRange::new(0, 3), ChunkRange::new(5, 8)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 300)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_empty_xorb_ranges() {
        // No ranges means nothing to cover: the size is trivially zero.
        let ranges: &[ChunkRange] = &[];
        let terms: Vec<XorbReference> = vec![];
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(0));
    }

    #[test]
    fn test_adjacent_ranges_without_gap() {
        // Touching ranges need no bridge: the end of the first is the start of the second.
        let ranges = &[ChunkRange::new(0, 3), ChunkRange::new(3, 6)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 300), (ChunkRange::new(3, 6), 300)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(600));
    }

    #[test]
    fn test_three_disjoint_ranges_with_chained_terms() {
        // Two gap bridges (2 -> 4 and 6 -> 9), with the middle range split across two terms.
        let ranges = &[ChunkRange::new(0, 2), ChunkRange::new(4, 6), ChunkRange::new(9, 12)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 20),
            (ChunkRange::new(4, 5), 10),
            (ChunkRange::new(5, 6), 15),
            (ChunkRange::new(9, 12), 30),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(75));
    }

    #[test]
    fn test_multiple_disjoint_ranges_first_uncovered() {
        // The second range is covered, but nothing reaches its start through the first range.
        let ranges = &[ChunkRange::new(0, 3), ChunkRange::new(5, 8)];
        let terms = build_refs(&[(ChunkRange::new(5, 8), 400)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_multiple_disjoint_ranges_second_partially_covered() {
        let ranges = &[ChunkRange::new(0, 3), ChunkRange::new(5, 8)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 300), (ChunkRange::new(5, 7), 200)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }
}
466        let ranges = &[ChunkRange::new(0, 3), ChunkRange::new(5, 8)];
467        let terms = build_refs(&[(ChunkRange::new(0, 3), 300)]);
468        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
469    }
470}