use std::{
    cmp::Ordering,
    collections::{HashMap, HashSet},
    path::{Path, PathBuf},
    sync::Arc,
};

use tokio::sync::RwLock;

use crate::{
    err::Error,
    sst::Table,
    types::{self},
};
/// Inclusive upper bound of a [`Range`]; an alias of [`types::Key`].
pub type BiggestKey = types::Key;
/// Inclusive lower bound of a [`Range`]; an alias of [`types::Key`].
pub type SmallestKey = types::Key;
/// Index from SSTable paths to the key range (and table handle) each SSTable covers.
///
/// Both maps live behind `Arc<RwLock<..>>`, so every clone of a `KeyRange` shares the
/// same underlying state.
#[derive(Clone, Debug)]
pub struct KeyRange {
    /// Tracked SSTables, keyed by the path passed to `KeyRange::set`.
    pub key_ranges: Arc<RwLock<HashMap<PathBuf, Range>>>,
    /// Copies of ranges whose filter was rebuilt on demand during a lookup, keyed by the
    /// table's `sst.dir`; they are folded back into `key_ranges` by
    /// `KeyRange::update_key_range`.
    pub restored_ranges: Arc<RwLock<HashMap<PathBuf, Range>>>,
}
/// Key bounds of a single SSTable together with the table handle itself.
#[derive(Clone, Debug)]
pub struct Range {
    /// Lower bound (inclusive) used when deciding whether a key may live in `sst`.
    pub smallest_key: SmallestKey,
    /// Upper bound (inclusive) used when deciding whether a key may live in `sst`.
    pub biggest_key: BiggestKey,
    /// The SSTable these bounds describe.
    pub sst: Table,
}
impl Range {
pub fn new<T: AsRef<[u8]>>(smallest_key: T, biggest_key: T, sst: Table) -> Self {
Self {
smallest_key: smallest_key.as_ref().to_vec(),
biggest_key: biggest_key.as_ref().to_vec(),
sst,
}
}
}
impl Default for KeyRange {
fn default() -> Self {
Self::new()
}
}
impl KeyRange {
pub fn new() -> Self {
Self {
key_ranges: Arc::new(RwLock::new(HashMap::new())),
restored_ranges: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn set<P: AsRef<Path> + Send + Sync, T: AsRef<[u8]>>(
&self,
sst_dir: P,
smallest_key: T,
biggest_key: T,
table: Table,
) -> bool {
self.key_ranges
.write()
.await
.insert(
sst_dir.as_ref().to_path_buf(),
Range::new(smallest_key.as_ref(), biggest_key.as_ref(), table),
)
.is_some()
}
pub async fn remove<P: AsRef<Path> + Send + Sync>(&self, sst_path: P) -> bool {
self.key_ranges.write().await.remove(sst_path.as_ref()).is_some()
}
pub async fn filter_sstables_by_key_range<K: AsRef<[u8]> + std::fmt::Debug>(
&self,
key: K,
) -> Result<Vec<Table>, Error> {
let mut filtered_ssts: Vec<Table> = Vec::new();
let has_restored_ranges = !self.restored_ranges.read().await.is_empty();
if has_restored_ranges {
filtered_ssts = self.check_restored_key_ranges(key.as_ref()).await?;
}
let mut restored_range_map: HashMap<PathBuf, Range> = HashMap::new();
for (_, range) in self.key_ranges.read().await.iter() {
if has_restored_ranges
&& self
.restored_ranges
.read()
.await
.contains_key(range.sst.dir.as_path())
{
continue;
}
let searched_key = key.as_ref().to_vec();
if searched_key >= range.smallest_key && searched_key <= range.biggest_key {
if range.sst.filter.as_ref().unwrap().sst_dir.is_none() {
let mut mut_range = range.to_owned();
let mut filter = mut_range.sst.filter.as_ref().unwrap().to_owned();
filter.recover_meta().await?;
filter.sst_dir = Some(mut_range.sst.dir.to_owned());
mut_range.sst.load_entries_from_file().await?;
filter.build_filter_from_entries(&mut_range.sst.entries);
mut_range.sst.entries.clear();
mut_range.sst.filter = Some(filter.to_owned());
restored_range_map.insert(mut_range.sst.dir.to_owned(), mut_range.to_owned());
if filter.contains(key.as_ref()) {
filtered_ssts.push(mut_range.sst);
continue;
}
}
if range.sst.filter.as_ref().unwrap().contains(key.as_ref()) {
filtered_ssts.push(range.sst.to_owned())
}
}
}
if !restored_range_map.is_empty() {
let restored_ranges = self.restored_ranges.clone();
tokio::spawn(async move {
restored_range_map.clone_into(&mut (*restored_ranges.write().await));
drop(restored_ranges);
});
}
Ok(filtered_ssts)
}
pub async fn check_restored_key_ranges<K: AsRef<[u8]>>(&self, key: K) -> Result<Vec<Table>, Error> {
let mut filtered_ssts: Vec<Table> = Vec::new();
let key_ranges = self.restored_ranges.read().await;
for (_, range) in key_ranges.iter() {
let searched_key = key.as_ref().to_vec();
if searched_key >= range.smallest_key
&& searched_key <= range.biggest_key
&& range.sst.filter.as_ref().unwrap().contains(key.as_ref())
{
filtered_ssts.push(range.sst.to_owned())
}
}
Ok(filtered_ssts)
}
pub async fn update_key_range(&self) {
let restored_ranges = self.restored_ranges.read().await;
if !restored_ranges.is_empty() {
for (path, range) in restored_ranges.iter() {
self.key_ranges
.write()
.await
.insert(path.to_owned(), range.to_owned());
}
drop(restored_ranges);
self.restored_ranges.write().await.clear();
}
}
pub async fn range_query_scan<T: AsRef<[u8]>>(&self, start_key: T, end_key: T) -> Vec<Range> {
self.key_ranges
.read()
.await
.iter()
.filter(|(_, range)| {
(range.smallest_key.as_slice().cmp(start_key.as_ref()) == Ordering::Less
|| range.smallest_key.as_slice().cmp(start_key.as_ref()) == Ordering::Equal)
|| (range.biggest_key.as_slice().cmp(end_key.as_ref()) == Ordering::Greater
|| range.biggest_key.as_slice().cmp(end_key.as_ref()) == Ordering::Equal)
})
.map(|(_, path)| path.to_owned())
.collect()
}
}