use crate::{
block::Block,
bucket::InsertableToBucket,
consts::{
DATA_FILE_NAME, INDEX_FILE_NAME, SIZE_OF_U32, SIZE_OF_U64, SIZE_OF_U8, SIZE_OF_USIZE,
SUMMARY_FILE_NAME,
},
err::Error,
filter::BloomFilter,
fs::{DataFileNode, DataFs, FileAsync, FileNode, IndexFileNode, IndexFs, SummaryFileNode, SummaryFs},
index::{Index, IndexFile, RangeOffset},
key_range::{BiggestKey, SmallestKey},
memtable::{Entry, SkipMapValue},
types::{ByteSerializedEntry, CreatedAt, IsTombStone, Key, SkipMapEntries, ValOffset},
util,
};
use chrono::Utc;
use crossbeam_skiplist::SkipMap;
use std::{
path::{Path, PathBuf},
sync::Arc,
time::SystemTime,
};
use Error::*;
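/// Handle to an SSTable data file, pairing the opened file with the path it lives at.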
#[derive(Debug, Clone)]
pub struct DataFile<F: DataFs> {
pub(crate) file: F,
pub(crate) path: PathBuf,
}
impl<F: DataFs> DataFile<F> {
pub fn new<P: AsRef<Path> + Send + Sync>(path: P, file: F) -> Self {
Self {
path: path.as_ref().to_path_buf(),
file,
}
}
}
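/// An on-disk SSTable: its in-memory entries, bloom filter, and key-range summary,
/// together with the data and index files that back it.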
#[derive(Debug, Clone)]
pub struct Table {
pub(crate) dir: PathBuf,
pub(crate) hotness: u64,
pub(crate) size: usize,
pub(crate) created_at: CreatedAt,
pub(crate) data_file: DataFile<DataFileNode>,
pub(crate) index_file: IndexFile<IndexFileNode>,
pub(crate) entries: SkipMapEntries<Key>,
pub(crate) filter: Option<BloomFilter>,
pub(crate) summary: Option<Summary>,
}
impl InsertableToBucket for Table {
fn get_entries(&self) -> SkipMapEntries<Key> {
self.entries.clone()
}
fn size(&self) -> usize {
self.size
}
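/// Returns a clone of the table's bloom filter; panics if no filter has been set.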
fn get_filter(&self) -> BloomFilter {
self.filter.as_ref().expect("bloom filter not set for table").to_owned()
}
}
impl Table {
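/// Creates an empty table under `dir`, creating the directory and fresh data and index files as needed.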
pub async fn new<P: AsRef<Path> + Send + Sync>(dir: P) -> Result<Table, Error> {
let (data_file_path, index_file_path, created_at) = Table::generate_file_path(dir.as_ref()).await?;
let data_file = DataFileNode::new(data_file_path.to_owned(), crate::fs::FileType::Data).await?;
let index_file = IndexFileNode::new(index_file_path.to_owned(), crate::fs::FileType::Index).await?;
Ok(Self {
dir: dir.as_ref().to_path_buf(),
hotness: Default::default(),
index_file: IndexFile::new(index_file_path, index_file),
data_file: DataFile::new(data_file_path, data_file),
created_at,
entries: Arc::new(SkipMap::new()),
size: Default::default(),
filter: None,
summary: None,
})
}
pub fn increase_hotness(&mut self) {
self.hotness += 1;
}
pub fn get_data_file_path(&self) -> PathBuf {
self.data_file.path.clone()
}
pub fn get_hotness(&self) -> u64 {
self.hotness
}
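/// Ensures `dir` exists and derives the data and index file paths inside it, together with a creation timestamp.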
pub async fn generate_file_path<P: AsRef<Path> + Send + Sync>(
dir: P,
) -> Result<(PathBuf, PathBuf, CreatedAt), Error> {
let created_at = Utc::now();
FileNode::create_dir_all(dir.as_ref()).await?;
let data_file_name = format!("{}.db", DATA_FILE_NAME);
let index_file_name = format!("{}.db", INDEX_FILE_NAME);
let data_file_path = dir.as_ref().join(data_file_name);
let index_file_path = dir.as_ref().join(index_file_name);
Ok((data_file_path, index_file_path, created_at))
}
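/// Searches the data file from `start_offset` for `searched_key`, returning the value offset,
/// creation time, and tombstone flag of the matching entry, if any.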
pub(crate) async fn get<K: AsRef<[u8]>>(
&self,
start_offset: u32,
searched_key: K,
) -> Result<Option<(ValOffset, CreatedAt, IsTombStone)>, Error> {
self.data_file
.file
.find_entry(start_offset, searched_key.as_ref())
.await
}
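/// Reloads every entry from the data file into memory and records the number of bytes read as the table size.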
pub(crate) async fn load_entries_from_file(&mut self) -> Result<(), Error> {
let (entries, bytes_read) = self.data_file.file.load_entries().await?;
self.entries = entries;
self.size = bytes_read;
Ok(())
}
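/// Rebuilds a table from existing data and index files, recovering its size and creation
/// time from the data file's metadata. Panics if the files cannot be opened or inspected.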
pub(crate) async fn build_from<P: AsRef<Path> + Send + Sync + Clone>(
dir: P,
data_file_path: P,
index_file_path: P,
) -> Table {
let mut table = Table {
dir: dir.as_ref().to_path_buf(),
hotness: 1,
created_at: Utc::now(),
data_file: DataFile {
file: DataFileNode::new(data_file_path.to_owned(), crate::fs::FileType::Data)
.await
.unwrap(),
path: data_file_path.as_ref().to_path_buf(),
},
index_file: IndexFile {
file: IndexFileNode::new(index_file_path.to_owned(), crate::fs::FileType::Index)
.await
.unwrap(),
path: index_file_path.as_ref().to_path_buf(),
},
size: Default::default(),
entries: Arc::new(SkipMap::new()),
filter: None,
summary: None,
};
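// Recover the table size and creation time from the on-disk data file.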
table.size = table.data_file.file.node.size().await;
let modified_time = table
.data_file
.file
.node
.metadata()
.await
.expect("failed to read data file metadata")
.modified()
.expect("file system does not report a modification time");
let epoch = SystemTime::UNIX_EPOCH;
let elapsed_nanos = modified_time.duration_since(epoch).unwrap().as_nanos() as u64;
table.created_at = util::milliseconds_to_datetime(elapsed_nanos / 1_000_000);
table
}
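/// Flushes the table to disk: persists the key-range summary and bloom filter, packs the
/// entries into fixed-capacity blocks, appends each block to the data file, and records one
/// index entry per block. Fails if the filter is missing or there are no entries.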
pub(crate) async fn write_to_file(&mut self) -> Result<(), Error> {
if self.filter.is_none() {
return Err(FilterNotProvidedForFlush);
}
if self.entries.is_empty() {
return Err(EntriesCannotBeEmptyDuringFlush);
}
let index_file = &self.index_file;
let mut blocks: Vec<Block> = Vec::new();
let mut index = Index::new(self.index_file.path.clone(), index_file.file.clone());
let mut summary = Summary::new(self.dir.to_owned());
let smallest_entry = self.entries.front();
let biggest_entry = self.entries.back();
summary.smallest_key = smallest_entry.unwrap().key().to_vec();
summary.biggest_key = biggest_entry.unwrap().key().to_vec();
summary.write_to_file().await?;
self.summary = Some(summary);
self.filter.as_mut().unwrap().write(self.dir.to_owned()).await?;
self.filter
.as_mut()
.unwrap()
.set_sstable_path(&self.data_file.path);
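// Pack entries into blocks; the running size is reset and recomputed as blocks are written.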
let mut current_block = Block::new();
if self.size > 0 {
self.reset_size();
}
for e in self.entries.iter() {
let entry = Entry::new(
e.key(),
e.value().val_offset,
e.value().created_at,
e.value().is_tombstone,
);
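// On-disk entry size: key bytes + key-length prefix (u32) + value offset (u32) + timestamp (u64) + tombstone flag (u8).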
let entry_size = entry.key.len() + SIZE_OF_U32 + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U8;
if current_block.is_full(entry_size) {
blocks.push(current_block);
current_block = Block::new();
}
current_block.set_entry(
entry.key.len() as u32,
entry.key,
entry.val_offset as u32,
entry.created_at,
entry.is_tombstone,
)?;
}
for block in blocks.iter() {
self.write_block(block, &mut index).await?;
}
if !current_block.entries.is_empty() {
self.write_block(&current_block, &mut index).await?;
}
index.write_to_file().await?;
Ok(())
}
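/// Appends `block` at the current end of the data file and indexes the block's last entry
/// against the offset the block starts at.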
async fn write_block(&mut self, block: &Block, table_index: &mut Index) -> Result<(), Error> {
let offset = self.size;
let last_entry = block.get_last_entry();
table_index.insert(last_entry.key_prefix, last_entry.key, offset as u32);
let bytes_written = block.write_to_file(self.data_file.file.node.clone()).await?;
self.size += bytes_written;
Ok(())
}
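/// Loads all entries within `range_offset` from the data file.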
#[allow(dead_code)]
pub(crate) async fn range(&self, range_offset: RangeOffset) -> Result<Vec<Entry<Key, usize>>, Error> {
self.data_file.file.load_entries_within_range(range_offset).await
}
pub(crate) fn reset_size(&mut self) {
self.size = 0;
}
pub fn size(&self) -> usize {
self.size
}
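/// Replaces the table's entries and recomputes its size from them.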
pub(crate) fn set_entries(&mut self, entries: Arc<SkipMap<Key, SkipMapValue<ValOffset>>>) {
self.entries = entries;
self.set_sst_size_from_entries();
}
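/// Recomputes the table size as the sum over all entries of the key length plus the sizes
/// of a usize, a u64, and a u8. Note this is an in-memory estimate and differs from the
/// on-disk entry size used during flush, which also counts the u32 key-length prefix.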
pub(crate) fn set_sst_size_from_entries(&mut self) {
self.size = self
.entries
.iter()
.map(|e| e.key().len() + SIZE_OF_USIZE + SIZE_OF_U64 + SIZE_OF_U8)
.sum::<usize>();
}
}
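/// The key range of an SSTable (its smallest and biggest keys), persisted to a summary file.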
#[derive(Debug, Clone)]
pub struct Summary {
pub path: PathBuf,
pub smallest_key: SmallestKey,
pub biggest_key: BiggestKey,
}
impl Summary {
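/// Creates an empty summary whose backing file is `{SUMMARY_FILE_NAME}.db` inside `path`.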
pub fn new<P: AsRef<Path> + Send + Sync>(path: P) -> Self {
let file_path = path.as_ref().join(format!("{}.db", SUMMARY_FILE_NAME));
Self {
path: file_path,
biggest_key: vec![],
smallest_key: vec![],
}
}
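/// Creates the summary file and writes the serialized key range to it.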
pub async fn write_to_file(&mut self) -> Result<(), Error> {
let file = SummaryFileNode::new(self.path.to_owned(), crate::fs::FileType::Summary).await?;
let serialized_data = self.serialize();
file.node.write_all(&serialized_data).await?;
Ok(())
}
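/// Restores the smallest and biggest keys from the summary file on disk.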
pub async fn recover(&mut self) -> Result<(), Error> {
let (smallest_key, biggest_key) = SummaryFileNode::recover(self.path.to_owned()).await?;
self.smallest_key = smallest_key;
self.biggest_key = biggest_key;
Ok(())
}
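/// Serializes the summary as:
/// [smallest_key length (u32, LE)][biggest_key length (u32, LE)][smallest_key bytes][biggest_key bytes]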
pub(crate) fn serialize(&self) -> ByteSerializedEntry {
let entry_len = SIZE_OF_U32 + SIZE_OF_U32 + self.biggest_key.len() + self.smallest_key.len();
let mut serialized_data = Vec::with_capacity(entry_len);
serialized_data.extend_from_slice(&(self.smallest_key.len() as u32).to_le_bytes());
serialized_data.extend_from_slice(&(self.biggest_key.len() as u32).to_le_bytes());
serialized_data.extend_from_slice(&self.smallest_key);
serialized_data.extend_from_slice(&self.biggest_key);
serialized_data
}
}