use crate::kernel::io::{IoFactory, IoType};
use crate::kernel::lsm::compactor::LEVEL_0;
use crate::kernel::lsm::log::LogLoader;
use crate::kernel::lsm::mem_table::KeyValue;
use crate::kernel::lsm::storage::Config;
use crate::kernel::lsm::table::btree_table::BTreeTable;
use crate::kernel::lsm::table::meta::TableMeta;
use crate::kernel::lsm::table::scope::Scope;
use crate::kernel::lsm::table::ss_table::block::{BlockCache, Entry, Value};
use crate::kernel::lsm::table::ss_table::SSTable;
use crate::kernel::lsm::table::{BoxTable, Table, TableType};
use crate::kernel::utils::lru_cache::ShardingLruCache;
use crate::kernel::KernelResult;
use bytes::Bytes;
use std::collections::hash_map::RandomState;
use std::io::Cursor;
use std::mem;
use std::sync::Arc;
use tracing::warn;
#[derive(Clone)]
pub(crate) struct TableLoader {
inner: Arc<ShardingLruCache<i64, BoxTable>>,
factory: Arc<IoFactory>,
config: Config,
wal: LogLoader,
cache: Arc<BlockCache>,
}
impl TableLoader {
pub(crate) fn new(
config: Config,
factory: Arc<IoFactory>,
wal: LogLoader,
) -> KernelResult<Self> {
let inner = Arc::new(ShardingLruCache::new(
config.table_cache_size,
16,
RandomState::default(),
)?);
let cache = Arc::new(ShardingLruCache::new(
config.block_cache_size,
16,
RandomState::default(),
)?);
Ok(TableLoader {
inner,
factory,
config,
wal,
cache,
})
}
#[allow(clippy::match_single_binding)]
pub(crate) async fn create(
&self,
gen: i64,
vec_data: Vec<KeyValue>,
level: usize,
table_type: TableType,
) -> KernelResult<(Scope, TableMeta)> {
let scope = Scope::from_sorted_vec_data(gen, &vec_data)?;
let table: Box<dyn Table> = match table_type {
TableType::SortedString => Box::new(self.create_ss_table(gen, vec_data, level).await?),
TableType::BTree => Box::new(BTreeTable::new(level, gen, vec_data)),
};
let table_meta = TableMeta::from(table.as_ref());
let _ = self.inner.put(gen, table);
Ok((scope, table_meta))
}
pub(crate) fn get(&self, gen: i64) -> Option<&dyn Table> {
self.inner
.get_or_insert(gen, |gen| {
let table_factory = &self.factory;
let table: Box<dyn Table> = match table_factory
.reader(*gen, IoType::Direct)
.and_then(|reader| SSTable::load_from_file(reader, Arc::clone(&self.cache)))
{
Ok(ss_table) => Box::new(ss_table),
Err(err) => {
warn!(
"[LSMStore][Load Table: {}][try to reload with wal]: {:?}",
gen, err
);
let mut reload_data = Vec::new();
self.wal.load(*gen, &mut reload_data, |bytes, records| {
for (_, Entry { key, item, .. }) in
Entry::<Value>::batch_decode(&mut Cursor::new(mem::take(bytes)))?
{
records.push((key, item.bytes));
}
Ok(())
})?;
Box::new(BTreeTable::new(LEVEL_0, *gen, reload_data))
}
};
Ok(table)
})
.map(Box::as_ref)
.ok()
}
async fn create_ss_table(
&self,
gen: i64,
reload_data: Vec<(Bytes, Option<Bytes>)>,
level: usize,
) -> KernelResult<SSTable> {
SSTable::new(
&self.factory,
&self.config,
Arc::clone(&self.cache),
gen,
reload_data,
level,
IoType::Direct,
)
.await
}
pub(crate) fn remove(&self, gen: &i64) -> Option<BoxTable> {
self.inner.remove(gen)
}
#[allow(dead_code)]
pub(crate) fn is_emtpy(&self) -> bool {
self.inner.is_empty()
}
pub(crate) fn clean(&self, gen: i64) -> KernelResult<()> {
let _ = self.remove(&gen);
self.factory.clean(gen)?;
self.wal.clean(gen)?;
Ok(())
}
#[allow(dead_code)]
pub(crate) fn is_table_file_exist(&self, gen: i64) -> KernelResult<bool> {
self.factory.exists(gen)
}
}
#[cfg(test)]
mod tests {
use crate::kernel::io::{FileExtension, IoFactory, IoType};
use crate::kernel::lsm::log::LogLoader;
use crate::kernel::lsm::mem_table::{data_to_bytes, DEFAULT_WAL_PATH};
use crate::kernel::lsm::storage::Config;
use crate::kernel::lsm::table::loader::{TableLoader, TableType};
use crate::kernel::lsm::version::DEFAULT_SS_TABLE_PATH;
use crate::kernel::KernelResult;
use bincode::Options;
use bytes::Bytes;
use std::sync::Arc;
use tempfile::TempDir;
#[tokio::test]
async fn test_ss_table_loader() -> KernelResult<()> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let value = Bytes::copy_from_slice(
b"If you shed tears when you miss the sun, you also miss the stars.",
);
let config = Config::new(temp_dir.into_path());
let sst_factory = Arc::new(IoFactory::new(
config.dir_path.join(DEFAULT_SS_TABLE_PATH),
FileExtension::SSTable,
)?);
let mut vec_data = Vec::new();
let times = 2333;
let (log_loader, _) = LogLoader::reload(
config.path(),
(DEFAULT_WAL_PATH, Some(1)),
IoType::Buf,
&mut vec![0],
|_, _| Ok(()),
)?;
let mut log_writer = log_loader.writer(1)?;
for i in 0..times {
let key_value = (
Bytes::from(bincode::options().with_big_endian().serialize(&i)?),
Some(value.clone()),
);
let _ = log_writer.add_record(&data_to_bytes(key_value.clone())?)?;
vec_data.push(key_value);
}
let repeat_data = (vec_data[0].0.clone(), None);
let _ = log_writer.add_record(&data_to_bytes(repeat_data.clone())?)?;
vec_data[0] = repeat_data.clone();
log_writer.flush()?;
let sst_loader = TableLoader::new(config, sst_factory.clone(), log_loader.clone())?;
let _ = sst_loader
.create(1, vec_data.clone(), 0, TableType::SortedString)
.await?;
assert!(sst_loader.remove(&1).is_some());
assert!(sst_loader.is_emtpy());
let ss_table_loaded = sst_loader.get(1).unwrap();
assert_eq!(
ss_table_loaded.query(&repeat_data.0)?,
Some(repeat_data.clone())
);
for kv in vec_data.iter().take(times).skip(1) {
assert_eq!(
ss_table_loaded.query(&kv.0)?.unwrap().1,
Some(value.clone())
)
}
assert!(sst_loader.remove(&1).is_some());
assert!(sst_loader.is_emtpy());
clean_sst(1, &sst_loader).unwrap();
assert!(!sst_factory.exists(1).unwrap());
let ss_table_backup = sst_loader.get(1).unwrap();
assert_eq!(
ss_table_backup.query(&repeat_data.0)?,
Some(repeat_data.clone())
);
for kv in vec_data.iter().take(times).skip(1) {
assert_eq!(
ss_table_backup.query(&kv.0)?.unwrap().1,
Some(value.clone())
)
}
Ok(())
}
fn clean_sst(gen: i64, loader: &TableLoader) -> KernelResult<()> {
loader.factory.clean(gen)?;
Ok(())
}
}