use core::ops::Deref;
use vpb::{
checksum::Checksumer,
kvstructs::{
bytes::{BufMut, BytesMut},
KeyExt, ValueExt,
},
BlockOffset, Checksum, TableIndex,
};
use crate::bloom::{bloom_bits_per_key, hash, Filter};
use super::*;
#[cfg(feature = "std")]
mod standard;
#[cfg(feature = "std")]
pub use standard::*;
#[cfg(not(feature = "std"))]
mod no_std;
#[cfg(not(feature = "std"))]
pub use no_std::*;
/// Finished table payload produced by [`SimpleBuilder::build`]:
/// the fully serialized table bytes (data blocks, marshaled index,
/// and their trailing checksums), frozen into an immutable buffer.
pub struct SimpleBuildData {
    /// The complete, immutable serialized table contents.
    data: Bytes,
}
/// In-memory builder that serializes key/value entries into a single
/// contiguous table buffer, block by block, tracking the metadata
/// (index, bloom hashes, offsets) needed to finalize the table.
pub struct SimpleBuilder {
    /// Output buffer the whole table is serialized into.
    buf: BytesMut,
    /// Base key of the block currently being built; subsequent keys in the
    /// block are stored as diffs against it. Cleared when a block finishes.
    base_key: Key,
    /// Byte offset in `buf` where the current block starts.
    last_block_offset: u32,
    /// Hash of every inserted key, used to build the bloom filter in `build`.
    key_hashes: Vec<u32>,
    /// Offsets (relative to `last_block_offset`) of each entry in the current block.
    entry_offsets: Vec<u32>,
    /// Accumulated per-block metadata (block offsets, key count, bloom filter, ...).
    table_index: TableIndex,
    /// Running total of bytes attributed to stale (superseded) entries.
    stale_data_size: usize,
    /// Largest timestamp seen across all inserted keys.
    max_version: u64,
    /// Running estimate of the serialized index size, used by `reached_capacity`.
    len_offsets: u32,
    /// Shared table options (block size, bloom ratio, checksum algorithm, ...).
    opts: RefCounter<Options>,
}
impl super::TableBuilder for SimpleBuilder {
    type TableData = SimpleBuildData;

    /// Creates a new builder with the given options.
    fn new(opts: RefCounter<Options>) -> Result<Self>
    where
        Self: Sized,
    {
        SimpleBuilder::new_in(opts)
    }

    /// Returns a shared handle to the builder's options.
    fn options(&self) -> RefCounter<Options> {
        self.opts.clone()
    }

    /// Inserts an entry known to be stale (superseded by a newer version),
    /// accounting its size so compaction can estimate reclaimable space.
    #[inline]
    fn insert_stale(&mut self, key: &Key, val: &Value, value_len: u32) {
        // 4 bytes for the entry offset + 4 bytes for the entry header.
        self.stale_data_size += key.len() + val.len() + 4 + 4;
        self.insert_in(key, val, value_len, true)
    }

    /// Inserts a live entry.
    #[inline]
    fn insert(&mut self, key: &Key, val: &Value, value_len: u32) {
        self.insert_in(key, val, value_len, false)
    }

    /// Returns `true` if no entries have been inserted yet.
    #[inline]
    fn is_empty(&self) -> bool {
        self.key_hashes.is_empty()
    }

    /// Returns the number of entries inserted so far.
    #[inline]
    fn len(&self) -> usize {
        self.key_hashes.len()
    }

    /// Returns `true` once the estimated finished-table size would exceed
    /// the configured table capacity.
    #[inline]
    fn reached_capacity(&self) -> bool {
        // Current block if it were finished now: data + entry offsets
        // + offsets count (4) + checksum.
        let block_size = self.buf.len() as u32
            + (self.entry_offsets.len() * 4) as u32
            + 4
            + Checksum::ENCODED_SIZE as u32;
        // Plus the index-length word and the estimated index size.
        let estimated_size = block_size + 4 + self.len_offsets;
        (estimated_size as u64) > self.opts.table_capacity()
    }

    /// Finalizes the table: flushes the last block, builds the bloom filter,
    /// then appends the marshaled index, its length, its checksum, and the
    /// checksum length. Returns `Ok(None)` when nothing was inserted.
    #[inline]
    fn build(mut self) -> Result<Option<Self::TableData>> {
        self.finish_block();
        if self.buf.is_empty() {
            return Ok(None);
        }
        if self.opts.bloom_ratio() > 0.0 {
            let bits = bloom_bits_per_key(self.key_hashes.len(), self.opts.bloom_ratio());
            self.table_index.bloom_filter =
                Filter::new(self.key_hashes.as_slice(), bits).into_bytes();
        }

        // Layout after the blocks: [index][index_len: u32][checksum][checksum_len: u32].
        let index = self.table_index.marshal();
        let index_len = index.len();
        assert!(index_len < u32::MAX as usize);
        self.buf.put_slice(&index);
        self.buf.put_u32(index_len as u32);

        let cks = index
            .as_slice()
            .checksum(Options::checksum(&self.opts))
            .marshal();
        let cks_len = cks.len() as u32;
        self.buf.put_slice(&cks);
        // `cks_len` is already `u32`; the previous redundant `as u32` cast is removed.
        self.buf.put_u32(cks_len);

        Ok(Some(SimpleBuildData {
            data: self.buf.freeze(),
        }))
    }
}
impl SimpleBuilder {
    /// Core insert path: finishes the current block first when adding this
    /// entry would overflow it, then appends the entry to the (possibly new)
    /// current block.
    #[inline]
    fn insert_in(&mut self, key: &Key, val: &Value, value_len: u32, is_stale: bool) {
        let val_encoded_size = val.encoded_size();
        if self.should_finish_block(key.len(), val_encoded_size) {
            if is_stale {
                // This key will also appear in the block index as the next
                // block's base key: count it (+ 4-byte offset + 4-byte length)
                // toward stale data as well.
                self.stale_data_size += key.len() + 4 + 4;
            }
            self.finish_block();
            self.base_key.clear();
            assert!(self.buf.len() < u32::MAX as usize);
            self.last_block_offset = self.buf.len() as u32;
            self.entry_offsets.clear();
        }
        self.insert_helper(key, val, value_len, val_encoded_size)
    }

    /// Seals the current block: appends the entry-offset array, its count,
    /// and the block checksum, then records the block in the table index.
    /// No-op when the current block has no entries.
    fn finish_block(&mut self) {
        let entries_len = self.entry_offsets.len();
        if entries_len == 0 {
            return;
        }
        // Block trailer: [entry offsets][offset count: u32][checksum][checksum_len: u32].
        self.buf.put_slice(u32_slice_to_bytes(&self.entry_offsets));
        self.buf.put_u32(entries_len as u32);

        let checksum = (&self.buf[self.last_block_offset as usize..])
            .checksum(Options::checksum(&self.opts))
            .marshal();
        self.buf.put_slice(&checksum);
        self.buf.put_u32(checksum.len() as u32);

        // Estimate this block's contribution to the serialized index:
        // base key length rounded UP to a multiple of 4, plus 40 bytes of
        // fixed per-block overhead.
        // BUGFIX: `ceil` must apply to len/4 BEFORE multiplying by 4;
        // ((len/4)*4).ceil() is exact in f64 and never rounded anything up.
        self.len_offsets +=
            ((self.base_key.len() as f64) / 4f64).ceil() as u32 * 4 + 40;

        self.table_index.offsets.push(BlockOffset {
            key: self.base_key.deref().clone(),
            offset: self.last_block_offset,
            len: self.buf.len() as u32 - self.last_block_offset,
        });
        self.table_index.key_count += entries_len as u32;
    }

    /// Returns `true` when appending an entry of the given key/value size
    /// would push the current block past the configured block size.
    fn should_finish_block(&self, key_size: usize, val_encoded_size: u32) -> bool {
        let len = self.entry_offsets.len();
        // An empty block can always take at least one entry.
        if len == 0 {
            return false;
        }
        // Guard the u32 arithmetic below against overflow.
        assert!(((len as u32) + 1) * 4 + 4 + (Checksum::ENCODED_SIZE as u32) < u32::MAX);
        // Trailer size with this entry included: offsets array, count word, checksum.
        let entires_offset_size = ((len as u32) + 1) * 4 + 4 + (Checksum::ENCODED_SIZE as u32);
        // 6 = entry header (4 bytes) + a 2-byte margin, matching upstream.
        let estimated_size = (self.buf.len() as u32 - self.last_block_offset)
            + 6
            + key_size as u32
            + val_encoded_size
            + entires_offset_size;
        assert!((self.buf.len() as u64) + (estimated_size as u64) < u32::MAX as u64);
        estimated_size > self.opts.block_size() as u32
    }

    /// Returns the suffix of `new_key` after its longest common prefix with
    /// the current block's base key.
    #[inline]
    fn key_diff<'a>(&self, new_key: &'a Key) -> &'a [u8] {
        let (new_key_len, base_key_len) = (new_key.len(), self.base_key.len());
        let mut idx = 0;
        while idx < new_key_len && idx < base_key_len {
            if new_key[idx] != self.base_key[idx] {
                break;
            }
            idx += 1;
        }
        &new_key[idx..]
    }

    /// Appends one entry to the current block: records its hash and offset,
    /// writes the header (overlap/diff lengths), the diffed key, and the
    /// encoded value, and updates table-index statistics.
    fn insert_helper(&mut self, key: &Key, val: &Value, vplen: u32, val_encoded_size: u32) {
        self.key_hashes.push(hash(key.parse_key()));

        let version = key.parse_timestamp();
        if version > self.max_version {
            self.max_version = version;
        }

        // First key of a block becomes the base key and is stored whole;
        // later keys store only the suffix past the shared prefix.
        let diff_key = if self.base_key.is_empty() {
            self.base_key = key.clone();
            key
        } else {
            self.key_diff(key)
        };

        let key_len = key.len();
        let diff_key_len = diff_key.len();
        // Header fields are u16; guard the narrowing conversions.
        assert!(key_len - diff_key_len <= u16::MAX as usize);
        assert!(diff_key_len <= u16::MAX as usize);

        let header = Header {
            overlap: (key_len - diff_key_len) as u16,
            diff: diff_key_len as u16,
        };
        self.entry_offsets
            .push(self.buf.len() as u32 - self.last_block_offset);
        header.encode_to_buf(&mut self.buf);
        self.buf.put_slice(diff_key);
        val.encode_to_buf(&mut self.buf);

        // 4 = encoded header size.
        let sst_size = val_encoded_size + diff_key_len as u32 + 4;
        self.table_index.estimated_size += sst_size + vplen;
        // Reuse `version` instead of re-parsing the timestamp from the key.
        self.table_index.max_version = self.table_index.max_version.max(version);
    }
}