use bytes::{Buf, BufMut, BytesMut};
use crate::error::Error;
use crate::sstable::error::SSTableError;
use crate::sstable::table::TableFormat;
use crate::{CompressionType, InternalKey, Result};
#[derive(Debug, Clone)]
pub(crate) struct Properties {
pub(crate) id: u64,
pub(crate) table_format: TableFormat,
pub(crate) num_entries: u64,
pub(crate) num_deletions: u64,
pub(crate) data_size: u64,
pub(crate) oldest_vlog_file_id: u64,
pub(crate) num_data_blocks: u64,
pub(crate) index_size: u64,
pub(crate) index_partitions: u64,
pub(crate) top_level_index_size: u64,
pub(crate) filter_size: u64,
pub(crate) raw_key_size: u64,
pub(crate) raw_value_size: u64,
pub(crate) created_at: u128,
pub(crate) item_count: u64,
pub(crate) key_count: u64,
pub(crate) tombstone_count: u64,
pub(crate) num_soft_deletes: u64,
pub(crate) num_range_deletions: u64,
pub(crate) block_size: u32,
pub(crate) block_count: u32,
pub(crate) compression: CompressionType,
pub(crate) seqnos: (u64, u64),
pub(crate) oldest_key_time: Option<u64>,
pub(crate) newest_key_time: Option<u64>,
}
impl Properties {
pub(crate) fn new() -> Self {
Properties {
id: 0,
table_format: TableFormat::LSMV1,
num_entries: 0,
num_deletions: 0,
data_size: 0,
oldest_vlog_file_id: 0,
num_data_blocks: 0,
index_size: 0,
index_partitions: 0,
top_level_index_size: 0,
filter_size: 0,
raw_key_size: 0,
raw_value_size: 0,
created_at: 0,
item_count: 0,
key_count: 0,
tombstone_count: 0,
num_soft_deletes: 0,
num_range_deletions: 0,
block_size: 0,
block_count: 0,
compression: CompressionType::None,
seqnos: (0, 0),
oldest_key_time: None,
newest_key_time: None,
}
}
pub(crate) fn encode(&self) -> Vec<u8> {
let mut buf = BytesMut::with_capacity(256);
buf.put_u64(self.id);
buf.put_u8(self.table_format as u8);
buf.put_u64(self.num_entries);
buf.put_u64(self.num_deletions);
buf.put_u64(self.data_size);
buf.put_u64(self.oldest_vlog_file_id);
buf.put_u64(self.num_data_blocks);
buf.put_u64(self.index_size);
buf.put_u64(self.index_partitions);
buf.put_u64(self.top_level_index_size);
buf.put_u64(self.filter_size);
buf.put_u64(self.raw_key_size);
buf.put_u64(self.raw_value_size);
buf.put_u128(self.created_at);
buf.put_u64(self.item_count);
buf.put_u64(self.key_count);
buf.put_u64(self.tombstone_count);
buf.put_u64(self.num_soft_deletes);
buf.put_u64(self.num_range_deletions);
buf.put_u32(self.block_size);
buf.put_u32(self.block_count);
buf.put_u8(self.compression as u8);
buf.put_u64(self.seqnos.0);
buf.put_u64(self.seqnos.1);
buf.put_u64(self.oldest_key_time.unwrap_or(0));
buf.put_u64(self.newest_key_time.unwrap_or(0));
buf.to_vec()
}
pub(crate) fn decode(buf: Vec<u8>) -> Result<Self> {
let mut buf = &buf[..];
let id = buf.get_u64();
let table_format = buf.get_u8();
let num_entries = buf.get_u64();
let num_deletions = buf.get_u64();
let data_size = buf.get_u64();
let oldest_vlog_file_id = buf.get_u64();
let num_data_blocks = buf.get_u64();
let index_size = buf.get_u64();
let index_partitions = buf.get_u64();
let top_level_index_size = buf.get_u64();
let filter_size = buf.get_u64();
let raw_key_size = buf.get_u64();
let raw_value_size = buf.get_u64();
let created_at = buf.get_u128();
let item_count = buf.get_u64();
let key_count = buf.get_u64();
let tombstone_count = buf.get_u64();
let num_soft_deletes = buf.get_u64();
let num_range_deletions = buf.get_u64();
let block_size = buf.get_u32();
let block_count = buf.get_u32();
let compression = buf.get_u8();
let seqno_start = buf.get_u64();
let seqno_end = buf.get_u64();
let oldest_key_time = Some(buf.get_u64());
let newest_key_time = Some(buf.get_u64());
Ok(Self {
id,
table_format: TableFormat::from_u8(table_format)?,
num_entries,
num_deletions,
data_size,
oldest_vlog_file_id,
num_data_blocks,
index_size,
index_partitions,
top_level_index_size,
filter_size,
raw_key_size,
raw_value_size,
created_at,
item_count,
key_count,
tombstone_count,
num_soft_deletes,
num_range_deletions,
block_size,
block_count,
compression: CompressionType::try_from(compression)?,
seqnos: (seqno_start, seqno_end),
oldest_key_time,
newest_key_time,
})
}
}
#[derive(Debug, Clone)]
pub struct TableMetadata {
pub(crate) has_point_keys: Option<bool>,
pub(crate) smallest_seq_num: Option<u64>,
pub(crate) largest_seq_num: Option<u64>,
pub(crate) properties: Properties,
pub(crate) smallest_point: Option<InternalKey>,
pub(crate) largest_point: Option<InternalKey>,
}
impl TableMetadata {
pub(crate) fn new() -> Self {
TableMetadata {
smallest_point: None,
largest_point: None,
has_point_keys: None,
smallest_seq_num: None,
largest_seq_num: None,
properties: Properties::new(),
}
}
pub(crate) fn set_smallest_point_key(&mut self, k: InternalKey) {
self.smallest_point = Some(k);
self.has_point_keys = Some(true);
}
pub(crate) fn set_largest_point_key(&mut self, k: InternalKey) {
self.largest_point = Some(k);
self.has_point_keys = Some(true);
}
pub(crate) fn update_seq_num(&mut self, seq_num: u64) {
self.smallest_seq_num = Some(self.smallest_seq_num.map_or(seq_num, |s| s.min(seq_num)));
self.largest_seq_num = Some(self.largest_seq_num.map_or(seq_num, |l| l.max(seq_num)));
}
pub(crate) fn encode(&self) -> Vec<u8> {
let mut buf = BytesMut::new();
match self.has_point_keys {
None => buf.put_u8(0),
Some(true) => buf.put_u8(1),
Some(false) => buf.put_u8(2),
}
buf.put_u64(self.smallest_seq_num.unwrap_or(0));
buf.put_u64(self.largest_seq_num.unwrap_or(0));
let properties_encoded = self.properties.encode();
let properties_encoded_len = properties_encoded.len() as u64;
buf.put_u64(properties_encoded_len);
buf.extend_from_slice(&properties_encoded);
match &self.smallest_point {
None => buf.put_u8(0),
Some(key) => {
buf.put_u8(1);
let key_encoded = key.encode();
buf.put_u64(key_encoded.len() as u64); buf.extend_from_slice(&key_encoded); }
}
match &self.largest_point {
None => buf.put_u8(0),
Some(key) => {
buf.put_u8(1);
let key_encoded = key.encode();
buf.put_u64(key_encoded.len() as u64); buf.extend_from_slice(&key_encoded); }
}
buf.to_vec()
}
pub(crate) fn decode(src: &[u8]) -> Result<TableMetadata> {
let mut cursor = std::io::Cursor::new(src);
let has_point_keys = match cursor.get_u8() {
0 => None,
1 => Some(true),
2 => Some(false),
value => {
return Err(Error::from(SSTableError::InvalidHasPointKeysValue {
value,
}))
}
};
let smallest_seq_num = Some(cursor.get_u64());
let largest_seq_num = Some(cursor.get_u64());
let properties_len = cursor.get_u64() as usize;
let mut properties_bytes = vec![0u8; properties_len];
cursor.copy_to_slice(&mut properties_bytes);
let properties = Properties::decode(properties_bytes)?;
let smallest_point = match cursor.get_u8() {
0 => None,
1 => {
let key_len: usize = cursor.get_u64() as usize;
let mut key_bytes = vec![0u8; key_len];
cursor.copy_to_slice(&mut key_bytes);
Some(InternalKey::decode(&key_bytes))
}
value => {
return Err(Error::from(SSTableError::InvalidSmallestPointValue {
value,
}))
}
};
let largest_point = match cursor.get_u8() {
0 => None,
1 => {
let key_len = cursor.get_u64() as usize;
let mut key_bytes = vec![0u8; key_len];
cursor.copy_to_slice(&mut key_bytes);
Some(InternalKey::decode(&key_bytes))
}
value => {
return Err(Error::from(SSTableError::InvalidLargestPointValue {
value,
}))
}
};
Ok(TableMetadata {
has_point_keys,
smallest_seq_num,
largest_seq_num,
properties,
smallest_point,
largest_point,
})
}
}