1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use bytes::Bytes;
5use tokio::sync::{Mutex, OnceCell};
6use xet_client::cas_client::{Client, ProgressCallback};
7use xet_client::cas_types::{ChunkRange, Key};
8use xet_client::chunk_cache::ChunkCache;
9use xet_core_structures::merklehash::MerkleHash;
10use xet_runtime::core::xet_config;
11use xet_runtime::utils::UniqueId;
12
13use super::super::error::Result;
14use super::retrieval_urls::{TermBlockRetrievalURLs, XorbURLProvider};
15use crate::progress_tracking::ItemProgressUpdater;
16
/// Payload of a [`XorbBlock`] as returned by `XorbBlock::retrieve_data`, loaded either from the
/// chunk cache or from a download.
pub struct XorbBlockData {
    /// One `(chunk_index, byte_offset)` entry per chunk covered by the block, in the order of the
    /// block's chunk ranges. `byte_offset` comes from the offsets reported by the download or the
    /// chunk cache; presumably it is where that chunk starts within `data` (TODO confirm).
    pub chunk_offsets: Vec<(usize, usize)>,

    /// Raw bytes for all chunks of the block, exactly as returned by the download or the cache.
    pub data: Bytes,
}
31
32#[derive(Debug)]
36pub struct XorbReference {
37 pub term_chunks: ChunkRange,
39 pub uncompressed_size: usize,
41}
42
/// A set of chunk ranges from a single xorb that is retrieved as one unit (one
/// `get_file_term_data` call in `retrieve_data`), plus the terms that read from it.
pub struct XorbBlock {
    /// Hash of the xorb that holds these chunks. It is also the `hash` part of the chunk-cache key.
    pub xorb_hash: MerkleHash,
    /// Chunk ranges of the xorb covered by this block, walked in order when mapping chunk
    /// offsets. `determine_size_if_possible` expects such ranges sorted by start, with gaps only
    /// between consecutive entries.
    pub chunk_ranges: Vec<ChunkRange>,
    /// Index passed to `TermBlockRetrievalURLs::get_retrieval_url` and `XorbURLProvider` to find
    /// this block's URL and HTTP byte ranges.
    pub xorb_block_index: usize,
    /// Terms that read from this block.
    pub references: Vec<XorbReference>,
    /// Total uncompressed size of the block, when known in advance. It is passed through to
    /// `get_file_term_data`. Presumably it is produced by `determine_size_if_possible` (verify
    /// at the construction site).
    pub uncompressed_size_if_known: Option<usize>,
    /// Memoized result of `retrieve_data`. It stays empty until the first successful retrieval.
    pub data: OnceCell<Arc<XorbBlockData>>,
}
64
65impl PartialEq for XorbBlock {
66 fn eq(&self, other: &Self) -> bool {
67 self.xorb_hash == other.xorb_hash
68 && self.chunk_ranges == other.chunk_ranges
69 && self.xorb_block_index == other.xorb_block_index
70 }
71}
72
73impl Eq for XorbBlock {}
74
75fn build_chunk_offsets(chunk_ranges: &[ChunkRange], byte_offsets: &[u32]) -> Vec<(usize, usize)> {
77 let mut chunk_offsets = Vec::new();
78 let mut offset_idx = 0;
79 for range in chunk_ranges {
80 for chunk_idx in range.start..range.end {
81 chunk_offsets.push((chunk_idx as usize, byte_offsets[offset_idx] as usize));
82 offset_idx += 1;
83 }
84 }
85 chunk_offsets
86}
87
88impl XorbBlock {
89 pub async fn retrieve_data(
96 self: Arc<Self>,
97 client: Arc<dyn Client>,
98 url_info: Arc<TermBlockRetrievalURLs>,
99 progress_updater: Option<Arc<ItemProgressUpdater>>,
100 chunk_cache: Option<Arc<dyn ChunkCache>>,
101 ) -> Result<Arc<XorbBlockData>> {
102 let xorb_block_index = self.xorb_block_index;
103 let uncompressed_size_if_known = self.uncompressed_size_if_known;
104 let chunk_ranges = self.chunk_ranges.clone();
105
106 self.data
107 .get_or_try_init(|| async {
108 if let Some(ref cache) = chunk_cache {
113 let cache_key = Key {
114 prefix: xet_config().data.default_prefix.clone(),
115 hash: self.xorb_hash,
116 };
117 let chunk_range = chunk_ranges.first().copied().unwrap_or_default();
118
119 if let Ok(Some(cache_range)) = cache.get(&cache_key, &chunk_range).await {
120 if let Some(ref updater) = progress_updater {
122 let (_, _, http_ranges) = url_info.get_retrieval_url(xorb_block_index).await;
123 let transfer_bytes: u64 = http_ranges.iter().map(|r| r.length()).sum();
124 updater.report_transfer_progress(transfer_bytes);
125 }
126 let chunk_offsets = build_chunk_offsets(&chunk_ranges, &cache_range.offsets);
127 let data = Bytes::from(cache_range.data);
128 return Ok(Arc::new(XorbBlockData { chunk_offsets, data }));
129 }
130 }
131
132 let permit = client.acquire_download_permit().await?;
134
135 let url_provider = XorbURLProvider {
136 client: client.clone(),
137 url_info,
138 xorb_block_index,
139 last_acquisition_id: Mutex::new(UniqueId::null()),
140 };
141
142 let progress_callback: Option<ProgressCallback> = progress_updater.as_ref().map(|updater| {
145 let updater = updater.clone();
146 Arc::new(move |delta: u64, _completed: u64, _total: u64| {
147 updater.report_transfer_progress(delta);
148 }) as ProgressCallback
149 });
150
151 let (data, chunk_byte_offsets) = client
152 .get_file_term_data(Box::new(url_provider), permit, progress_callback, uncompressed_size_if_known)
153 .await?;
154
155 if let Some(cache) = chunk_cache {
157 let cache_key = Key {
158 prefix: xet_config().data.default_prefix.clone(),
159 hash: self.xorb_hash,
160 };
161 let chunk_range = chunk_ranges.first().copied().unwrap_or_default();
162 let data = data.clone();
163 let chunk_byte_offsets = chunk_byte_offsets.clone();
164 tokio::spawn(async move {
165 if let Err(err) = cache.put(&cache_key, &chunk_range, &chunk_byte_offsets, &data).await {
166 tracing::warn!("chunk cache put failed: {err}");
167 }
168 });
169 }
170
171 let chunk_offsets = build_chunk_offsets(&chunk_ranges, &chunk_byte_offsets);
172
173 Ok(Arc::new(XorbBlockData { chunk_offsets, data }))
174 })
175 .await
176 .cloned()
177 }
178
179 pub fn determine_size_if_possible(xorb_ranges: &[ChunkRange], terms: &[XorbReference]) -> Option<usize> {
195 debug_assert!(
196 terms.windows(2).all(|w| w[0].term_chunks.start <= w[1].term_chunks.start),
197 "terms must be sorted by chunk range start"
198 );
199
200 debug_assert!(
201 terms.iter().all(|term| xorb_ranges
202 .iter()
203 .any(|r| term.term_chunks.start >= r.start && term.term_chunks.end <= r.end)),
204 "all terms must fall within one of the xorb ranges"
205 );
206
207 if xorb_ranges.is_empty() {
208 return Some(0);
209 }
210
211 let gap_bridges: BTreeMap<u32, u32> = xorb_ranges
215 .windows(2)
216 .filter(|pair| pair[0].end < pair[1].start)
217 .map(|pair| (pair[0].end, pair[1].start))
218 .collect();
219
220 let mut reachable: BTreeMap<u32, usize> = BTreeMap::new();
223 reachable.insert(xorb_ranges[0].start, 0);
224
225 for term in terms {
227 if let Some(&accumulated) = reachable.get(&term.term_chunks.start) {
228 let new_end = term.term_chunks.end;
229 let new_size = accumulated + term.uncompressed_size;
230
231 reachable.entry(new_end).or_insert(new_size);
232
233 if let Some(&bridge_target) = gap_bridges.get(&new_end) {
236 reachable.entry(bridge_target).or_insert(new_size);
237 }
238 }
239 }
240
241 reachable.get(&xorb_ranges.last().unwrap().end).copied()
243 }
244}
245
#[cfg(test)]
mod tests {
    use xet_client::cas_types::ChunkRange;

    use super::*;

    /// Builds `XorbReference`s from `(chunk range, uncompressed size)` pairs.
    fn build_refs(pairs: &[(ChunkRange, usize)]) -> Vec<XorbReference> {
        pairs
            .iter()
            .map(|(range, size)| XorbReference {
                term_chunks: *range,
                uncompressed_size: *size,
            })
            .collect()
    }

    #[test]
    fn test_single_term_exact_match() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(0, 5), 1000)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_two_terms_chained() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 600), (ChunkRange::new(3, 5), 400)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_three_terms_chained() {
        let ranges = &[ChunkRange::new(0, 6)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(2, 4), 300),
            (ChunkRange::new(4, 6), 500),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_gap_in_chain() {
        let ranges = &[ChunkRange::new(0, 6)];
        let terms = build_refs(&[(ChunkRange::new(0, 2), 200), (ChunkRange::new(4, 6), 500)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_does_not_start_at_xorb_start() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(1, 5), 800)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_does_not_end_at_xorb_end() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 600)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_empty_terms() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms: Vec<XorbReference> = vec![];
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_overlapping_terms_with_exact_cover() {
        let ranges = &[ChunkRange::new(0, 5)];
        // (1, 4) overlaps both neighbours but is never on the chain; (0, 3) + (3, 5) covers.
        let terms = build_refs(&[
            (ChunkRange::new(0, 3), 600),
            (ChunkRange::new(1, 4), 700),
            (ChunkRange::new(3, 5), 400),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_duplicate_terms_first_covers() {
        let ranges = &[ChunkRange::new(0, 5)];
        // Identical terms must not be double-counted.
        let terms = build_refs(&[(ChunkRange::new(0, 5), 1000), (ChunkRange::new(0, 5), 1000)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_nonzero_xorb_start() {
        let ranges = &[ChunkRange::new(3, 8)];
        let terms = build_refs(&[(ChunkRange::new(3, 5), 400), (ChunkRange::new(5, 8), 600)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_nonzero_xorb_start_no_match() {
        let ranges = &[ChunkRange::new(3, 8)];
        let terms = build_refs(&[(ChunkRange::new(3, 5), 400)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_single_chunk_range() {
        let ranges = &[ChunkRange::new(0, 1)];
        let terms = build_refs(&[(ChunkRange::new(0, 1), 42)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(42));
    }

    #[test]
    fn test_chain_with_overlapping_inner_terms() {
        let ranges = &[ChunkRange::new(2, 8)];
        // (3, 6) starts at an unreachable boundary, so its size must not leak into the total.
        let terms = build_refs(&[
            (ChunkRange::new(2, 5), 500),
            (ChunkRange::new(3, 6), 999),
            (ChunkRange::new(5, 8), 300),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(800));
    }

    #[test]
    fn test_partial_overlap_no_cover() {
        let ranges = &[ChunkRange::new(0, 10)];
        // Terms overlap pairwise but no term starts where another ends.
        let terms = build_refs(&[
            (ChunkRange::new(0, 4), 400),
            (ChunkRange::new(3, 7), 400),
            (ChunkRange::new(6, 10), 400),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_same_start_short_then_long_covering_full() {
        let ranges = &[ChunkRange::new(0, 5)];
        // The longer of two same-start terms alone covers the range.
        let terms = build_refs(&[(ChunkRange::new(0, 3), 300), (ChunkRange::new(0, 5), 500)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(500));
    }

    #[test]
    fn test_same_start_short_then_long_with_chain() {
        let ranges = &[ChunkRange::new(0, 6)];
        // Only the second same-start term connects to the tail term.
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(0, 3), 300),
            (ChunkRange::new(3, 6), 300),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(600));
    }

    #[test]
    fn test_same_start_multiple_duplicates_chain_through_second() {
        let ranges = &[ChunkRange::new(0, 6)];
        // Several same-start candidates; the chain goes through (0, 4).
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(0, 4), 400),
            (ChunkRange::new(0, 5), 500),
            (ChunkRange::new(4, 6), 200),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(600));
    }

    #[test]
    fn test_same_start_at_midpoint() {
        let ranges = &[ChunkRange::new(0, 8)];
        // Same-start candidates in the middle of the chain; (3, 6) is the one that connects.
        let terms = build_refs(&[
            (ChunkRange::new(0, 3), 300),
            (ChunkRange::new(3, 5), 200),
            (ChunkRange::new(3, 6), 300),
            (ChunkRange::new(6, 8), 200),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(800));
    }

    #[test]
    fn test_same_start_none_covers() {
        let ranges = &[ChunkRange::new(0, 10)];
        // No same-start candidate reaches the end of the range.
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(0, 4), 400),
            (ChunkRange::new(0, 6), 600),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_same_start_two_groups_chained() {
        let ranges = &[ChunkRange::new(0, 6)];
        // Two groups of same-start candidates; the chain takes the longer term from each group.
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(0, 3), 300),
            (ChunkRange::new(3, 5), 200),
            (ChunkRange::new(3, 6), 300),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(600));
    }

    #[test]
    fn test_multiple_disjoint_ranges_both_covered() {
        let ranges = &[ChunkRange::new(0, 3), ChunkRange::new(5, 8)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 300), (ChunkRange::new(5, 8), 400)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(700));
    }

    #[test]
    fn test_multiple_disjoint_ranges_one_uncovered() {
        let ranges = &[ChunkRange::new(0, 3), ChunkRange::new(5, 8)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 300)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_empty_xorb_ranges() {
        let ranges: &[ChunkRange] = &[];
        let terms: Vec<XorbReference> = vec![];
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(0));
    }

    #[test]
    fn test_adjacent_ranges_without_gap() {
        // Touching ranges need no bridge: the chain flows straight across the boundary.
        let ranges = &[ChunkRange::new(0, 3), ChunkRange::new(3, 6)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 300), (ChunkRange::new(3, 6), 300)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(600));
    }

    #[test]
    fn test_three_ranges_two_gaps() {
        let ranges = &[ChunkRange::new(0, 2), ChunkRange::new(4, 6), ChunkRange::new(8, 10)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(4, 6), 400),
            (ChunkRange::new(8, 10), 600),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1200));
    }

    #[test]
    fn test_multiple_disjoint_ranges_chain_within_second() {
        // After bridging the gap, the second range is itself covered by a two-term chain.
        let ranges = &[ChunkRange::new(0, 3), ChunkRange::new(5, 8)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 3), 300),
            (ChunkRange::new(5, 6), 100),
            (ChunkRange::new(6, 8), 300),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(700));
    }

    #[test]
    fn test_term_ending_mid_range_does_not_bridge() {
        // (0, 2) stops before the end of the first range, so the gap must not be bridged.
        let ranges = &[ChunkRange::new(0, 4), ChunkRange::new(6, 8)];
        let terms = build_refs(&[(ChunkRange::new(0, 2), 200), (ChunkRange::new(6, 8), 200)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }
}