use crate::kernel::io::{FileExtension, IoFactory};
use crate::kernel::lsm::compactor::{SeekScope, LEVEL_0};
use crate::kernel::lsm::mem_table::KeyValue;
use crate::kernel::lsm::storage::{Config, Gen};
use crate::kernel::lsm::table::loader::TableLoader;
use crate::kernel::lsm::table::meta::TableMeta;
use crate::kernel::lsm::table::scope::Scope;
use crate::kernel::lsm::table::Table;
use crate::kernel::lsm::version::cleaner::CleanTag;
use crate::kernel::lsm::version::edit::{EditType, VersionEdit};
use crate::kernel::lsm::version::meta::VersionMeta;
use crate::kernel::lsm::MAX_LEVEL;
use crate::kernel::{sorted_gen_list, KernelResult};
use itertools::Itertools;
use std::fmt;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tracing::info;
pub(crate) mod cleaner;
pub(crate) mod edit;
pub(crate) mod iter;
mod meta;
pub(crate) mod status;
#[cfg(test)]
mod test;
/// Default name for the SSTable storage path.
/// NOTE(review): the code that joins this to a root directory is outside this file — confirm usage.
pub(crate) const DEFAULT_SS_TABLE_PATH: &str = "ss_table";
/// Default name for the version storage path.
/// NOTE(review): consumers are outside this file — confirm usage.
pub(crate) const DEFAULT_VERSION_PATH: &str = "version";
/// Default threshold for the version log.
/// Presumably the number of logged edits after which a snapshot is taken — TODO confirm the unit
/// against the code that reads this constant.
pub(crate) const DEFAULT_VERSION_LOG_THRESHOLD: usize = 233;
/// Per-level layout of table scopes: one `Vec<Scope>` for each of the `MAX_LEVEL` levels,
/// indexed by level number.
pub(crate) type LevelSlice = [Vec<Scope>; MAX_LEVEL];
/// Outcome of probing a single table scope for a key.
pub(crate) enum SeekOption<T> {
/// The key was found; carries the located value.
Hit(T),
/// The key was not found. Carries `Some(SeekScope)` only when the probe was made above
/// `LEVEL_0` and the scope's `seeks_increase()` returned `true`; otherwise `None`.
Miss(Option<SeekScope>),
}
/// Picks the gen that the version snapshot should use.
///
/// The gens of all `FileExtension::Log` files under `factory.get_path()` are listed via
/// `sorted_gen_list`:
///
/// * listing failed, or no gen exists: a fresh gen from `Gen::create()` is returned;
/// * exactly one gen exists: that gen is returned;
/// * two or more exist: the last gen in the list is removed through `factory.clean` and the
///   second-to-last gen is returned.
///
/// # Errors
///
/// Propagates the error returned by `factory.clean`. A failure of `sorted_gen_list` is not an
/// error; it falls back to a fresh gen.
fn snapshot_gen(factory: &IoFactory) -> KernelResult<i64> {
    let gen_list = match sorted_gen_list(factory.get_path(), FileExtension::Log) {
        Ok(gen_list) => gen_list,
        // An unreadable listing is treated the same as "no snapshot yet".
        Err(_) => return Ok(Gen::create()),
    };

    let gen = match gen_list.as_slice() {
        [] => Gen::create(),
        [snapshot] => *snapshot,
        [.., old_snapshot, new_snapshot] => {
            // NOTE(review): presumably the newest log may be an unfinished snapshot, which is why
            // it is discarded in favour of the previous one — confirm with the snapshot writer.
            factory.clean(*new_snapshot)?;
            *old_snapshot
        }
    };
    Ok(gen)
}
/// A view of the table layout: which scopes sit at which level, plus aggregate statistics.
///
/// When a `Version` is dropped, `CleanTag::Clean(version_num)` is sent on `clean_tx`.
#[derive(Clone)]
pub(crate) struct Version {
/// Version counter; set to 0 by `load_from_log` and incremented by one on every `apply`.
pub(crate) version_num: u64,
/// Resolves a scope's gen to its table.
table_loader: Arc<TableLoader>,
/// Scopes held by each level, indexed by level number.
pub(crate) level_slice: LevelSlice,
/// Aggregate `len` / `size_of_disk` figures, updated by `apply`.
pub(crate) meta_data: VersionMeta,
/// Channel on which `CleanTag` messages are sent (`Add` from `apply`, `Clean` on drop).
clean_tx: UnboundedSender<CleanTag>,
}
impl Version {
/// Returns the `len` figure recorded in this version's metadata.
/// NOTE(review): presumably the total number of stored entries — confirm against `VersionMeta`.
pub(crate) fn len(&self) -> usize {
self.meta_data.len
}
/// Returns `true` when [`Version::len`] is zero.
pub(crate) fn is_empty(&self) -> bool {
self.len() == 0
}
/// Returns the number of scopes held at `level`.
///
/// # Panics
///
/// Panics if `level` is not a valid index into `level_slice` (i.e. `level >= MAX_LEVEL`).
pub(crate) fn level_len(&self, level: usize) -> usize {
self.level_slice[level].len()
}
/// Returns the `size_of_disk` figure recorded in this version's metadata.
/// NOTE(review): unit is presumably bytes — confirm against `VersionMeta`.
pub(crate) fn size_of_disk(&self) -> u64 {
self.meta_data.size_of_disk
}
/// Rebuilds a `Version` by replaying a list of edits onto an empty version.
///
/// The new version starts with `version_num` 0, empty levels and zeroed metadata; `vec_log` is
/// then applied in a single [`Version::apply`] call and the result is logged at `info` level.
///
/// # Errors
///
/// Returns whatever error `apply` reports.
pub(crate) fn load_from_log(
    vec_log: Vec<VersionEdit>,
    ss_table_loader: &Arc<TableLoader>,
    clean_tx: UnboundedSender<CleanTag>,
) -> KernelResult<Self> {
    let empty_meta = VersionMeta {
        size_of_disk: 0,
        len: 0,
    };
    let mut version = Self {
        version_num: 0,
        table_loader: Arc::clone(ss_table_loader),
        level_slice: Self::level_slice_new(),
        meta_data: empty_meta,
        clean_tx,
    };

    version.apply(vec_log)?;
    info!("[Version][load_from_log]: {version}");

    Ok(version)
}
/// Applies a batch of edits to this version in order.
///
/// * `DeleteFile` removes every scope at the given level whose gen is listed in the edit.
/// * `NewFile` adds the edit's scopes, ordered by gen: at `LEVEL_0` they are appended to the end
///   of the level; at any other level they are inserted starting at position `index`, keeping
///   their gen order.
///
/// Afterwards the collected metadata changes are handed to `meta_data.statistical_process`,
/// `version_num` is incremented, and a `CleanTag::Add` carrying the new version number and all
/// deleted gens is sent on `clean_tx`.
///
/// # Errors
///
/// Returns an error if `statistical_process` fails or if the `CleanTag` cannot be sent.
///
/// # Panics
///
/// Panics if an edit names a level outside `level_slice`, or if a `NewFile` edit for a level
/// above `LEVEL_0` carries an `index` greater than that level's current length.
pub(crate) fn apply(&mut self, vec_version_edit: Vec<VersionEdit>) -> KernelResult<()> {
    let mut removed_gens = Vec::new();
    let mut meta_changes = Vec::new();

    for edit in vec_version_edit {
        match edit {
            VersionEdit::NewFile((vec_scope, level), index, sst_meta) => {
                meta_changes.push(EditType::Add(sst_meta));

                let ordered = vec_scope.into_iter().sorted_by_key(Scope::gen);
                if level == LEVEL_0 {
                    self.level_slice[level].extend(ordered);
                } else {
                    // Inserting in reverse at a fixed position leaves the scopes in ascending
                    // gen order starting at `index`.
                    for scope in ordered.rev() {
                        self.level_slice[level].insert(index, scope);
                    }
                }
            }
            VersionEdit::DeleteFile((vec_gen, level), sst_meta) => {
                meta_changes.push(EditType::Del(sst_meta));

                self.level_slice[level].retain(|scope| !vec_gen.contains(&scope.gen()));
                removed_gens.extend(vec_gen);
            }
        }
    }

    self.meta_data.statistical_process(meta_changes)?;
    self.version_num += 1;

    let tag = CleanTag::Add {
        version: self.version_num,
        gens: removed_gens,
    };
    self.clean_tx.send(tag)?;

    Ok(())
}
/// Builds an empty level layout: `MAX_LEVEL` empty scope lists.
///
/// The array is generated from its length rather than written out element by element, so it
/// stays valid if `MAX_LEVEL` changes.
fn level_slice_new() -> [Vec<Scope>; MAX_LEVEL] {
    std::array::from_fn(|_| Vec::new())
}
/// Converts the current layout into a list of `NewFile` edits, one per non-empty level in
/// ascending level order, each inserting that level's scopes at index 0.
///
/// The version-wide totals (`size_of_disk`, `len`) are attached to exactly one edit — the first
/// one emitted — and every other edit carries `TableMeta::default()`, so the totals are counted
/// once when the edits are replayed.
///
/// The totals go on the first emitted edit rather than specifically on the level-0 edit because
/// empty levels are skipped: if level 0 is empty there is no level-0 edit, and the totals would
/// otherwise not be recorded in any edit. When level 0 is non-empty the output is the same as
/// tying the totals to level 0.
pub(crate) fn to_vec_edit(&self) -> Vec<VersionEdit> {
    // `take()` hands the totals to the first edit only; later edits see `None`.
    let mut total_meta = Some(TableMeta {
        size_of_disk: self.size_of_disk(),
        len: self.len(),
    });

    self.level_slice
        .iter()
        .enumerate()
        .filter(|(_, vec_scope)| !vec_scope.is_empty())
        .map(|(level, vec_scope)| {
            VersionEdit::NewFile(
                (vec_scope.clone(), level),
                0,
                total_meta.take().unwrap_or_else(TableMeta::default),
            )
        })
        .collect_vec()
}
/// Returns the table for the scope at position `offset` within `level`.
///
/// Yields `None` when `offset` is past the end of the level or when the table loader has no
/// table for the scope's gen.
///
/// # Panics
///
/// Panics if `level` is not a valid index into `level_slice`.
pub(crate) fn table(&self, level: usize, offset: usize) -> Option<&dyn Table> {
    let scope = self.level_slice[level].get(offset)?;
    self.table_loader.get(scope.gen())
}
/// Returns the tables of `LEVEL_0`, ordered from the last scope in the level to the first.
///
/// Scopes whose table cannot be obtained from the table loader are skipped.
pub(crate) fn tables_by_level_0(&self) -> Vec<&dyn Table> {
    let level_0 = &self.level_slice[LEVEL_0];
    level_0
        .iter()
        .rev()
        .filter_map(|scope| self.table_loader.get(scope.gen()))
        .collect_vec()
}
/// Collects the tables at `level` whose scope meets `target_scope`.
///
/// Returns, in level order:
/// * the tables that could be obtained from the table loader,
/// * a clone of each corresponding scope,
/// * the position within the level of the first such scope, or `0` if there is none.
///
/// Scopes that meet `target_scope` but whose table is not available from the loader are skipped
/// and do not influence the returned index.
///
/// # Panics
///
/// Panics if `level` is not a valid index into `level_slice`.
pub(crate) fn tables_by_scopes(
    &self,
    level: usize,
    target_scope: &Scope,
) -> (Vec<&dyn Table>, Vec<Scope>, usize) {
    let mut tables: Vec<&dyn Table> = Vec::new();
    let mut scopes: Vec<Scope> = Vec::new();
    let mut first_index = None;

    for (index, scope) in self.level_slice[level].iter().enumerate() {
        if !scope.meet(target_scope) {
            continue;
        }
        if let Some(ss_table) = self.table_loader.get(scope.gen()) {
            first_index = first_index.or(Some(index));
            tables.push(ss_table);
            scopes.push(scope.clone());
        }
    }

    (tables, scopes, first_index.unwrap_or(0))
}
/// Looks up `key` in this version.
///
/// `LEVEL_0` scopes are probed one by one from the last to the first. If none of them holds the
/// key, each higher level is probed at the single scope selected by
/// [`Version::query_meet_index`].
///
/// Returns the value (if any) together with the first `SeekScope` reported by a missed probe on
/// a level above `LEVEL_0` before the search ended; a `LEVEL_0` hit always returns `None` for
/// the seek scope.
///
/// # Errors
///
/// Propagates errors from the underlying table queries.
pub(crate) fn query(&self, key: &[u8]) -> KernelResult<(Option<KeyValue>, Option<SeekScope>)> {
    let loader = &self.table_loader;

    for scope in self.level_slice[LEVEL_0].iter().rev() {
        match Self::query_by_scope(key, loader, scope, LEVEL_0)? {
            SeekOption::Hit(key_value) => return Ok((Some(key_value), None)),
            SeekOption::Miss(_) => {}
        }
    }

    let mut miss_seek = None;
    for (level, scopes) in self.level_slice.iter().enumerate().skip(1) {
        let offset = self.query_meet_index(key, level);
        let scope = match scopes.get(offset) {
            Some(scope) => scope,
            None => continue,
        };

        match Self::query_by_scope(key, loader, scope, level)? {
            SeekOption::Hit(value) => return Ok((Some(value), miss_seek)),
            SeekOption::Miss(Some(seek_scope)) => {
                // Only the first reported seek scope is kept.
                if miss_seek.is_none() {
                    miss_seek = Some(seek_scope);
                }
            }
            SeekOption::Miss(None) => {}
        }
    }

    Ok((None, miss_seek))
}
/// Probes the table behind `scope` for `key`.
///
/// * `Hit(value)` — the scope meets the key, its table is available, and the table holds it.
/// * `Miss(Some((scope, table_level)))` — the table was queried without success, `level` is
///   above `LEVEL_0`, and `scope.seeks_increase()` returned `true`.
/// * `Miss(None)` — every other case, including a scope that does not meet the key or a table
///   the loader cannot provide.
///
/// `seeks_increase` is invoked only after an unsuccessful table query on a level above
/// `LEVEL_0`.
///
/// # Errors
///
/// Propagates the error of the table's `query`.
fn query_by_scope(
    key: &[u8],
    table_loader: &Arc<TableLoader>,
    scope: &Scope,
    level: usize,
) -> KernelResult<SeekOption<KeyValue>> {
    if !scope.meet_by_key(key) {
        return Ok(SeekOption::Miss(None));
    }
    let ss_table = match table_loader.get(scope.gen()) {
        Some(ss_table) => ss_table,
        None => return Ok(SeekOption::Miss(None)),
    };

    if let Some(value) = ss_table.query(key)? {
        return Ok(SeekOption::Hit(value));
    }
    if level > LEVEL_0 && scope.seeks_increase() {
        let seek_scope = (scope.clone(), ss_table.level());
        return Ok(SeekOption::Miss(Some(seek_scope)));
    }

    Ok(SeekOption::Miss(None))
}
/// Finds the position within `level` of the scope to probe for `key`.
///
/// A binary search compares each scope's `start` with `key`. An exact match returns that
/// position; otherwise the position just before the insertion point is returned, clamped to `0`
/// (which is also the result for an empty level).
///
/// NOTE(review): the binary search assumes the level's scopes are ordered by `start` — confirm
/// that every writer of `level_slice` maintains this.
///
/// # Panics
///
/// Panics if `level` is not a valid index into `level_slice`.
pub(crate) fn query_meet_index(&self, key: &[u8], level: usize) -> usize {
    let found = self.level_slice[level].binary_search_by(|scope| scope.start.as_ref().cmp(key));
    match found {
        Ok(index) => index,
        Err(insert_at) => insert_at.saturating_sub(1),
    }
}
/// Reports whether `level` holds at least as many scopes as its major-compaction threshold.
///
/// The threshold is `config.major_threshold_with_sst_size` multiplied by
/// `config.level_sst_magnification` raised to the power `level`.
///
/// # Panics
///
/// Panics if `level` is not a valid index into `level_slice`.
pub(crate) fn is_threshold_exceeded_major(&self, config: &Config, level: usize) -> bool {
    let table_count = self.level_slice[level].len();
    let magnification = config.level_sst_magnification.pow(level as u32);
    let threshold = config.major_threshold_with_sst_size * magnification;

    table_count >= threshold
}
}
impl fmt::Display for Version {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(f, "Version: {}", self.version_num)?;
for level in 0..MAX_LEVEL {
writeln!(f, "Level {level}:")?;
write!(f, "\t")?;
for scope in &self.level_slice[level] {
write!(
f,
"[gen: {}, scope:({:?},{:?})]\t",
scope.gen(),
scope.start,
scope.end
)?;
}
writeln!(f)?;
}
writeln!(f, "{:#?}", self.meta_data)
}
}
/// Announces on `clean_tx` that this version is no longer in use.
impl Drop for Version {
    // The send result is discarded on purpose: `drop` cannot return an error.
    #[allow(clippy::let_underscore_must_use)]
    fn drop(&mut self) {
        let tag = CleanTag::Clean(self.version_num);
        let _ = self.clean_tx.send(tag);
    }
}