use std::collections::BTreeMap;
use std::sync::Arc;
use bytes::Bytes;
use tokio::sync::{Mutex, OnceCell};
use xet_client::cas_client::{Client, ProgressCallback};
use xet_client::cas_types::{ChunkRange, Key};
use xet_client::chunk_cache::ChunkCache;
use xet_core_structures::merklehash::MerkleHash;
use xet_runtime::core::xet_config;
use xet_runtime::utils::UniqueId;
use super::super::error::Result;
use super::retrieval_urls::{TermBlockRetrievalURLs, XorbURLProvider};
use crate::progress_tracking::ItemProgressUpdater;
/// Payload of a retrieved xorb block: the raw bytes plus an index of where each chunk begins.
pub struct XorbBlockData {
/// `(chunk index, byte offset)` pairs, one per chunk, in the order produced by
/// `build_chunk_offsets`. NOTE(review): offsets are presumably relative to the start of `data` —
/// confirm against the client / cache contract.
pub chunk_offsets: Vec<(usize, usize)>,
/// The block's bytes, as returned either by the chunk cache or by the client download.
pub data: Bytes,
}
/// One term's reference into a xorb block: the chunk range it uses and the size recorded for it.
#[derive(Debug)]
pub struct XorbReference {
/// Chunk range of the xorb that this term covers.
pub term_chunks: ChunkRange,
/// Size added to the running total when terms are chained in
/// `XorbBlock::determine_size_if_possible`; presumably the uncompressed byte length of
/// `term_chunks` (confirm with the code that builds these references).
pub uncompressed_size: usize,
}
/// A group of chunk ranges within a single xorb, together with lazily retrieved data for them.
pub struct XorbBlock {
/// Hash of the xorb; used as the `hash` of the chunk-cache key in `retrieve_data`.
pub xorb_hash: MerkleHash,
/// Chunk ranges of the xorb that make up this block.
pub chunk_ranges: Vec<ChunkRange>,
/// Index used to look up this block's retrieval URL in `TermBlockRetrievalURLs`.
pub xorb_block_index: usize,
/// Term references associated with this block.
pub references: Vec<XorbReference>,
/// Total uncompressed size when it is known ahead of time; forwarded to
/// `Client::get_file_term_data` by `retrieve_data`.
pub uncompressed_size_if_known: Option<usize>,
/// Retrieved data, populated by `retrieve_data` on the first successful retrieval and reused
/// afterwards.
pub data: OnceCell<Arc<XorbBlockData>>,
}
/// Two blocks are equal when they name the same xorb, the same chunk ranges and the same block
/// index. References, the size hint and any retrieved data do not take part in the comparison.
impl PartialEq for XorbBlock {
    fn eq(&self, other: &Self) -> bool {
        // Tuples compare element by element, left to right, stopping at the first mismatch.
        let lhs = (&self.xorb_hash, &self.chunk_ranges, &self.xorb_block_index);
        let rhs = (&other.xorb_hash, &other.chunk_ranges, &other.xorb_block_index);
        lhs == rhs
    }
}

impl Eq for XorbBlock {}
/// Pairs every chunk index covered by `chunk_ranges` with the byte offset found at the matching
/// position of `byte_offsets`.
///
/// Chunks are enumerated range by range, in the order given; the k-th chunk overall is paired
/// with `byte_offsets[k]`. Entries of `byte_offsets` beyond the number of chunks are ignored.
///
/// # Panics
///
/// Panics if `byte_offsets` holds fewer entries than there are chunks in `chunk_ranges`.
fn build_chunk_offsets(chunk_ranges: &[ChunkRange], byte_offsets: &[u32]) -> Vec<(usize, usize)> {
    chunk_ranges
        .iter()
        .flat_map(|range| range.start..range.end)
        .enumerate()
        .map(|(position, chunk_idx)| (chunk_idx as usize, byte_offsets[position] as usize))
        .collect()
}
impl XorbBlock {
/// Returns the data for this block, retrieving it on first use and reusing it afterwards.
///
/// The result is memoised in `self.data`, so repeated callers share one `Arc<XorbBlockData>`.
/// On first use the data is looked up in `chunk_cache` (when one is supplied and the block
/// consists of exactly one chunk range) and otherwise downloaded through `client`. Freshly
/// downloaded single-range data is written back to the cache by a detached task; a failed
/// write is only logged.
///
/// Blocks with zero or several chunk ranges bypass the cache: a cache entry is addressed by a
/// single chunk range, so an entry for the first range can neither supply nor hold data that
/// spans all of the block's ranges.
///
/// When `progress_updater` is given, download progress is reported incrementally; on a cache
/// hit the total length of the block's HTTP ranges is reported at once instead.
///
/// # Errors
///
/// Returns an error if acquiring the download permit or downloading the term data fails.
/// Errors from the cache lookup are treated as a cache miss.
pub async fn retrieve_data(
    self: Arc<Self>,
    client: Arc<dyn Client>,
    url_info: Arc<TermBlockRetrievalURLs>,
    progress_updater: Option<Arc<ItemProgressUpdater>>,
    chunk_cache: Option<Arc<dyn ChunkCache>>,
) -> Result<Arc<XorbBlockData>> {
    let xorb_block_index = self.xorb_block_index;
    let uncompressed_size_if_known = self.uncompressed_size_if_known;
    let chunk_ranges = self.chunk_ranges.clone();

    // The cache is keyed by one chunk range. Only a block made of exactly one range can be
    // served from, or stored under, such a key without truncating or mislabelling its data.
    let cacheable_range = match chunk_ranges.as_slice() {
        [only] => Some(*only),
        _ => None,
    };

    self.data
        .get_or_try_init(|| async {
            // Try the cache first.
            if let (Some(cache), Some(chunk_range)) = (chunk_cache.as_ref(), cacheable_range) {
                let cache_key = Key {
                    prefix: xet_config().data.default_prefix.clone(),
                    hash: self.xorb_hash,
                };
                if let Ok(Some(cache_range)) = cache.get(&cache_key, &chunk_range).await {
                    // Nothing is transferred on a hit, so report the bytes that a download
                    // would have moved to keep the progress totals consistent.
                    if let Some(ref updater) = progress_updater {
                        let (_, _, http_ranges) = url_info.get_retrieval_url(xorb_block_index).await;
                        let transfer_bytes: u64 = http_ranges.iter().map(|r| r.length()).sum();
                        updater.report_transfer_progress(transfer_bytes);
                    }
                    let chunk_offsets = build_chunk_offsets(&chunk_ranges, &cache_range.offsets);
                    let data = Bytes::from(cache_range.data);
                    return Ok(Arc::new(XorbBlockData { chunk_offsets, data }));
                }
            }

            // Cache miss (or cache not applicable): download through the client.
            let permit = client.acquire_download_permit().await?;
            let url_provider = XorbURLProvider {
                client: client.clone(),
                url_info,
                xorb_block_index,
                last_acquisition_id: Mutex::new(UniqueId::null()),
            };
            let progress_callback: Option<ProgressCallback> = progress_updater.as_ref().map(|updater| {
                let updater = updater.clone();
                Arc::new(move |delta: u64, _completed: u64, _total: u64| {
                    updater.report_transfer_progress(delta);
                }) as ProgressCallback
            });
            let (data, chunk_byte_offsets) = client
                .get_file_term_data(Box::new(url_provider), permit, progress_callback, uncompressed_size_if_known)
                .await?;

            // Populate the cache in the background; the caller does not wait for the write.
            if let (Some(cache), Some(chunk_range)) = (chunk_cache, cacheable_range) {
                let cache_key = Key {
                    prefix: xet_config().data.default_prefix.clone(),
                    hash: self.xorb_hash,
                };
                let data = data.clone();
                let chunk_byte_offsets = chunk_byte_offsets.clone();
                tokio::spawn(async move {
                    if let Err(err) = cache.put(&cache_key, &chunk_range, &chunk_byte_offsets, &data).await {
                        tracing::warn!("chunk cache put failed: {err}");
                    }
                });
            }

            let chunk_offsets = build_chunk_offsets(&chunk_ranges, &chunk_byte_offsets);
            Ok(Arc::new(XorbBlockData { chunk_offsets, data }))
        })
        .await
        .cloned()
}
/// Computes the total size of the block's chunk ranges from its term sizes, when the terms
/// allow it.
///
/// The size is known only if a chain of terms tiles `xorb_ranges` exactly: the chain starts at
/// the first range's start, each term begins where the previous one ended (hopping over the gap
/// between two disjoint ranges when a term ends exactly at a range's end), and the chain
/// finishes at the last range's end. The sizes along the first such chain found are summed.
///
/// Returns `Some(0)` for empty `xorb_ranges` and `None` when no such chain exists.
///
/// In debug builds, asserts that `terms` is sorted by range start and that every term lies
/// within one of `xorb_ranges`.
pub fn determine_size_if_possible(xorb_ranges: &[ChunkRange], terms: &[XorbReference]) -> Option<usize> {
    debug_assert!(
        terms
            .windows(2)
            .all(|pair| pair[0].term_chunks.start <= pair[1].term_chunks.start),
        "terms must be sorted by chunk range start"
    );
    debug_assert!(
        terms.iter().all(|term| {
            let chunks = &term.term_chunks;
            xorb_ranges
                .iter()
                .any(|range| chunks.start >= range.start && chunks.end <= range.end)
        }),
        "all terms must fall within one of the xorb ranges"
    );

    let (first_range, last_range) = match (xorb_ranges.first(), xorb_ranges.last()) {
        (Some(first), Some(last)) => (first, last),
        _ => return Some(0),
    };

    // For every pair of consecutive ranges separated by a gap, map the end of the left range to
    // the start of the right one, so a chain reaching the former continues at the latter.
    let mut gap_hops: BTreeMap<u32, u32> = BTreeMap::new();
    for pair in xorb_ranges.windows(2) {
        let (left, right) = (&pair[0], &pair[1]);
        if left.end < right.start {
            gap_hops.insert(left.end, right.start);
        }
    }

    // Chunk index -> accumulated size of the first chain of terms that reaches it.
    let mut size_at: BTreeMap<u32, usize> = BTreeMap::new();
    size_at.insert(first_range.start, 0);

    for term in terms {
        // A term extends a chain only if it starts exactly where a chain already ends.
        let size_so_far = match size_at.get(&term.term_chunks.start) {
            Some(&size) => size,
            None => continue,
        };
        let chain_end = term.term_chunks.end;
        let chain_size = size_so_far + term.uncompressed_size;
        size_at.entry(chain_end).or_insert(chain_size);
        if let Some(&next_start) = gap_hops.get(&chain_end) {
            size_at.entry(next_start).or_insert(chain_size);
        }
    }

    size_at.get(&last_range.end).copied()
}
}
// Unit tests for `XorbBlock::determine_size_if_possible`. Each test builds the xorb chunk ranges
// and a sorted list of term references, then checks whether a total size can be derived.
#[cfg(test)]
mod tests {
    use xet_client::cas_types::ChunkRange;

    use super::*;

    /// Builds term references from `(chunk range, size)` pairs.
    fn build_refs(pairs: &[(ChunkRange, usize)]) -> Vec<XorbReference> {
        pairs
            .iter()
            .map(|(range, size)| XorbReference {
                term_chunks: *range,
                uncompressed_size: *size,
            })
            .collect()
    }

    #[test]
    fn test_single_term_exact_match() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(0, 5), 1000)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_two_terms_chained() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 600), (ChunkRange::new(3, 5), 400)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_three_terms_chained() {
        let ranges = &[ChunkRange::new(0, 6)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(2, 4), 300),
            (ChunkRange::new(4, 6), 500),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_gap_in_chain() {
        let ranges = &[ChunkRange::new(0, 6)];
        let terms = build_refs(&[(ChunkRange::new(0, 2), 200), (ChunkRange::new(4, 6), 500)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_does_not_start_at_xorb_start() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(1, 5), 800)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_does_not_end_at_xorb_end() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 600)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_empty_terms() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms: Vec<XorbReference> = vec![];
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_overlapping_terms_with_exact_cover() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 3), 600),
            (ChunkRange::new(1, 4), 700),
            (ChunkRange::new(3, 5), 400),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_duplicate_terms_first_covers() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(0, 5), 1000), (ChunkRange::new(0, 5), 1000)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_nonzero_xorb_start() {
        let ranges = &[ChunkRange::new(3, 8)];
        let terms = build_refs(&[(ChunkRange::new(3, 5), 400), (ChunkRange::new(5, 8), 600)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(1000));
    }

    #[test]
    fn test_nonzero_xorb_start_no_match() {
        let ranges = &[ChunkRange::new(3, 8)];
        let terms = build_refs(&[(ChunkRange::new(3, 5), 400)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_single_chunk_range() {
        let ranges = &[ChunkRange::new(0, 1)];
        let terms = build_refs(&[(ChunkRange::new(0, 1), 42)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(42));
    }

    #[test]
    fn test_chain_with_overlapping_inner_terms() {
        let ranges = &[ChunkRange::new(2, 8)];
        let terms = build_refs(&[
            (ChunkRange::new(2, 5), 500),
            (ChunkRange::new(3, 6), 999),
            (ChunkRange::new(5, 8), 300),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(800));
    }

    #[test]
    fn test_partial_overlap_no_cover() {
        let ranges = &[ChunkRange::new(0, 10)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 4), 400),
            (ChunkRange::new(3, 7), 400),
            (ChunkRange::new(6, 10), 400),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_same_start_short_then_long_covering_full() {
        let ranges = &[ChunkRange::new(0, 5)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 300), (ChunkRange::new(0, 5), 500)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(500));
    }

    #[test]
    fn test_same_start_short_then_long_with_chain() {
        let ranges = &[ChunkRange::new(0, 6)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(0, 3), 300),
            (ChunkRange::new(3, 6), 300),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(600));
    }

    #[test]
    fn test_same_start_multiple_duplicates_chain_through_second() {
        let ranges = &[ChunkRange::new(0, 6)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(0, 4), 400),
            (ChunkRange::new(0, 5), 500),
            (ChunkRange::new(4, 6), 200),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(600));
    }

    #[test]
    fn test_same_start_at_midpoint() {
        let ranges = &[ChunkRange::new(0, 8)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 3), 300),
            (ChunkRange::new(3, 5), 200),
            (ChunkRange::new(3, 6), 300),
            (ChunkRange::new(6, 8), 200),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(800));
    }

    #[test]
    fn test_same_start_none_covers() {
        let ranges = &[ChunkRange::new(0, 10)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(0, 4), 400),
            (ChunkRange::new(0, 6), 600),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    #[test]
    fn test_same_start_two_groups_chained() {
        let ranges = &[ChunkRange::new(0, 6)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(0, 3), 300),
            (ChunkRange::new(3, 5), 200),
            (ChunkRange::new(3, 6), 300),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(600));
    }

    #[test]
    fn test_multiple_disjoint_ranges_both_covered() {
        let ranges = &[ChunkRange::new(0, 3), ChunkRange::new(5, 8)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 300), (ChunkRange::new(5, 8), 400)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(700));
    }

    #[test]
    fn test_multiple_disjoint_ranges_one_uncovered() {
        let ranges = &[ChunkRange::new(0, 3), ChunkRange::new(5, 8)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 300)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }

    // No ranges at all means there is nothing to size: the early return yields zero.
    #[test]
    fn test_empty_ranges() {
        let ranges: &[ChunkRange] = &[];
        let terms: Vec<XorbReference> = vec![];
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(0));
    }

    // Ranges that touch need no gap hop; the chain continues through the shared boundary.
    #[test]
    fn test_adjacent_ranges_chain_without_gap() {
        let ranges = &[ChunkRange::new(0, 3), ChunkRange::new(3, 6)];
        let terms = build_refs(&[(ChunkRange::new(0, 3), 300), (ChunkRange::new(3, 6), 400)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(700));
    }

    // Every gap between consecutive ranges must be hopped in turn.
    #[test]
    fn test_three_disjoint_ranges_all_covered() {
        let ranges = &[ChunkRange::new(0, 2), ChunkRange::new(4, 6), ChunkRange::new(9, 10)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 2), 200),
            (ChunkRange::new(4, 6), 300),
            (ChunkRange::new(9, 10), 50),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(550));
    }

    // After hopping a gap, the second range may itself be covered by several chained terms.
    #[test]
    fn test_disjoint_ranges_with_chained_terms_in_second_range() {
        let ranges = &[ChunkRange::new(0, 3), ChunkRange::new(5, 9)];
        let terms = build_refs(&[
            (ChunkRange::new(0, 3), 300),
            (ChunkRange::new(5, 7), 200),
            (ChunkRange::new(7, 9), 250),
        ]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), Some(750));
    }

    // A chain cannot begin in the second range when the first range is not covered.
    #[test]
    fn test_multiple_disjoint_ranges_first_uncovered() {
        let ranges = &[ChunkRange::new(0, 3), ChunkRange::new(5, 8)];
        let terms = build_refs(&[(ChunkRange::new(5, 8), 400)]);
        assert_eq!(XorbBlock::determine_size_if_possible(ranges, &terms), None);
    }
}