use crate::kernel::lsm::mem_table::{KeyValue, MemTable};
use crate::kernel::lsm::storage::{Config, StoreInner};
use crate::kernel::lsm::table::meta::TableMeta;
use crate::kernel::lsm::table::scope::Scope;
use crate::kernel::lsm::table::{collect_gen, Table};
use crate::kernel::lsm::version::edit::VersionEdit;
use crate::kernel::lsm::version::status::VersionStatus;
use crate::kernel::lsm::{data_sharding, MAX_LEVEL};
use crate::kernel::KernelResult;
use crate::KernelError;
use bytes::Bytes;
use futures::future;
use itertools::Itertools;
use std::collections::HashSet;
use std::mem;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::oneshot;
use tracing::info;
/// Index of the first (freshest) LSM level; minor compaction flushes MemTable data here.
pub(crate) const LEVEL_0: usize = 0;
/// Output of data merge + sharding: each entry pairs a table `gen` with the key-values destined for it.
pub(crate) type MergeShardingVec = Vec<(i64, Vec<KeyValue>)>;
/// Deletion bookkeeping for one level: gens of the tables to delete plus their (fused) metadata.
pub(crate) type DelNode = (Vec<i64>, TableMeta);
/// Deletion nodes for one compaction step: (current level, next level).
pub(crate) type DelNodeTuple = (DelNode, DelNode);
/// A scope selected by seek-triggered compaction together with the level it was found on.
pub type SeekScope = (Scope, usize);
/// Compaction request submitted to the background compaction task.
#[derive(Debug)]
pub enum CompactTask {
/// Seek-triggered major compaction of a specific scope/level pair (see [`SeekScope`]).
Seek(SeekScope),
/// Flush the active MemTable; the optional sender is signalled when the flush
/// completes. NOTE(review): the handler appears to be `check_then_compaction`,
/// which sends `()` after a successful minor compaction — confirm at call site.
Flush(Option<oneshot::Sender<()>>),
}
/// Executes minor (MemTable flush) and major (level-merge) compactions
/// against the shared LSM store state.
pub(crate) struct Compactor {
// Shared storage internals: config, MemTable and version status (see accessors below).
store_inner: Arc<StoreInner>,
}
impl Compactor {
/// Creates a compactor over the shared store internals.
pub(crate) fn new(store_inner: Arc<StoreInner>) -> Self {
Compactor { store_inner }
}
/// Swaps out the active MemTable and, if it held any entries, runs a minor
/// compaction on the swapped-out data; finally notifies `option_tx` (if any)
/// that the flush is done.
///
/// # Errors
/// Propagates failures from the MemTable swap and the compaction itself;
/// returns `KernelError::ChannelClose` if the completion receiver was dropped.
pub(crate) async fn check_then_compaction(
&mut self,
option_tx: Option<oneshot::Sender<()>>,
) -> KernelResult<()> {
if let Some((gen, values)) = self.mem_table().swap()? {
if !values.is_empty() {
let start = Instant::now();
// Persist the swapped-out MemTable contents as a Level-0 table
// (which may in turn cascade into a major compaction).
self.minor_compaction(gen, values).await?;
info!("[Compactor][Compaction Drop][Time: {:?}]", start.elapsed());
}
}
// Signal completion to a waiting caller, if one registered a sender.
if let Some(tx) = option_tx {
tx.send(()).map_err(|_| KernelError::ChannelClose)?
}
Ok(())
}
/// Minor compaction: writes `values` as a new Level-0 table under `gen`,
/// then seeds a major compaction with the corresponding `NewFile` edit so
/// that level-size thresholds are re-checked immediately.
pub(crate) async fn minor_compaction(
&self,
gen: i64,
values: Vec<KeyValue>,
) -> KernelResult<()> {
if !values.is_empty() {
// Build the Level-0 table via the version status' table loader,
// using the table type configured for Level 0.
let (scope, meta) = self
.ver_status()
.loader()
.create(
gen,
values,
LEVEL_0,
self.config().level_table_type[LEVEL_0],
)
.await?;
// `is_skip_sized = false`: the usual size threshold decides whether
// the cascade actually does any merging.
self.major_compaction(
LEVEL_0,
scope.clone(),
vec![VersionEdit::NewFile((vec![scope], 0), 0, meta)],
false,
)
.await?;
}
Ok(())
}
/// Major compaction: starting at `level`, repeatedly merges the tables of
/// the current level that overlap `scope` with the overlapping tables of
/// the next level, writing the merged shards as new next-level tables.
///
/// * `vec_ver_edit` — edits accumulated so far (e.g. the `NewFile` from a
///   minor compaction); each loop iteration appends its own edits and
///   applies the whole batch via `log_and_apply`.
/// * `is_skip_sized` — when `true`, the size-threshold check is bypassed
///   for the FIRST iteration only (the `mem::replace` below resets it to
///   `false`), e.g. for seek-triggered compaction.
///
/// # Errors
/// Returns `KernelError::LevelOver` if `level` is already the last level.
pub(crate) async fn major_compaction(
&self,
mut level: usize,
scope: Scope,
mut vec_ver_edit: Vec<VersionEdit>,
mut is_skip_sized: bool,
) -> KernelResult<()> {
let config = self.config();
let mut is_over = false;
if level > MAX_LEVEL - 1 {
return Err(KernelError::LevelOver);
}
// Cascade upward until a level has no compaction work to do.
while level < MAX_LEVEL && !is_over {
let next_level = level + 1;
if let Some((
index,
((del_gens_l, del_meta_l), (del_gens_ll, del_meta_ll)),
vec_sharding,
)) = self
.data_loading_with_level(level, &scope, mem::replace(&mut is_skip_sized, false))
.await?
{
let start = Instant::now();
// Write each merged shard as a new table on the next level, concurrently.
let table_futures = vec_sharding.into_iter().map(|(gen, sharding)| {
self.ver_status().loader().create(
gen,
sharding,
next_level,
config.level_table_type[next_level],
)
});
let vec_table_and_scope: Vec<(Scope, TableMeta)> =
future::try_join_all(table_futures).await?;
let (new_scopes, new_metas): (Vec<Scope>, Vec<TableMeta>) =
vec_table_and_scope.into_iter().unzip();
let fusion_meta = TableMeta::fusion(&new_metas);
// Record the new next-level tables and the deletion of the merged
// source tables on both levels.
vec_ver_edit.append(&mut vec![
VersionEdit::NewFile((new_scopes, next_level), index, fusion_meta),
VersionEdit::DeleteFile((del_gens_l, level), del_meta_l),
VersionEdit::DeleteFile((del_gens_ll, next_level), del_meta_ll),
]);
info!(
"[LsmStore][Major Compaction][recreate_sst][Level: {}][Time: {:?}]",
level,
start.elapsed()
);
level += 1;
} else {
is_over = true;
}
// Apply any pending edits (also flushes the edits seeded by the caller
// even when this iteration produced no new work).
if !vec_ver_edit.is_empty() {
self.ver_status()
.log_and_apply(
mem::take(&mut vec_ver_edit),
config.ver_log_snapshot_threshold,
)
.await?;
}
}
Ok(())
}
/// Gathers and merges the data needed to compact `level` into `level + 1`.
///
/// Returns `None` when there is nothing to do: either `level` is beyond the
/// last mergeable level, or the level's size threshold is not exceeded and
/// `is_skip_sized` is `false`, or no table of `level` overlaps `target`.
/// Otherwise returns the insertion `index` into the next level, the deletion
/// nodes for both levels, and the merged/sharded key-value data.
async fn data_loading_with_level(
&self,
level: usize,
target: &Scope,
is_skip_sized: bool,
) -> KernelResult<Option<(usize, DelNodeTuple, MergeShardingVec)>> {
let version = self.ver_status().current().await;
let config = self.config();
let next_level = level + 1;
// `MAX_LEVEL - 2` because `next_level` must still be a valid level.
if level > MAX_LEVEL - 2
|| !(is_skip_sized || version.is_threshold_exceeded_major(config, level))
{
return Ok(None);
}
let start = Instant::now();
// Tables of the current level overlapping the target scope.
let (tables_l, scopes_l, _) = version.tables_by_scopes(level, target);
if scopes_l.is_empty() {
return Ok(None);
}
// Fuse the selected scopes so the next-level lookup covers the whole range.
let fusion_scope_l = Scope::fusion(&scopes_l).unwrap_or(target.clone());
let (tables_ll, _, index) = version.tables_by_scopes(next_level, &fusion_scope_l);
// All participating tables are deleted once the merge lands.
let del_gen_l = collect_gen(&tables_l)?;
let del_gen_ll = collect_gen(&tables_ll)?;
let vec_merge_sharding =
Self::data_merge_and_sharding(tables_l, tables_ll, config.sst_file_size).await?;
info!(
"[LsmStore][Major Compaction][data_loading_with_level][Time: {:?}]",
start.elapsed()
);
Ok(Some((index, (del_gen_l, del_gen_ll), vec_merge_sharding)))
}
/// Merges the entries of the current-level tables (`tables_l`) with those of
/// the next level (`tables_ll`), with current-level data taking precedence,
/// then shards the sorted result into chunks of at most `file_size`.
#[allow(clippy::mutable_key_type)]
async fn data_merge_and_sharding(
tables_l: Vec<&dyn Table>,
tables_ll: Vec<&dyn Table>,
file_size: usize,
) -> KernelResult<MergeShardingVec> {
// Load current-level tables in ascending gen order, so that after the
// `.rev()` below the newest gen's entry wins on duplicate keys.
let map_futures_l = tables_l
.iter()
.sorted_unstable_by_key(|table| table.gen())
.map(|table| async { Self::table_load_data(table, |_| true) });
let sharding_l = future::try_join_all(map_futures_l).await?;
// Keys present on the current level override the next level's values.
let filter_set_l: HashSet<&Bytes> = sharding_l
.iter()
.flatten()
.map(|key_value| &key_value.0)
.collect();
// Load next-level data, dropping any key already covered by the current level.
let sharding_ll = future::try_join_all(tables_ll.iter().map(|table| async {
Self::table_load_data(table, |key| !filter_set_l.contains(key))
}))
.await?;
// chain: [ll..., l...]; rev + unique_by keeps the FIRST occurrence of the
// reversed stream, i.e. the entry from the highest-gen current-level table.
let vec_cmd_data = sharding_ll
.into_iter()
.chain(sharding_l)
.flatten()
.rev()
.unique_by(|(key, _)| key.clone())
.sorted_unstable_by_key(|(key, _)| key.clone())
.collect();
Ok(data_sharding(vec_cmd_data, file_size))
}
/// Sequentially reads all entries of `table`, keeping only those whose key
/// passes `fn_is_filter`.
fn table_load_data<F>(table: &&dyn Table, fn_is_filter: F) -> KernelResult<Vec<KeyValue>>
where
F: Fn(&Bytes) -> bool,
{
let mut iter = table.iter()?;
// `table.len()` upper-bounds the output, avoiding reallocation.
let mut vec_cmd = Vec::with_capacity(table.len());
while let Some(item) = iter.try_next()? {
if fn_is_filter(&item.0) {
vec_cmd.push(item)
}
}
Ok(vec_cmd)
}
/// Storage configuration shared with the rest of the store.
pub(crate) fn config(&self) -> &Config {
&self.store_inner.config
}
/// Active MemTable of the store.
pub(crate) fn mem_table(&self) -> &MemTable {
&self.store_inner.mem_table
}
/// Version status: table loader and version-edit log.
pub(crate) fn ver_status(&self) -> &VersionStatus {
&self.store_inner.ver_status
}
}
#[cfg(test)]
mod tests {
use crate::kernel::io::{FileExtension, IoFactory, IoType};
use crate::kernel::lsm::compactor::{Compactor, LEVEL_0};
use crate::kernel::lsm::storage::{Config, KipStorage, StoreInner};
use crate::kernel::lsm::table::meta::TableMeta;
use crate::kernel::lsm::table::scope::Scope;
use crate::kernel::lsm::table::ss_table::SSTable;
use crate::kernel::lsm::table::TableType;
use crate::kernel::lsm::trigger::TriggerType;
use crate::kernel::lsm::version::edit::VersionEdit;
use crate::kernel::lsm::version::DEFAULT_SS_TABLE_PATH;
use crate::kernel::utils::lru_cache::ShardingLruCache;
use crate::kernel::{KernelResult, Storage};
use bytes::Bytes;
use itertools::Itertools;
use std::collections::hash_map::RandomState;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use std::time::Instant;
use tempfile::TempDir;
// End-to-end major-compaction test: writes 30k entries in 1000-entry batches
// (flushing after each batch to force minor compactions), then checks the
// resulting level layout and that every entry is still readable.
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_lsm_major_compactor() -> KernelResult<()> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let times = 30_000;
let value = b"Stray birds of summer come to my window to sing and fly away.
And yellow leaves of autumn, which have no songs, flutter and fall
there with a sign.";
// Aggressive thresholds so compactions actually trigger within 30k writes.
let config = Config::new(temp_dir.path().to_str().unwrap())
.major_threshold_with_sst_size(4)
.level_sst_magnification(1)
.enable_level_0_memorization()
.minor_trigger_with_threshold(TriggerType::Count, 1000);
let kv_store = KipStorage::open_with_config(config).await?;
let mut vec_kv = Vec::new();
for i in 0..times {
let vec_u8 = bincode::serialize(&i)?;
vec_kv.push((
Bytes::from(vec_u8.clone()),
Bytes::from(vec_u8.into_iter().chain(value.to_vec()).collect_vec()),
));
}
let start = Instant::now();
assert_eq!(times % 1000, 0);
for i in 0..times / 1000 {
for j in 0..1000 {
kv_store
.set(
vec_kv[i * 1000 + j].0.clone(),
vec_kv[i * 1000 + j].1.clone(),
)
.await?;
}
kv_store.flush().await?;
}
println!("[set_for][Time: {:?}]", start.elapsed());
let version = kv_store.current_version().await;
let level_slice = &version.level_slice;
println!("MajorCompaction Test: {:#?}", level_slice);
// Data must have reached Level 0 and spilled into at least one deeper level.
assert!(!level_slice[0].is_empty());
assert!(
!level_slice[1].is_empty() || !level_slice[2].is_empty() || !level_slice[3].is_empty()
);
// Below Level 0, scopes within a level must be ordered and non-overlapping.
for (level, slice) in level_slice.iter().enumerate() {
if !slice.is_empty() && level != LEVEL_0 {
let mut tmp_scope: Option<&Scope> = None;
for scope in slice {
if let Some(last_scope) = tmp_scope {
assert!(last_scope.end < scope.start);
}
tmp_scope = Some(scope);
}
}
}
// No entry may be lost or shadowed by the compactions.
assert_eq!(kv_store.len().await?, times);
let start = Instant::now();
for kv in vec_kv.iter().take(times) {
assert_eq!(kv_store.get(&kv.0).await?, Some(kv.1.clone()));
}
println!("[get_for][Time: {:?}]", start.elapsed());
kv_store.flush().await?;
Ok(())
}
// Unit test for `data_merge_and_sharding`: current-level tables (gen 1, 2)
// must override next-level tables (gen 3, 4) on duplicate keys, and the
// merged output must be deduplicated and key-sorted.
#[tokio::test]
async fn test_data_merge() -> KernelResult<()> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let config = Config::new(temp_dir.into_path());
let sst_factory = IoFactory::new(
config.dir_path.join(DEFAULT_SS_TABLE_PATH),
FileExtension::SSTable,
)?;
let cache = Arc::new(ShardingLruCache::new(
config.block_cache_size,
16,
RandomState::default(),
)?);
// Level-l table, gen 1: keys 1..3 ("3" -> "31" is shadowed by gen 2's "3").
let ss_table_1 = SSTable::new(
&sst_factory,
&config,
Arc::clone(&cache),
1,
vec![
(Bytes::from_static(b"1"), Some(Bytes::from_static(b"1"))),
(Bytes::from_static(b"2"), Some(Bytes::from_static(b"2"))),
(Bytes::from_static(b"3"), Some(Bytes::from_static(b"31"))),
],
0,
IoType::Direct,
)
.await?;
// Level-l table, gen 2: newest gen, so its "3" wins.
let ss_table_2 = SSTable::new(
&sst_factory,
&config,
Arc::clone(&cache),
2,
vec![
(Bytes::from_static(b"3"), Some(Bytes::from_static(b"3"))),
(Bytes::from_static(b"4"), Some(Bytes::from_static(b"4"))),
],
0,
IoType::Direct,
)
.await?;
// Level-ll table, gen 3: fully shadowed by level-l keys.
let ss_table_3 = SSTable::new(
&sst_factory,
&config,
Arc::clone(&cache),
3,
vec![
(Bytes::from_static(b"1"), Some(Bytes::from_static(b"11"))),
(Bytes::from_static(b"2"), Some(Bytes::from_static(b"21"))),
],
1,
IoType::Direct,
)
.await?;
// Level-ll table, gen 4: only "5" survives the filter.
let ss_table_4 = SSTable::new(
&sst_factory,
&config,
Arc::clone(&cache),
4,
vec![
(Bytes::from_static(b"3"), Some(Bytes::from_static(b"32"))),
(Bytes::from_static(b"4"), Some(Bytes::from_static(b"41"))),
(Bytes::from_static(b"5"), Some(Bytes::from_static(b"5"))),
],
1,
IoType::Direct,
)
.await?;
let (_, vec_data) = &Compactor::data_merge_and_sharding(
vec![&ss_table_1, &ss_table_2],
vec![&ss_table_3, &ss_table_4],
config.sst_file_size,
)
.await?[0];
assert_eq!(
vec_data,
&vec![
(Bytes::from_static(b"1"), Some(Bytes::from_static(b"1"))),
(Bytes::from_static(b"2"), Some(Bytes::from_static(b"2"))),
(Bytes::from_static(b"3"), Some(Bytes::from_static(b"3"))),
(Bytes::from_static(b"4"), Some(Bytes::from_static(b"4"))),
(Bytes::from_static(b"5"), Some(Bytes::from_static(b"5")))
]
);
Ok(())
}
// Seek-compaction test: builds Level 1 and Level 2 manually, queries a key
// until the version reports a seek-compaction candidate (allowed_seeks
// exhausted), runs the major compaction and checks the resulting scopes.
#[test]
fn test_seek_compaction() -> KernelResult<()> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let config = Config::new(temp_dir.into_path());
tokio_test::block_on(async move {
let inner = StoreInner::new(config).await?;
let compactor = Compactor::new(Arc::new(inner));
let version_status = compactor.ver_status();
let table_loader = version_status.loader();
let (scope_1, meta_1) = table_loader
.create(
1,
vec![
(Bytes::from_static(b"1"), None),
(Bytes::from_static(b"2"), None),
],
1,
TableType::BTree,
)
.await?;
let (scope_2, meta_2) = table_loader
.create(
2,
vec![
(Bytes::from_static(b"3"), None),
(Bytes::from_static(b"5"), None),
(Bytes::from_static(b"6"), None),
],
1,
TableType::BTree,
)
.await?;
let (scope_3, meta_3) = table_loader
.create(
3,
vec![
(Bytes::from_static(b"1"), None),
(Bytes::from_static(b"2"), None),
],
2,
TableType::BTree,
)
.await?;
let (scope_4, meta_4) = table_loader
.create(
4,
vec![
(Bytes::from_static(b"3"), None),
(Bytes::from_static(b"4"), None),
],
2,
TableType::BTree,
)
.await?;
let (scope_5, meta_5) = table_loader
.create(
5,
vec![
(Bytes::from_static(b"5"), None),
(Bytes::from_static(b"6"), None),
],
2,
TableType::BTree,
)
.await?;
// Install gens 1-2 on Level 1 and gens 3-5 on Level 2.
version_status
.log_and_apply(
vec![
VersionEdit::NewFile(
(vec![scope_1, scope_2], 1),
0,
TableMeta::fusion(&[meta_1, meta_2]),
),
VersionEdit::NewFile(
(vec![scope_3, scope_4, scope_5], 2),
0,
TableMeta::fusion(&[meta_3, meta_4, meta_5]),
),
],
114514,
)
.await?;
let version_1 = version_status.current().await;
let mut failure_count = 0;
// Repeated queries drain allowed_seeks until `query` yields a candidate;
// capped at 200 attempts to avoid hanging the test.
loop {
failure_count += 1;
if let (_, Some((scope, level))) = version_1.query(b"4")? {
// `is_skip_sized = true`: seek compaction ignores size thresholds.
compactor
.major_compaction(level, scope, vec![], true)
.await?;
break;
}
if failure_count >= 200 {
panic!("time out!");
}
}
let version_2 = version_status.current().await;
let level_slice = &version_2.level_slice;
println!("SeekCompaction Test: {version_2}");
assert!(!level_slice[1].is_empty());
assert!(!level_slice[2].is_empty());
let final_scope_1 = &level_slice[2][0];
let final_scope_2 = &level_slice[2][1];
assert_eq!(final_scope_1.start, Bytes::from_static(b"1"));
assert_eq!(final_scope_1.end, Bytes::from_static(b"2"));
assert_eq!(final_scope_2.start, Bytes::from_static(b"3"));
assert_eq!(final_scope_2.end, Bytes::from_static(b"6"));
// Freshly created tables start with their seek budget reset.
// NOTE(review): 0 here presumably means "no seeks consumed" — confirm
// the semantics of `allowed_seeks` in `Scope`.
assert_eq!(
final_scope_1.allowed_seeks.clone().unwrap().load(Relaxed),
0
);
assert_eq!(
final_scope_2.allowed_seeks.clone().unwrap().load(Relaxed),
0
);
Ok(())
})
}
}