use crate::kernel::lsm::storage::Config;
use crate::kernel::utils::bloom_filter::BloomFilter;
use crate::kernel::utils::lru_cache::ShardingLruCache;
use crate::kernel::KernelResult;
use crate::KernelError;
use bytes::{Buf, BufMut, Bytes};
use integer_encoding::{FixedInt, FixedIntWriter, VarIntReader, VarIntWriter};
use itertools::Itertools;
use lz4::Decoder;
use std::cmp::min;
use std::io::{Cursor, Read, Write};
use std::mem;
// Shared LRU cache for decoded blocks, keyed by an i64 id (presumably the
// SSTable's gen — TODO confirm) and the index entry that located the block;
// a `None` index refers to the table's index block itself.
#[allow(dead_code)]
pub(crate) type BlockCache = ShardingLruCache<(i64, Option<Index>), BlockType>;
// Default uncompressed size threshold (4 KiB) at which a data block is cut.
pub(crate) const DEFAULT_BLOCK_SIZE: usize = 4 * 1024;
// Default number of entries between restart points in data blocks.
pub(crate) const DEFAULT_DATA_RESTART_INTERVAL: usize = 16;
// Default number of entries between restart points in index blocks.
pub(crate) const DEFAULT_INDEX_RESTART_INTERVAL: usize = 2;
// Trailing bytes reserved for the CRC32 checksum of a raw block.
const CRC_SIZE: usize = 4;
// A key paired with its item (`Value` for data blocks, `Index` for index blocks).
pub(crate) type KeyValue<T> = (Bytes, T);
/// A decoded block as stored in the [`BlockCache`]: either a data block of
/// values or an index block mapping keys to block positions.
pub(crate) enum BlockType {
    Data(Block<Value>),
    Index(Block<Index>),
}
/// A single prefix-compressed entry inside a [`Block`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub(crate) struct Entry<T> {
    // Length of the key suffix actually stored in `key`.
    unshared_len: usize,
    // Length of the prefix shared with the restart-point key (0 at restart points).
    shared_len: usize,
    pub(crate) key: Bytes,
    pub(crate) item: T,
}
impl<T> Entry<T>
where
    T: BlockItem,
{
    /// Creates an entry; `key` holds only the unshared suffix of the full key.
    pub(crate) fn new(shared_len: usize, unshared_len: usize, key: Bytes, item: T) -> Self {
        Entry {
            unshared_len,
            shared_len,
            key,
            item,
        }
    }

    /// Appends the entry: varint `unshared_len`, varint `shared_len`, the key
    /// suffix bytes, then the encoded item.
    pub(crate) fn encode(&self, bytes: &mut Vec<u8>) -> KernelResult<()> {
        bytes.write_varint(self.unshared_len as u32)?;
        bytes.write_varint(self.shared_len as u32)?;
        bytes.write_all(&self.key)?;
        self.item.encode(bytes)?;
        Ok(())
    }

    /// Decodes entries until the cursor is exhausted, pairing each with its
    /// ordinal position.
    pub(crate) fn batch_decode(cursor: &mut Cursor<Vec<u8>>) -> KernelResult<Vec<(usize, Self)>> {
        let mut vec_entry = Vec::new();
        let mut index = 0;
        // `Cursor::is_empty` is unstable; comparing the position against the
        // underlying buffer length is the stable equivalent.
        while (cursor.position() as usize) < cursor.get_ref().len() {
            vec_entry.push((index, Self::decode(cursor)?));
            index += 1;
        }
        Ok(vec_entry)
    }

    /// Decodes a single entry from `reader`.
    pub(crate) fn decode<R: Read>(reader: &mut R) -> KernelResult<Entry<T>> {
        let unshared_len = reader.read_varint::<u32>()? as usize;
        let shared_len = reader.read_varint::<u32>()? as usize;
        let mut bytes = vec![0u8; unshared_len];
        // `read` may return a short count, which would leave the key suffix
        // partially zero-filled; `read_exact` fills it completely or errors.
        reader.read_exact(&mut bytes)?;
        Ok(Self {
            unshared_len,
            shared_len,
            key: Bytes::from(bytes),
            item: T::decode(reader)?,
        })
    }
}
/// Payload of a data-block entry; `bytes` is `None` when `value_len == 0`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub(crate) struct Value {
    // Cached byte length of `bytes` (0 when `bytes` is `None`).
    value_len: usize,
    pub(crate) bytes: Option<Bytes>,
}
impl From<Option<Bytes>> for Value {
    /// Wraps an optional payload, caching its byte length (0 when absent).
    fn from(bytes: Option<Bytes>) -> Self {
        let value_len = match &bytes {
            Some(payload) => payload.len(),
            None => 0,
        };
        Value { value_len, bytes }
    }
}
/// Location of an encoded block inside the table's data section:
/// a byte `offset` plus a byte `len`.
#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)]
pub(crate) struct Index {
    offset: u32,
    len: usize,
}
impl Index {
fn new(offset: u32, len: usize) -> Self {
Index { offset, len }
}
pub(crate) fn offset(&self) -> u32 {
self.offset
}
pub(crate) fn len(&self) -> usize {
self.len
}
}
/// Serialization contract for items stored in block entries
/// (implemented by [`Value`] and [`Index`]).
pub(crate) trait BlockItem: Sized + Clone {
    /// Decodes one item from `reader`.
    fn decode<T>(reader: &mut T) -> KernelResult<Self>
    where
        T: Read + ?Sized;
    /// Appends the encoded item to `bytes`.
    fn encode(&self, bytes: &mut Vec<u8>) -> KernelResult<()>;
}
impl BlockItem for Value {
    /// Decodes a varint length followed by that many payload bytes;
    /// a zero length yields `bytes: None`.
    fn decode<T>(reader: &mut T) -> KernelResult<Self>
    where
        T: Read + ?Sized,
    {
        let value_len = reader.read_varint::<u32>()? as usize;
        let bytes = if value_len > 0 {
            let mut value = vec![0u8; value_len];
            // The previous `read(..).ok()` both tolerated short reads and
            // silently swallowed I/O errors, producing a `None` payload for a
            // non-empty value; fail loudly instead.
            reader.read_exact(&mut value)?;
            Some(Bytes::from(value))
        } else {
            None
        };
        Ok(Value { value_len, bytes })
    }

    /// Encodes the payload length as a varint followed by the raw bytes.
    fn encode(&self, bytes: &mut Vec<u8>) -> KernelResult<()> {
        bytes.write_varint(self.value_len as u32)?;
        if let Some(value) = &self.bytes {
            bytes.write_all(value)?;
        }
        Ok(())
    }
}
impl BlockItem for Index {
    /// Reads `offset` then `len`, both varint-encoded.
    fn decode<T>(reader: &mut T) -> KernelResult<Self>
    where
        T: Read + ?Sized,
    {
        // Struct fields are evaluated top-to-bottom, preserving read order.
        Ok(Index {
            offset: reader.read_varint::<u32>()?,
            len: reader.read_varint::<u32>()? as usize,
        })
    }

    /// Writes `offset` then `len`, both varint-encoded.
    fn encode(&self, bytes: &mut Vec<u8>) -> KernelResult<()> {
        bytes.write_varint(self.offset)?;
        bytes.write_varint(self.len as u32)?;
        Ok(())
    }
}
/// Compression applied to a block's raw bytes when encoding.
#[derive(Clone, Copy)]
pub(crate) enum CompressType {
    None,
    LZ4,
}
/// Table-level metadata: a bloom filter over keys plus the parameters needed
/// to decode the table's blocks.
#[derive(Debug)]
pub(crate) struct MetaBlock {
    pub(crate) filter: BloomFilter<[u8]>,
    // NOTE(review): presumably the number of key-value pairs in the table —
    // confirm against the writer.
    pub(crate) len: usize,
    pub(crate) index_restart_interval: usize,
    pub(crate) data_restart_interval: usize,
}
impl MetaBlock {
    /// Serializes the meta block: three fixed-width u32 fields followed by
    /// the raw bloom filter bytes.
    pub(crate) fn to_raw(&self, bytes: &mut Vec<u8>) -> KernelResult<()> {
        bytes.write_fixedint(self.len as u32)?;
        bytes.write_fixedint(self.index_restart_interval as u32)?;
        bytes.write_fixedint(self.data_restart_interval as u32)?;
        self.filter.to_raw(bytes)?;
        Ok(())
    }

    /// Deserializes a meta block produced by [`MetaBlock::to_raw`].
    ///
    /// # Panics
    /// Panics when `bytes` is shorter than the 12-byte fixed header.
    pub(crate) fn from_raw(bytes: &[u8]) -> Self {
        // Fail with a clear message instead of an opaque slice-index panic
        // when handed a truncated buffer.
        assert!(bytes.len() >= 12, "meta block too short: {}", bytes.len());
        let len = u32::decode_fixed(&bytes[0..4]) as usize;
        let index_restart_interval = u32::decode_fixed(&bytes[4..8]) as usize;
        let data_restart_interval = u32::decode_fixed(&bytes[8..12]) as usize;
        let filter = BloomFilter::from_raw(&bytes[12..]);
        Self {
            filter,
            len,
            index_restart_interval,
            data_restart_interval,
        }
    }
}
/// An ordered, prefix-compressed run of entries; the unit of storage and
/// caching for both data and index sections of a table.
#[derive(Debug, PartialEq, Eq, Clone)]
pub(crate) struct Block<T> {
    // Every `restart_interval`-th entry stores its full key; entries in
    // between store only the suffix after the group's shared prefix.
    restart_interval: usize,
    // Entries paired with their ordinal position within the block.
    vec_entry: Vec<(usize, Entry<T>)>,
}
/// Tunables used by [`BlockBuilder`] when cutting and encoding blocks.
#[derive(Clone)]
pub(crate) struct BlockOptions {
    // Uncompressed byte threshold at which a data block is sealed.
    block_size: usize,
    compress_type: CompressType,
    data_restart_interval: usize,
    index_restart_interval: usize,
}
impl From<&Config> for BlockOptions {
    /// Derives block options from the storage config; compression is off.
    fn from(config: &Config) -> Self {
        Self {
            compress_type: CompressType::None,
            block_size: config.block_size,
            index_restart_interval: config.index_restart_interval,
            data_restart_interval: config.data_restart_interval,
        }
    }
}
impl BlockOptions {
    /// Default options: 4 KiB blocks, no compression, default restart intervals.
    #[allow(dead_code)]
    pub(crate) fn new() -> Self {
        Self {
            block_size: DEFAULT_BLOCK_SIZE,
            compress_type: CompressType::None,
            data_restart_interval: DEFAULT_DATA_RESTART_INTERVAL,
            index_restart_interval: DEFAULT_INDEX_RESTART_INTERVAL,
        }
    }

    /// Overrides the data-block size threshold.
    #[allow(dead_code)]
    pub(crate) fn block_size(self, block_size: usize) -> Self {
        Self { block_size, ..self }
    }

    /// Overrides the compression applied to data blocks.
    #[allow(dead_code)]
    pub(crate) fn compress_type(self, compress_type: CompressType) -> Self {
        Self {
            compress_type,
            ..self
        }
    }

    /// Overrides the restart interval for data blocks.
    #[allow(dead_code)]
    pub(crate) fn data_restart_interval(self, data_restart_interval: usize) -> Self {
        Self {
            data_restart_interval,
            ..self
        }
    }

    /// Overrides the restart interval for index blocks.
    #[allow(dead_code)]
    pub(crate) fn index_restart_interval(self, index_restart_interval: usize) -> Self {
        Self {
            index_restart_interval,
            ..self
        }
    }
}
/// Accumulates sorted key-value pairs for the data block currently being built.
struct BlockBuf {
    // Running total of key + value byte lengths, used for the block-size cutoff.
    bytes_size: usize,
    vec_key_value: Vec<KeyValue<Value>>,
}
impl BlockBuf {
    /// Starts an empty buffer.
    fn new() -> Self {
        Self {
            bytes_size: 0,
            vec_key_value: Vec::new(),
        }
    }

    /// Appends a pair; keys must arrive in strictly ascending order, which the
    /// block format relies on for prefix compression and binary search.
    fn add(&mut self, key_value: KeyValue<Value>) {
        if let Some(last_key) = self.last_key() {
            assert!(last_key.cmp(&key_value.0).is_lt());
        }
        self.bytes_size += key_value_bytes_len(&key_value);
        self.vec_key_value.push(key_value);
    }

    /// The most recently added key, if any.
    fn last_key(&self) -> Option<&Bytes> {
        self.vec_key_value.last().map(|key_value| &key_value.0)
    }

    /// Drains the buffer, returning its pairs and the largest key.
    fn flush(&mut self) -> (Vec<KeyValue<Value>>, Option<Bytes>) {
        let last_key = self.last_key().cloned();
        self.bytes_size = 0;
        (mem::take(&mut self.vec_key_value), last_key)
    }
}
/// Streams sorted pairs into sealed data blocks and finally produces the
/// table layout: encoded data blocks followed by one index block.
pub(crate) struct BlockBuilder {
    options: BlockOptions,
    // Total number of pairs added across all blocks.
    len: usize,
    buf: BlockBuf,
    // Sealed data blocks paired with their largest key (used as the index key).
    vec_block: Vec<(Block<Value>, Bytes)>,
}
/// Approximate footprint of a pair: key bytes plus payload bytes.
fn key_value_bytes_len(key_value: &KeyValue<Value>) -> usize {
    let (key, value) = key_value;
    let payload_len = match &value.bytes {
        Some(bytes) => bytes.len(),
        None => 0,
    };
    key.len() + payload_len
}
impl BlockBuilder {
    /// Creates a builder with no buffered data.
    pub(crate) fn new(options: BlockOptions) -> Self {
        BlockBuilder {
            options,
            len: 0,
            buf: BlockBuf::new(),
            vec_block: Vec::new(),
        }
    }

    /// Total number of key-value pairs added so far.
    #[allow(dead_code)]
    pub(crate) fn len(&self) -> usize {
        self.len
    }

    /// Buffers one pair, sealing a data block once the size threshold is hit.
    pub(crate) fn add(&mut self, key_value: KeyValue<Value>) {
        self.buf.add(key_value);
        self.len += 1;
        if self.is_out_of_byte() {
            self._build();
        }
    }

    /// Whether the buffered bytes have reached the configured block size.
    fn is_out_of_byte(&self) -> bool {
        self.buf.bytes_size >= self.options.block_size
    }

    /// Seals the buffered pairs into a data block keyed by its largest key;
    /// an empty buffer (no last key) produces nothing.
    fn _build(&mut self) {
        let (vec_kv, option_last_key) = self.buf.flush();
        if let Some(last_key) = option_last_key {
            let block = Block::new(vec_kv, self.options.data_restart_interval);
            self.vec_block.push((block, last_key));
        }
    }

    /// Encodes all data blocks followed by one index block, returning the raw
    /// bytes plus the byte lengths of the data and index sections.
    pub(crate) async fn build(mut self) -> KernelResult<(Vec<u8>, usize, usize)> {
        self._build();
        let mut blocks_bytes = Vec::new();
        let mut indexes = Vec::with_capacity(self.vec_block.len());
        let mut offset = 0u32;
        for (block, last_key) in self.vec_block {
            block.encode(self.options.compress_type, &mut blocks_bytes)?;
            let len = blocks_bytes.len() - offset as usize;
            indexes.push((last_key, Index::new(offset, len)));
            offset += len as u32;
        }
        let data_bytes_len = blocks_bytes.len();
        // The index block is always written uncompressed (see the explicit
        // `CompressType::None` below), regardless of the data compression.
        Block::new(indexes, self.options.index_restart_interval)
            .encode(CompressType::None, &mut blocks_bytes)?;
        let index_bytes_len = blocks_bytes.len() - data_bytes_len;
        Ok((blocks_bytes, data_bytes_len, index_bytes_len))
    }
}
impl Block<Value> {
    /// Exact-match lookup: returns `(payload, found)`. A found key may still
    /// carry a `None` payload (presumably a deletion marker — the type allows
    /// it; confirm against callers).
    pub(crate) fn find(&self, key: &[u8]) -> (Option<Bytes>, bool) {
        match self.binary_search(key) {
            Ok(index) => match self.vec_entry.get(index) {
                Some((_, entry)) => (entry.item.bytes.clone(), true),
                None => (None, false),
            },
            Err(_) => (None, false),
        }
    }
}
impl<T> Block<T> {
    /// Number of entries in the block.
    #[allow(dead_code)]
    pub(crate) fn entry_len(&self) -> usize {
        self.vec_entry.len()
    }
    /// First `shared_len` bytes of the restart-point key governing `index`
    /// (the entry at the nearest preceding multiple of `restart_interval`,
    /// which stores its full key).
    pub(crate) fn shared_key_prefix(&self, index: usize, shared_len: usize) -> &[u8] {
        &self.vec_entry[index - index % self.restart_interval].1.key[0..shared_len]
    }
    /// Number of entries between restart points.
    pub(crate) fn restart_interval(&self) -> usize {
        self.restart_interval
    }
    /// Borrows the entry at `index`; panics when out of bounds.
    pub(crate) fn get_entry(&self, index: usize) -> &Entry<T> {
        &self.vec_entry[index].1
    }
    /// Shared-prefix length in effect at `index`. A restart entry stores
    /// `shared_len == 0`, so for restart points the following entry's value is
    /// used instead (0 when there is no following entry) — presumably to
    /// recover the group's common prefix length; NOTE(review): confirm intent
    /// when the restart entry is the last of its group.
    pub(crate) fn restart_shared_len(&self, index: usize) -> usize {
        if index % self.restart_interval != 0 {
            self.get_entry(index).shared_len
        } else {
            self.vec_entry
                .get(index + 1)
                .map_or(0, |(_, entry)| entry.shared_len)
        }
    }
}
impl<T> Block<T>
where
    T: BlockItem,
{
    /// Builds a block from sorted pairs, applying prefix compression per
    /// restart group: the first entry of each group keeps its full key and
    /// later entries drop the group's shared prefix.
    pub(crate) fn new(vec_kv: Vec<KeyValue<T>>, restart_interval: usize) -> Block<T> {
        let vec_sharding_len = sharding_shared_len(&vec_kv, restart_interval);
        let vec_entry = vec_kv
            .into_iter()
            .enumerate()
            .map(|(index, (key, item))| {
                // Restart entries store the full key (shared_len == 0).
                let shared_len = if index % restart_interval == 0 {
                    0
                } else {
                    vec_sharding_len[index / restart_interval]
                };
                (
                    index,
                    Entry::new(
                        shared_len,
                        key.len() - shared_len,
                        Bytes::copy_from_slice(&key[shared_len..]),
                        item,
                    ),
                )
            })
            .collect_vec();
        Block {
            restart_interval,
            vec_entry,
        }
    }

    /// Returns the item at the position `binary_search` reports for `key`,
    /// clamped to the last entry when `key` is beyond every stored key.
    pub(crate) fn find_with_upper(&self, key: &[u8]) -> T {
        let entries_len = self.vec_entry.len();
        let index = self
            .binary_search(key)
            .unwrap_or_else(|index| min(entries_len - 1, index));
        self.vec_entry[index].1.item.clone()
    }

    /// Binary search over prefix-compressed entries: compares against the
    /// restart-point prefix first, then the stored suffix. `reverse` flips
    /// the result because the comparator must order entry-vs-key, while the
    /// expression computes key-vs-entry.
    pub(crate) fn binary_search(&self, key: &[u8]) -> Result<usize, usize> {
        self.vec_entry.binary_search_by(|(index, entry)| {
            if entry.shared_len > 0 {
                let shared_len = min(entry.shared_len, key.len());
                key[0..shared_len]
                    .cmp(self.shared_key_prefix(*index, shared_len))
                    .then_with(|| key[shared_len..].cmp(&entry.key))
            } else {
                key.cmp(&entry.key)
            }
            .reverse()
        })
    }

    /// Appends the encoded block, optionally LZ4-compressed, to `bytes`.
    pub(crate) fn encode(
        &self,
        compress_type: CompressType,
        bytes: &mut Vec<u8>,
    ) -> KernelResult<()> {
        match compress_type {
            CompressType::None => self.to_raw(bytes)?,
            CompressType::LZ4 => {
                let mut buf = Vec::new();
                self.to_raw(&mut buf)?;
                let mut encoder = lz4::EncoderBuilder::new().level(4).build(bytes.writer())?;
                // `write` may perform a partial write; `write_all` guarantees
                // the whole raw block reaches the encoder.
                encoder.write_all(&buf)?;
                let (_, result) = encoder.finish();
                result?;
            }
        }
        Ok(())
    }

    /// Decompresses (if needed) and parses a block previously produced by
    /// [`Block::encode`].
    pub(crate) fn decode(
        buf: Vec<u8>,
        compress_type: CompressType,
        restart_interval: usize,
    ) -> KernelResult<Self> {
        let buf = match compress_type {
            CompressType::None => buf,
            CompressType::LZ4 => {
                let mut decoder = Decoder::new(buf.reader())?;
                let mut decoded = Vec::with_capacity(DEFAULT_BLOCK_SIZE);
                let _ = decoder.read_to_end(&mut decoded)?;
                decoded
            }
        };
        Self::from_raw(buf, restart_interval)
    }

    /// Verifies the trailing CRC32 and decodes the entry list.
    pub(crate) fn from_raw(mut buf: Vec<u8>, restart_interval: usize) -> KernelResult<Self> {
        assert!(buf.len() >= CRC_SIZE);
        let data_bytes_len = buf.len() - CRC_SIZE;
        // The checksum written by `to_raw` covers only the data portion. The
        // previous check hashed the whole buffer (data + CRC) and returned an
        // error on *equality*, so corruption was effectively never detected.
        if crc32fast::hash(&buf[..data_bytes_len]) != u32::decode_fixed(&buf[data_bytes_len..]) {
            return Err(KernelError::CrcMisMatch);
        }
        buf.truncate(data_bytes_len);
        let mut cursor = Cursor::new(buf);
        let vec_entry = Entry::<T>::batch_decode(&mut cursor)?;
        Ok(Self {
            restart_interval,
            vec_entry,
        })
    }

    /// Serializes all entries, then appends a CRC32 over the bytes written by
    /// this call (from `start` onward, excluding any pre-existing content).
    pub(crate) fn to_raw(&self, bytes: &mut Vec<u8>) -> KernelResult<()> {
        let start = bytes.len();
        for (_, entry) in &self.vec_entry {
            entry.encode(bytes)?;
        }
        bytes.append(&mut crc32fast::hash(&bytes[start..]).encode_fixed_vec());
        Ok(())
    }
}
/// Computes, for each restart group of `restart_interval` consecutive pairs,
/// the length of the longest key prefix shared by every pair in the group.
fn sharding_shared_len<T>(vec_kv: &[KeyValue<T>], restart_interval: usize) -> Vec<usize>
where
    T: BlockItem,
{
    // `slice::chunks` expresses the grouping directly, replacing the
    // enumerate + `group_by` machinery (itertools' `group_by` is deprecated).
    vec_kv
        .chunks(restart_interval)
        .map(|group| longest_shared_len(group.iter().collect_vec()))
        .collect()
}
fn longest_shared_len<T>(sharding: Vec<&KeyValue<T>>) -> usize {
if sharding.is_empty() {
return 0;
}
let mut min_len = usize::MAX;
for kv in &sharding {
min_len = min(min_len, kv.0.len());
}
let mut low = 0;
let mut high = min_len;
while low < high {
let mid = (high - low + 1) / 2 + low;
if is_common_prefix(&sharding, mid) {
low = mid;
} else {
high = mid - 1;
}
}
return low;
fn is_common_prefix<T>(sharding: &[&KeyValue<T>], len: usize) -> bool {
let first = sharding[0];
for kv in sharding.iter().skip(1) {
for i in 0..len {
if first.0[i] != kv.0[i] {
return false;
}
}
}
true
}
}
#[cfg(test)]
mod tests {
    use crate::kernel::lsm::table::ss_table::block::{
        Block, BlockBuilder, BlockOptions, CompressType, Entry, Index, Value,
    };
    use crate::kernel::utils::lru_cache::LruCache;
    use crate::kernel::KernelResult;
    use bincode::Options;
    use bytes::Bytes;
    use std::io::Cursor;
    // Encodes two entries back-to-back and checks `batch_decode` recovers
    // both along with their ordinal positions.
    #[test]
    fn test_entry_serialization() -> KernelResult<()> {
        let entry1 = Entry::new(
            0,
            1,
            Bytes::from(vec![b'1']),
            Value::from(Some(Bytes::from(vec![b'1']))),
        );
        let entry2 = Entry::new(
            0,
            1,
            Bytes::from(vec![b'1']),
            Value::from(Some(Bytes::from(vec![b'1']))),
        );
        let mut bytes = Vec::new();
        entry1.encode(&mut bytes)?;
        entry2.encode(&mut bytes)?;
        let vec_entry = Entry::batch_decode(&mut Cursor::new(bytes))?;
        assert_eq!(vec![(0, entry1), (1, entry2)], vec_entry);
        Ok(())
    }
    // Builds many blocks, then verifies every key is reachable through the
    // index-block -> data-block lookup path (with an LRU cache of decoded
    // blocks), and that a data block round-trips through both compression modes.
    #[tokio::test]
    async fn test_block() -> KernelResult<()> {
        let value = Bytes::from_static(b"Let life be beautiful like summer flowers");
        let mut vec_data = Vec::new();
        let times = 2333;
        let options = BlockOptions::new();
        let mut builder = BlockBuilder::new(options.clone());
        // Big-endian serialization keeps byte order aligned with numeric
        // order, satisfying BlockBuf's strictly-ascending-key assert.
        for i in 0..times {
            let mut key = b"KipDB-".to_vec();
            key.append(&mut bincode::options().with_big_endian().serialize(&i)?);
            vec_data.push((Bytes::from(key), Some(value.clone())));
        }
        for data in vec_data.iter().cloned() {
            let (key, value) = data;
            builder.add((key, Value::from(value)));
        }
        let block = builder.vec_block[0].0.clone();
        let (full_bytes, data_len, _) = builder.build().await?;
        // The index block occupies the tail of the built byte buffer.
        let index_block = Block::<Index>::decode(
            full_bytes[data_len..].to_vec(),
            CompressType::None,
            options.index_restart_interval,
        )?;
        let mut cache = LruCache::new(5)?;
        for kv in vec_data.iter().take(times) {
            let key = &kv.0;
            let data_block = cache.get_or_insert(index_block.find_with_upper(key), |index| {
                let &Index { offset, len } = index;
                let target_block = Block::<Value>::decode(
                    full_bytes[offset as usize..offset as usize + len].to_vec(),
                    options.compress_type,
                    options.data_restart_interval,
                )?;
                Ok(target_block)
            })?;
            assert_eq!(data_block.find(key), (Some(value.clone()), true))
        }
        test_block_serialization_(
            block.clone(),
            CompressType::None,
            options.data_restart_interval,
        )?;
        test_block_serialization_(
            block.clone(),
            CompressType::LZ4,
            options.data_restart_interval,
        )?;
        Ok(())
    }
    // Asserts encode -> decode is the identity for the given compression type.
    fn test_block_serialization_(
        block: Block<Value>,
        compress_type: CompressType,
        restart_interval: usize,
    ) -> KernelResult<()> {
        let mut bytes = Vec::new();
        block.encode(compress_type, &mut bytes)?;
        let de_block = Block::decode(bytes, compress_type, restart_interval)?;
        assert_eq!(block, de_block);
        Ok(())
    }
}