use crate::kernel::io::{IoFactory, IoReader, IoType};
use crate::kernel::lsm::iterator::SeekIter;
use crate::kernel::lsm::mem_table::KeyValue;
use crate::kernel::lsm::storage::Config;
use crate::kernel::lsm::table::ss_table::block::{
Block, BlockBuilder, BlockCache, BlockItem, BlockOptions, BlockType, CompressType, Index,
MetaBlock, Value,
};
use crate::kernel::lsm::table::ss_table::footer::{Footer, TABLE_FOOTER_SIZE};
use crate::kernel::lsm::table::ss_table::iter::SSTableIter;
use crate::kernel::lsm::table::Table;
use crate::kernel::utils::bloom_filter::BloomFilter;
use crate::kernel::KernelResult;
use crate::KernelError;
use bytes::Bytes;
use core::slice::SlicePattern;
use parking_lot::Mutex;
use std::io::SeekFrom;
use std::sync::Arc;
use tracing::info;
pub(crate) mod block;
pub(crate) mod block_iter;
mod footer;
pub(crate) mod iter;
pub(crate) struct SSTable {
footer: Footer,
reader: Mutex<Box<dyn IoReader>>,
gen: i64,
meta: MetaBlock,
cache: Arc<BlockCache>,
}
impl SSTable {
pub(crate) async fn new(
io_factory: &IoFactory,
config: &Config,
cache: Arc<BlockCache>,
gen: i64,
vec_data: Vec<KeyValue>,
level: usize,
io_type: IoType,
) -> KernelResult<SSTable> {
let len = vec_data.len();
let data_restart_interval = config.data_restart_interval;
let index_restart_interval = config.index_restart_interval;
let mut filter = BloomFilter::new(len, config.desired_error_prob);
let mut builder = BlockBuilder::new(
BlockOptions::from(config)
.compress_type(CompressType::LZ4)
.data_restart_interval(data_restart_interval)
.index_restart_interval(index_restart_interval),
);
for data in vec_data {
let (key, value) = data;
filter.insert(key.as_slice());
builder.add((key, Value::from(value)));
}
let meta = MetaBlock {
filter,
len,
index_restart_interval,
data_restart_interval,
};
let (mut bytes, data_bytes_len, index_bytes_len) = builder.build().await?;
meta.to_raw(&mut bytes)?;
let footer = Footer {
level: level as u8,
index_offset: data_bytes_len as u32,
index_len: index_bytes_len as u32,
meta_offset: (data_bytes_len + index_bytes_len) as u32,
meta_len: (bytes.len() - data_bytes_len + index_bytes_len) as u32,
size_of_disk: (bytes.len() + TABLE_FOOTER_SIZE) as u32,
};
footer.to_raw(&mut bytes)?;
let mut writer = io_factory.writer(gen, io_type)?;
writer.write_all(&bytes)?;
writer.flush()?;
info!("[SsTable: {}][create][MetaBlock]: {:?}", gen, meta);
let reader = Mutex::new(io_factory.reader(gen, io_type)?);
Ok(SSTable {
footer,
reader,
gen,
meta,
cache,
})
}
pub(crate) fn load_from_file(
mut reader: Box<dyn IoReader>,
cache: Arc<BlockCache>,
) -> KernelResult<Self> {
let gen = reader.get_gen();
let footer = Footer::read_to_file(reader.as_mut())?;
let Footer {
size_of_disk,
meta_offset,
meta_len,
..
} = &footer;
info!(
"[SsTable: {gen}][load_from_file][MetaBlock]: {footer:?}, Size of Disk: {}, IO Type: {:?}",
size_of_disk ,
reader.get_type()
);
let mut buf = vec![0; *meta_len as usize];
let _ = reader.seek(SeekFrom::Start(*meta_offset as u64))?;
let _ = reader.read(&mut buf)?;
let meta = MetaBlock::from_raw(&buf);
let reader = Mutex::new(reader);
Ok(SSTable {
footer,
gen,
reader,
meta,
cache,
})
}
pub(crate) fn data_block(&self, index: Index) -> KernelResult<BlockType> {
Ok(BlockType::Data(Self::loading_block(
self.reader.lock().as_mut(),
index.offset(),
index.len(),
CompressType::LZ4,
self.meta.data_restart_interval,
)?))
}
pub(crate) fn index_block(&self) -> KernelResult<&Block<Index>> {
self.cache
.get_or_insert((self.gen(), None), |_| {
let Footer {
index_offset,
index_len,
..
} = self.footer;
Ok(BlockType::Index(Self::loading_block(
self.reader.lock().as_mut(),
index_offset,
index_len as usize,
CompressType::None,
self.meta.index_restart_interval,
)?))
})
.map(|block_type| match block_type {
BlockType::Index(data_block) => Some(data_block),
_ => None,
})?
.ok_or(KernelError::DataEmpty)
}
fn loading_block<T>(
reader: &mut dyn IoReader,
offset: u32,
len: usize,
compress_type: CompressType,
restart_interval: usize,
) -> KernelResult<Block<T>>
where
T: BlockItem,
{
let mut buf = vec![0; len];
let _ = reader.seek(SeekFrom::Start(offset as u64))?;
reader.read_exact(&mut buf)?;
Block::decode(buf, compress_type, restart_interval)
}
}
impl Table for SSTable {
fn query(&self, key: &[u8]) -> KernelResult<Option<KeyValue>> {
if self.meta.filter.contains(key) {
let index_block = self.index_block()?;
if let BlockType::Data(data_block) = self.cache.get_or_insert(
(self.gen(), Some(index_block.find_with_upper(key))),
|(_, index)| {
let index = (*index).ok_or_else(|| KernelError::DataEmpty)?;
Self::data_block(self, index)
},
)? {
if let (value, true) = data_block.find(key) {
return Ok(Some((Bytes::copy_from_slice(key), value)));
}
}
}
Ok(None)
}
fn len(&self) -> usize {
self.meta.len
}
fn size_of_disk(&self) -> u64 {
self.footer.size_of_disk as u64
}
fn gen(&self) -> i64 {
self.gen
}
fn level(&self) -> usize {
self.footer.level as usize
}
fn iter<'a>(
&'a self,
) -> KernelResult<Box<dyn SeekIter<'a, Item = KeyValue> + 'a + Send + Sync>> {
Ok(SSTableIter::new(self).map(Box::new)?)
}
}
#[cfg(test)]
mod tests {
use crate::kernel::io::{FileExtension, IoFactory, IoType};
use crate::kernel::lsm::log::LogLoader;
use crate::kernel::lsm::mem_table::DEFAULT_WAL_PATH;
use crate::kernel::lsm::storage::Config;
use crate::kernel::lsm::table::loader::TableLoader;
use crate::kernel::lsm::table::ss_table::SSTable;
use crate::kernel::lsm::table::{Table, TableType};
use crate::kernel::lsm::version::DEFAULT_SS_TABLE_PATH;
use crate::kernel::utils::lru_cache::ShardingLruCache;
use crate::kernel::KernelResult;
use bincode::Options;
use bytes::Bytes;
use std::collections::hash_map::RandomState;
use std::sync::Arc;
use tempfile::TempDir;
#[tokio::test]
async fn test_ss_table() -> KernelResult<()> {
let temp_dir = TempDir::new().expect("unable to create temporary working directory");
let value = Bytes::copy_from_slice(
b"If you shed tears when you miss the sun, you also miss the stars.",
);
let config = Config::new(temp_dir.into_path());
let sst_factory = Arc::new(IoFactory::new(
config.dir_path.join(DEFAULT_SS_TABLE_PATH),
FileExtension::SSTable,
)?);
let (log_loader, _) = LogLoader::reload(
config.path(),
(DEFAULT_WAL_PATH, Some(1)),
IoType::Buf,
&mut vec![0],
|_, _| Ok(()),
)?;
let sst_loader = TableLoader::new(config.clone(), sst_factory.clone(), log_loader)?;
let mut vec_data = Vec::new();
let times = 2333;
for i in 0..times {
vec_data.push((
Bytes::from(bincode::options().with_big_endian().serialize(&i)?),
Some(value.clone()),
));
}
let _ = sst_loader
.create(1, vec_data.clone(), 1, TableType::SortedString)
.await?;
assert!(sst_loader.is_table_file_exist(1)?);
let ss_table = sst_loader.get(1).unwrap();
for kv in vec_data.iter().take(times) {
assert_eq!(ss_table.query(&kv.0)?.unwrap().1, Some(value.clone()))
}
let cache = ShardingLruCache::new(config.table_cache_size, 16, RandomState::default())?;
let ss_table =
SSTable::load_from_file(sst_factory.reader(1, IoType::Direct)?, Arc::new(cache))?;
for kv in vec_data.iter().take(times) {
assert_eq!(ss_table.query(&kv.0)?.unwrap().1, Some(value.clone()))
}
Ok(())
}
}