use std::borrow::{Borrow, Cow};
use std::collections::BTreeMap;
use std::ops::Range;
use std::path::{Path, PathBuf};
use parking_lot::Mutex;
use tantivy::directory::OwnedBytes;
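/// Key of the internal `BTreeMap`: a tag (usually a file path) plus the start
/// offset of a cached block.
///
/// Keys order by tag first, then by `range_start`, so
/// `range(..=key).next_back()` yields the last block of a tag that starts at
/// or before the queried offset.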
#[derive(Clone, PartialOrd, Ord, PartialEq, Eq)]
struct CacheKey<'a, T: ToOwned + ?Sized> {
tag: Cow<'a, T>,
range_start: u64,
}
impl<T: ToOwned + ?Sized> CacheKey<'static, T> {
fn from_owned(tag: T::Owned, range_start: u64) -> Self {
CacheKey {
tag: Cow::Owned(tag),
range_start,
}
}
}
impl<'a, T: ToOwned + ?Sized> CacheKey<'a, T> {
fn from_borrowed(tag: &'a T, range_start: u64) -> Self {
CacheKey {
tag: Cow::Borrowed(tag),
range_start,
}
}
}
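/// A cached block: `bytes` spans `range_start..range_end` of the tagged file,
/// where `range_start` lives in the corresponding `CacheKey`.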
struct CacheValue {
range_end: u64,
bytes: OwnedBytes,
}
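/// The unsynchronized cache state ("NeedMut": callers need `&mut` access).
///
/// `T` is the tag type, typically a file path. Blocks of the same tag never
/// overlap once stored: `put_slice` merges overlapping and contiguous blocks.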
struct NeedMutByteRangeCache<T: 'static + ToOwned + ?Sized> {
cache: BTreeMap<CacheKey<'static, T>, CacheValue>,
num_items: u64,
num_bytes: u64,
}
impl<T: 'static + ToOwned + ?Sized + Ord> NeedMutByteRangeCache<T>
where
T::Owned: std::fmt::Debug,
{
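    /// Creates a cache that never evicts: every recorded slice is kept.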
fn with_infinite_capacity() -> Self {
NeedMutByteRangeCache {
cache: BTreeMap::new(),
num_items: 0,
num_bytes: 0,
}
}
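    /// Returns the requested range if a single cached block covers all of it.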
fn get_slice(&mut self, tag: &T, byte_range: Range<u64>) -> Option<OwnedBytes> {
if byte_range.start == byte_range.end {
return Some(OwnedBytes::empty());
}
let key = CacheKey::from_borrowed(tag, byte_range.start);
        let (k, v) = self.get_block(&key, byte_range.end)?;
let start = (byte_range.start - k.range_start) as usize;
let end = (byte_range.end - k.range_start) as usize;
Some(v.bytes.slice(start..end))
}
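    /// Records `bytes` as covering `byte_range` for `tag`, merging them with
    /// any overlapping or contiguous cached blocks.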
fn put_slice(&mut self, tag: T::Owned, byte_range: Range<u64>, bytes: OwnedBytes) {
let len = (byte_range.end - byte_range.start) as usize;
        assert_eq!(
            len,
            bytes.len(),
            "declared byte_range {:?} does not match the data length {} for tag {:?}",
            byte_range,
            bytes.len(),
            tag
        );
if len == 0 {
return;
}
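        // Look for cached blocks overlapping (or contiguous with) each end of
        // the new range.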
let start_key = CacheKey::from_borrowed(tag.borrow(), byte_range.start);
let end_key = CacheKey::from_borrowed(tag.borrow(), byte_range.end);
let first_matching_block = self.get_block(&start_key, byte_range.start).map(|(k, _v)| k);
let last_matching_block = self.get_block(&end_key, byte_range.end).map(|(k, _v)| k);
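        // If one block matched both ends, it already covers the whole range.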
if first_matching_block.is_some() && first_matching_block == last_matching_block {
return;
}
let first_matching_block = first_matching_block.unwrap_or(&start_key);
let last_matching_block = last_matching_block.unwrap_or(&end_key);
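        // Collect the ranges of every block between the two boundary blocks.
        // Collecting into a Vec ends the borrow of the map before we mutate it.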
let overlapping: Vec<Range<u64>> = self
.cache
.range(first_matching_block..=last_matching_block)
.map(|(k, v)| k.range_start..v.range_end)
.collect();
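        // A boundary block can be dropped wholesale when the new slice extends
        // at least as far as the block does on that side; otherwise its
        // non-overlapping head/tail must be copied into the merged block.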
        let can_drop_first = overlapping
            .first()
            .map(|r| byte_range.start <= r.start)
            .unwrap_or(true);
        let can_drop_last = overlapping
            .last()
            .map(|r| byte_range.end >= r.end)
            .unwrap_or(true);
let (final_range, final_bytes) = if can_drop_first && can_drop_last {
(byte_range, bytes)
} else {
let start = if can_drop_first {
byte_range.start
} else {
overlapping.first().expect("impossible").start
};
let end = if can_drop_last {
byte_range.end
} else {
overlapping.last().expect("impossible").end
};
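            // Stitch the merged block together: the head of the first
            // overlapping block, the new bytes, then the tail of the last one.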
let mut buffer = Vec::with_capacity((end - start) as usize);
if !can_drop_first {
let first_range = overlapping.first().expect("impossible");
let key = CacheKey::from_borrowed(tag.borrow(), first_range.start);
let block = self.cache.get(&key).expect("impossible");
let len = (first_range.end.min(byte_range.start) - first_range.start) as usize;
buffer.extend_from_slice(&block.bytes[..len]);
}
buffer.extend_from_slice(&bytes);
if !can_drop_last {
let last_range = overlapping.last().expect("impossible");
let key = CacheKey::from_borrowed(tag.borrow(), last_range.start);
let block = self.cache.get(&key).expect("impossible");
let start = (last_range.start.max(byte_range.end) - last_range.start) as usize;
buffer.extend_from_slice(&block.bytes[start..]);
}
debug_assert_eq!((end - start) as usize, buffer.len());
(start..end, OwnedBytes::new(buffer))
};
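        // Evict every block the merged range supersedes, then store the
        // merged block under its start offset.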
let mut key = CacheKey::from_owned(tag, 0);
        for range in overlapping {
key.range_start = range.start;
self.cache.remove(&key);
self.update_counter_drop_item((range.end - range.start) as usize);
}
key.range_start = final_range.start;
let value = CacheValue {
range_end: final_range.end,
bytes: final_bytes,
};
self.cache.insert(key, value);
self.update_counter_record_item((final_range.end - final_range.start) as usize);
}
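    /// Returns the cached block that contains `query.range_start` for the
    /// same tag, provided it extends at least to `range_end`.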
    fn get_block<'a, 'b: 'a>(
        &'a self,
        query: &CacheKey<'b, T>,
        range_end: u64,
    ) -> Option<(&'a CacheKey<'a, T>, &'a CacheValue)> {
self.cache
.range(..=query)
.next_back()
.filter(|(k, v)| k.tag == query.tag && range_end <= v.range_end)
}
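    // Bookkeeping only: with infinite capacity these counters never drive
    // eviction.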
fn update_counter_record_item(&mut self, num_bytes: usize) {
self.num_items += 1;
self.num_bytes += num_bytes as u64;
}
fn update_counter_drop_item(&mut self, num_bytes: usize) {
self.num_items -= 1;
self.num_bytes -= num_bytes as u64;
}
}
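/// Cache for storing arbitrary byte ranges of files.
///
/// Overlapping and contiguous ranges are merged on insertion, so a read can
/// be served from bytes recorded by several independent writes.
///
/// A minimal usage sketch (hypothetical file name; assumes tantivy's
/// `OwnedBytes::new` over a `Vec<u8>`):
///
/// ```ignore
/// let cache = ByteRangeCache::with_infinite_capacity();
/// let path = Path::new("split.idx");
/// cache.put_slice(path.to_path_buf(), 0..5, OwnedBytes::new(b"hello".to_vec()));
/// cache.put_slice(path.to_path_buf(), 5..11, OwnedBytes::new(b" world".to_vec()));
/// // The two contiguous blocks were merged into one covering 0..11, so a
/// // read across their boundary is a single cache hit.
/// assert_eq!(&cache.get_slice(path, 3..8).unwrap()[..], b"lo wo");
/// ```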
pub struct ByteRangeCache {
inner: Mutex<NeedMutByteRangeCache<Path>>,
}
impl ByteRangeCache {
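    /// Creates a cache with unlimited capacity.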
pub fn with_infinite_capacity() -> Self {
ByteRangeCache {
inner: Mutex::new(NeedMutByteRangeCache::with_infinite_capacity()),
}
}
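    /// If available, returns the cached view of the slice.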
pub fn get_slice(&self, path: &Path, byte_range: Range<u64>) -> Option<OwnedBytes> {
self.inner.lock().get_slice(path, byte_range)
}
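    /// Records a slice of bytes for `path`.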
pub fn put_slice(&self, path: PathBuf, byte_range: Range<u64>, bytes: OwnedBytes) {
self.inner.lock().put_slice(path, byte_range, bytes)
}
}