xet-data 1.5.2

Data processing pipeline for chunking, deduplication, and file reconstruction; used in the Hugging Face Xet client tools. Intended to be used through the API in the hf-xet package.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
use std::collections::BTreeMap;
use std::sync::Arc;

use bytes::Bytes;
use tokio::sync::{Mutex, OnceCell};
use xet_client::cas_client::{Client, ProgressCallback};
use xet_client::cas_types::{ChunkRange, Key};
use xet_client::chunk_cache::ChunkCache;
use xet_core_structures::merklehash::MerkleHash;
use xet_runtime::core::xet_config;
use xet_runtime::utils::UniqueId;

use super::super::error::Result;
use super::retrieval_urls::{TermBlockRetrievalURLs, XorbURLProvider};
use crate::progress_tracking::ItemProgressUpdater;

/// Downloaded and decompressed data for a xorb block, including chunk boundary offsets.
///
/// A single `XorbBlockData` may hold data from multiple disjoint chunk ranges
/// (V2 multi-range fetch). The chunks are concatenated in range order, and
/// `chunk_offsets` maps each chunk index to its byte position within `data`.
pub struct XorbBlockData {
    /// Pairs of (chunk_index, byte_offset) mapping each chunk to its start position
    /// within `data`. Because the block can span multiple disjoint chunk ranges,
    /// storing the chunk index alongside the offset avoids ambiguity.
    pub chunk_offsets: Vec<(usize, usize)>,

    /// The concatenated decompressed chunk data for all ranges in this block.
    pub data: Bytes,
}

/// A reference from a file term back to the xorb block it belongs to.
/// Used by `determine_size_if_possible` to check whether the block's total
/// uncompressed size can be inferred from the terms that reference it.
#[derive(Debug)]
pub struct XorbReference {
    /// The chunk range within the xorb that this file term covers.
    pub term_chunks: ChunkRange,
    /// The uncompressed byte size of this term's data.
    pub uncompressed_size: usize,
}

/// A downloadable xorb block identified by hash and chunk ranges, with cached data.
///
/// A block may contain multiple disjoint chunk ranges from the same xorb (V2 multi-range).
/// Multiple file terms may reference the same block. Downloaded data is cached in `data`
/// so that the first term to request it triggers the download, and subsequent terms
/// reuse the cached result.
pub struct XorbBlock {
    pub xorb_hash: MerkleHash,
    /// The chunk ranges fetched for this block. For V1 this is a single range;
    /// for V2 multi-range fetches this may contain multiple disjoint ranges.
    pub chunk_ranges: Vec<ChunkRange>,
    /// Index into the parent `TermBlockRetrievalURLs` for URL lookup.
    pub xorb_block_index: usize,
    /// All file-term references covered by this block, sorted by chunk range start.
    /// Populated during `retrieve_file_term_block` and used to compute `uncompressed_size_if_known`.
    pub references: Vec<XorbReference>,
    /// Expected total decompressed size across all chunk ranges, if it can be determined
    /// from the references. Passed to clients as a debug assertion hint.
    pub uncompressed_size_if_known: Option<usize>,
    pub data: OnceCell<Arc<XorbBlockData>>,
}

impl PartialEq for XorbBlock {
    fn eq(&self, other: &Self) -> bool {
        self.xorb_hash == other.xorb_hash
            && self.chunk_ranges == other.chunk_ranges
            && self.xorb_block_index == other.xorb_block_index
    }
}

impl Eq for XorbBlock {}

/// Builds chunk offset pairs from chunk ranges and a flat byte-offset slice.
fn build_chunk_offsets(chunk_ranges: &[ChunkRange], byte_offsets: &[u32]) -> Vec<(usize, usize)> {
    let mut chunk_offsets = Vec::new();
    let mut offset_idx = 0;
    for range in chunk_ranges {
        for chunk_idx in range.start..range.end {
            chunk_offsets.push((chunk_idx as usize, byte_offsets[offset_idx] as usize));
            offset_idx += 1;
        }
    }
    chunk_offsets
}

impl XorbBlock {
    /// Retrieve the xorb block data from the client, caching it for subsequent calls.
    ///
    /// Uses single-flight: the first caller acquires a CAS download permit and downloads
    /// the data; concurrent callers wait on the same result without acquiring permits or
    /// duplicating work. If the download fails, the cell remains empty and a later caller
    /// can retry.
    pub async fn retrieve_data(
        self: Arc<Self>,
        client: Arc<dyn Client>,
        url_info: Arc<TermBlockRetrievalURLs>,
        progress_updater: Option<Arc<ItemProgressUpdater>>,
        chunk_cache: Option<Arc<dyn ChunkCache>>,
    ) -> Result<Arc<XorbBlockData>> {
        let xorb_block_index = self.xorb_block_index;
        let uncompressed_size_if_known = self.uncompressed_size_if_known;
        let chunk_ranges = self.chunk_ranges.clone();

        self.data
            .get_or_try_init(|| async {
                // Try the on-disk chunk cache before hitting the network.
                // NOTE: cache key uses only the first ChunkRange. This works when each
                // XorbBlock has a single range, but will need rework if multi-range
                // blocks (multiple disjoint chunk ranges per block) are cached.
                if let Some(ref cache) = chunk_cache {
                    let cache_key = Key {
                        prefix: xet_config().data.default_prefix.clone(),
                        hash: self.xorb_hash,
                    };
                    let chunk_range = chunk_ranges.first().copied().unwrap_or_default();

                    if let Ok(Some(cache_range)) = cache.get(&cache_key, &chunk_range).await {
                        // Report cached bytes as completed so progress tracking stays consistent.
                        if let Some(ref updater) = progress_updater {
                            let (_, _, http_ranges) = url_info.get_retrieval_url(xorb_block_index).await;
                            let transfer_bytes: u64 = http_ranges.iter().map(|r| r.length()).sum();
                            updater.report_transfer_progress(transfer_bytes);
                        }
                        let chunk_offsets = build_chunk_offsets(&chunk_ranges, &cache_range.offsets);
                        let data = Bytes::from(cache_range.data);
                        return Ok(Arc::new(XorbBlockData { chunk_offsets, data }));
                    }
                }

                // Cache miss or no cache configured - download from CAS.
                let permit = client.acquire_download_permit().await?;

                let url_provider = XorbURLProvider {
                    client: client.clone(),
                    url_info,
                    xorb_block_index,
                    last_acquisition_id: Mutex::new(UniqueId::null()),
                };

                // Progress callback reports only transfer (network) bytes during get_file_term_data.
                // Decompressed bytes are reported by the data writer when written to disk.
                let progress_callback: Option<ProgressCallback> = progress_updater.as_ref().map(|updater| {
                    let updater = updater.clone();
                    Arc::new(move |delta: u64, _completed: u64, _total: u64| {
                        updater.report_transfer_progress(delta);
                    }) as ProgressCallback
                });

                let (data, chunk_byte_offsets) = client
                    .get_file_term_data(Box::new(url_provider), permit, progress_callback, uncompressed_size_if_known)
                    .await?;

                // Store in chunk cache (best-effort, non-blocking).
                if let Some(cache) = chunk_cache {
                    let cache_key = Key {
                        prefix: xet_config().data.default_prefix.clone(),
                        hash: self.xorb_hash,
                    };
                    let chunk_range = chunk_ranges.first().copied().unwrap_or_default();
                    let data = data.clone();
                    let chunk_byte_offsets = chunk_byte_offsets.clone();
                    tokio::spawn(async move {
                        if let Err(err) = cache.put(&cache_key, &chunk_range, &chunk_byte_offsets, &data).await {
                            tracing::warn!("chunk cache put failed: {err}");
                        }
                    });
                }

                let chunk_offsets = build_chunk_offsets(&chunk_ranges, &chunk_byte_offsets);

                Ok(Arc::new(XorbBlockData { chunk_offsets, data }))
            })
            .await
            .cloned()
    }

    /// Determines the total uncompressed size of the xorb block from the reference terms,
    /// if possible.
    ///
    /// Uses a forward-chaining DP: starting from the first chunk range's start,
    /// we track which chunk positions are "reachable" (i.e., fully covered by a
    /// contiguous chain of terms) along with the accumulated uncompressed size.
    ///
    /// For multi-range blocks with disjoint chunk ranges (e.g. `[0,3)` and `[5,8)`),
    /// the gaps between ranges are inserted as zero-cost bridges. This lets the DP
    /// traverse the full set of ranges in a single pass — a gap `[3,5)` contributes
    /// no data but connects the end of one range to the start of the next.
    ///
    /// Returns `Some(total_size)` if every range is fully covered, `None` otherwise.
    ///
    /// The `terms` slice must be sorted by `term_chunks.start`.
    pub fn determine_size_if_possible(xorb_ranges: &[ChunkRange], terms: &[XorbReference]) -> Option<usize> {
        debug_assert!(
            terms.windows(2).all(|w| w[0].term_chunks.start <= w[1].term_chunks.start),
            "terms must be sorted by chunk range start"
        );

        debug_assert!(
            terms.iter().all(|term| xorb_ranges
                .iter()
                .any(|r| term.term_chunks.start >= r.start && term.term_chunks.end <= r.end)),
            "all terms must fall within one of the xorb ranges"
        );

        if xorb_ranges.is_empty() {
            return Some(0);
        }

        // Build a lookup from range-end -> next-range-start for gap bridging.
        // E.g. for ranges [0,3) and [5,8), maps 3 -> 5, meaning once chunk 3
        // is reachable we can bridge to chunk 5 at zero cost.
        let gap_bridges: BTreeMap<u32, u32> = xorb_ranges
            .windows(2)
            .filter(|pair| pair[0].end < pair[1].start)
            .map(|pair| (pair[0].end, pair[1].start))
            .collect();

        // DP map: chunk position -> accumulated uncompressed size to reach that position.
        // Seed with the start of the first range.
        let mut reachable: BTreeMap<u32, usize> = BTreeMap::new();
        reachable.insert(xorb_ranges[0].start, 0);

        // Process terms in sorted order, extending reachable positions.
        for term in terms {
            if let Some(&accumulated) = reachable.get(&term.term_chunks.start) {
                let new_end = term.term_chunks.end;
                let new_size = accumulated + term.uncompressed_size;

                reachable.entry(new_end).or_insert(new_size);

                // If this term reaches the end of a range that has a gap bridge,
                // make the start of the next range reachable at the same accumulated size.
                if let Some(&bridge_target) = gap_bridges.get(&new_end) {
                    reachable.entry(bridge_target).or_insert(new_size);
                }
            }
        }

        // The block is fully covered if we can reach the end of the last range.
        reachable.get(&xorb_ranges.last().unwrap().end).copied()
    }
}

#[cfg(test)]
mod tests {
    use xet_client::cas_types::ChunkRange;

    use super::*;

    fn build_refs(pairs: &[(ChunkRange, usize)]) -> Vec<XorbReference> {
        pairs
            .iter()
            .map(|(range, size)| XorbReference {
                term_chunks: *range,
                uncompressed_size: *size,
            })
            .collect()
    }

    #[test]
    fn test_single_term_exact_match() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(0, 5), 1000)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_two_terms_chained() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 600), (ChunkRange::new(3, 5), 400)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_three_terms_chained() {
        let ranges = &[ChunkRange::new(0, 6)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(2, 4), 300),
            (ChunkRange::new(4, 6), 500),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_gap_in_chain() {
        let ranges = &[ChunkRange::new(0, 6)];
        let terms = build_refs(&[(ChunkRange::new(0, 2), 200), (ChunkRange::new(4, 6), 500)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_does_not_start_at_xorb_start() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(1, 5), 800)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_does_not_end_at_xorb_end() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 600)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_empty_terms() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms: Vec<XorbReference> = vec![];
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_overlapping_terms_with_exact_cover() {
        // Terms [0..3, 1..4, 3..5] - the chain 0..3, 3..5 covers 0..5.
        // The overlapping term 1..4 should be skipped.
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 3), 600),
            (ChunkRange::new(1, 4), 700),
            (ChunkRange::new(3, 5), 400),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_duplicate_terms_first_covers() {
        // Two identical terms covering the full range.
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(0, 5), 1000), (ChunkRange::new(0, 5), 1000)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_nonzero_xorb_start() {
        let ranges = &[ChunkRange::new(3, 8)];
        let terms = build_refs(&[(ChunkRange::new(3, 5), 400), (ChunkRange::new(5, 8), 600)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_nonzero_xorb_start_no_match() {
        let ranges = &[ChunkRange::new(3, 8)];
        let terms = build_refs(&[(ChunkRange::new(3, 5), 400)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_single_chunk_range() {
        let ranges = &[ChunkRange::new(0, 1)];
        let terms = build_refs(&[(ChunkRange::new(0, 1), 42)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(42));
    }

    #[test]
    fn test_chain_with_overlapping_inner_terms() {
        let ranges = &[ChunkRange::new(2, 8)];
        // The overlapping term [3,6) is within the range but doesn't form
        // a better chain than [2,5) + [5,8), so it's harmlessly ignored.
        let terms = build_refs(&[
            (ChunkRange::new(2, 5), 500),
            (ChunkRange::new(3, 6), 999),
            (ChunkRange::new(5, 8), 300),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(800));
    }

    #[test]
    fn test_partial_overlap_no_cover() {
        // Terms partially overlap but don't form a contiguous chain covering the full range.
        let ranges = &[ChunkRange::new(0, 10)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 4), 400),
            (ChunkRange::new(3, 7), 400),
            (ChunkRange::new(6, 10), 400),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_same_start_short_then_long_covering_full() {
        // Short range first, then a long range that covers the full xorb.
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 300), (ChunkRange::new(0, 5), 500)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(500));
    }

    #[test]
    fn test_same_start_short_then_long_with_chain() {
        // Short range first, then a longer range, where the short range can also chain.
        // Chain via 0..3 + 3..6 = 600
        let ranges = &[ChunkRange::new(0, 6)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(0, 3), 300),
            (ChunkRange::new(3, 6), 300),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(600));
    }

    #[test]
    fn test_same_start_multiple_duplicates_chain_through_second() {
        // Multiple terms at start 0 with different lengths; only the middle one chains.
        // Chain via 0..4 + 4..6 = 600
        let ranges = &[ChunkRange::new(0, 6)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(0, 4), 400),
            (ChunkRange::new(0, 5), 500),
            (ChunkRange::new(4, 6), 200),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(600));
    }

    #[test]
    fn test_same_start_at_midpoint() {
        // Duplicate starts at a midpoint in the chain, not just at the beginning.
        // Chain via 0..3 + 3..6 + 6..8 = 800
        let ranges = &[ChunkRange::new(0, 8)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 3), 300),
            (ChunkRange::new(3, 5), 200),
            (ChunkRange::new(3, 6), 300),
            (ChunkRange::new(6, 8), 200),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(800));
    }

    #[test]
    fn test_same_start_none_covers() {
        // Multiple terms at start 0, but none chain to cover the full range.
        let ranges = &[ChunkRange::new(0, 10)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(0, 4), 400),
            (ChunkRange::new(0, 6), 600),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_same_start_two_groups_chained() {
        // Two groups of duplicate-start terms that chain together.
        // Chain via 0..3 + 3..6 = 600
        let ranges = &[ChunkRange::new(0, 6)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(0, 3), 300),
            (ChunkRange::new(3, 5), 200),
            (ChunkRange::new(3, 6), 300),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(600));
    }

    #[test]
    fn test_multiple_disjoint_ranges_both_covered() {
        let ranges = &[ChunkRange::new(0, 3), ChunkRange::new(5, 8)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 300), (ChunkRange::new(5, 8), 400)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(700));
    }

    #[test]
    fn test_multiple_disjoint_ranges_one_uncovered() {
        let ranges = &[ChunkRange::new(0, 3), ChunkRange::new(5, 8)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 300)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }
}