use super::*;
use crate::{
bloom::{bloom_bits_per_key, hash, Filter},
sync::{AtomicU32, Ordering},
};
use alloc::boxed::Box;
use alloc::vec::Vec;
use core::{ops::Deref, ptr::NonNull};
use vpb::{
checksum::Checksumer,
compression::Compressor,
encrypt::{random_iv, Encryptor},
kvstructs::{Key, KeyExt, Value, ValueExt},
BlockOffset, Checksum, Compression, Encryption, Marshaller, TableIndex,
};
use zallocator::Buffer;
#[cfg(feature = "std")]
mod standard;
#[cfg(not(feature = "std"))]
mod no_std;
/// Upper bound (256 MiB) used when sizing the table builder's allocator.
pub const MAX_ALLOCATOR_INITIAL_SIZE: usize = 256 << 20;
/// Extra headroom reserved when allocating block buffers.
const PADDING: usize = 256;
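/// The output of a finished [`Builder`]: the sealed blocks, the marshaled
/// (and possibly encrypted) table index, the index checksum, and the total
/// on-disk size.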
pub struct BuildData {
alloc: Allocator,
opts: RefCounter<Options>,
block_list: Vec<BBlock>,
index: Vec<u8>,
checksum: Vec<u8>,
checksum_size: u32,
size: u32,
}
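/// A cheaply clonable, reference-counted handle to an in-progress block
/// (`BBlockInner`). Mutation goes through a raw pointer, so the builder is
/// expected to be the only writer at any given time.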
#[derive(Debug, Clone)]
#[repr(transparent)]
pub(crate) struct BBlock {
inner: RefCounter<NonNull<BBlockInner>>,
}
// Safety: the builder mutates a `BBlockInner` from at most one thread at a
// time before handing the block off, so sharing the raw-pointer handle
// across threads is sound under that discipline.
unsafe impl Send for BBlock {}
unsafe impl Sync for BBlock {}
struct BBlockInner {
data: Buffer,
base_key: Key,
entry_offsets: Vec<u32>,
end: usize,
}
impl BBlockInner {
#[inline]
pub(crate) const fn new(buf: Buffer) -> Self {
Self {
data: buf,
base_key: Key::new(),
entry_offsets: Vec::new(),
end: 0,
}
}
}
impl Drop for BBlock {
    fn drop(&mut self) {
        // `drop_in_place` alone would run the destructor but leak the `Box`
        // allocation created in `BBlock::new`, so rebuild the `Box` and let
        // it both drop and free the inner value. The sentinel produced by
        // `BBlock::dangling` owns nothing and is skipped.
        if RefCounter::count(&self.inner) == 1 && *self.inner != NonNull::dangling() {
            unsafe {
                drop(Box::from_raw(self.inner.as_ptr()));
            }
        }
    }
}
impl Default for BBlock {
fn default() -> Self {
Self::dangling()
}
}
impl BBlock {
#[inline(always)]
pub(crate) fn new(buf: Buffer) -> Self {
Self {
inner: RefCounter::new(unsafe {
NonNull::new_unchecked(Box::into_raw(Box::new(BBlockInner::new(buf))))
}),
}
}
#[inline(always)]
    pub(crate) fn dangling() -> Self {
        // A sentinel handle that owns no allocation. `NonNull::dangling()`
        // is used instead of `NonNull::new_unchecked(core::ptr::null_mut())`,
        // which is undefined behavior; `Drop` recognizes the same sentinel.
        Self {
            inner: RefCounter::new(NonNull::dangling()),
        }
    }
#[inline(always)]
pub(crate) fn data(&self) -> &Buffer {
&self.inner().data
}
#[inline(always)]
pub(crate) fn set_data(&self, data: Buffer) {
self.inner_mut().data = data;
}
#[inline(always)]
    pub(crate) fn increase_end(&self, add: usize) -> usize {
        // Reserve `add` bytes and return the previous end offset, i.e. the
        // start of the newly reserved region.
        let inner = self.inner_mut();
        let end = inner.end;
        inner.end += add;
        end
    }
#[inline(always)]
pub(crate) fn end(&self) -> usize {
self.inner().end
}
#[inline(always)]
pub(crate) fn set_end(&self, end: usize) {
self.inner_mut().end = end;
}
#[inline(always)]
pub(crate) fn base_key(&self) -> &Key {
&self.inner().base_key
}
#[inline(always)]
pub(crate) fn set_base_key(&self, key: Key) {
self.inner_mut().base_key = key;
}
#[inline(always)]
pub(crate) fn entry_offsets(&self) -> &[u32] {
&self.inner().entry_offsets
}
#[inline(always)]
pub(crate) fn len(&self) -> usize {
self.inner().entry_offsets.len()
}
#[inline(always)]
pub(crate) fn push_entry_offset(&self, offset: u32) {
self.inner_mut().entry_offsets.push(offset);
}
#[inline(always)]
fn inner(&self) -> &BBlockInner {
unsafe { &*(self.inner.deref().as_ptr()) }
}
#[inline(always)]
#[allow(clippy::mut_from_ref)]
fn inner_mut(&self) -> &mut BBlockInner {
unsafe { &mut *(self.inner.deref().as_ptr()) }
}
}
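/// Builds the on-disk representation of a table: entries are appended to a
/// current block, sealed blocks are compressed/encrypted (on a background
/// processor with `std`, inline otherwise), and `build` emits the final
/// [`BuildData`] together with the marshaled table index.
///
/// A minimal usage sketch (hypothetical driver code; how `Options`, `Key`,
/// and `Value` are obtained is an assumption, not part of this module):
///
/// ```ignore
/// let opts = RefCounter::new(Options::default());
/// let mut builder = Builder::new(opts)?;
/// builder.insert(&key, &value, value_len);
/// if let Some(data) = builder.build()? {
///     // persist `data` as an on-disk table
/// }
/// ```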
pub struct Builder {
alloc: Allocator,
cur_block: BBlock,
#[cfg(feature = "std")]
compressed_size: RefCounter<AtomicU32>,
#[cfg(not(feature = "std"))]
compressed_size: AtomicU32,
uncompressed_size: AtomicU32,
len_offsets: u32,
key_hashes: Vec<u32>,
opts: RefCounter<super::options::Options>,
max_version: u64,
on_disk_size: u32,
stale_data_size: usize,
#[cfg(feature = "std")]
wg: Option<crossbeam_utils::sync::WaitGroup>,
#[cfg(feature = "std")]
block_tx: Option<crossbeam_channel::Sender<BBlock>>,
block_list: Vec<BBlock>,
}
impl TableBuilder for Builder {
type TableData = BuildData;
fn new(opts: RefCounter<Options>) -> Result<Self>
where
Self: Sized,
{
Builder::new_in(opts)
}
fn options(&self) -> RefCounter<Options> {
self.opts.clone()
}
    fn insert_stale(&mut self, key: &Key, val: &Value, value_len: u32) {
        // 4 bytes for the entry offset and 4 bytes for the entry header.
        self.stale_data_size += key.len() + val.len() + 4 + 4;
        self.insert_in(key, val, value_len, true)
    }
fn insert(&mut self, key: &Key, val: &Value, value_len: u32) {
self.insert_in(key, val, value_len, false)
}
#[inline]
    fn is_empty(&self) -> bool {
        self.key_hashes.is_empty()
    }
#[inline]
fn len(&self) -> usize {
self.key_hashes.len()
}
    fn reached_capacity(&self) -> bool {
        let mut sum_block_size = self.compressed_size.load(Ordering::SeqCst);
        if self.opts.compression().is_none() && self.opts.encryption().is_none() {
            sum_block_size = self.uncompressed_size.load(Ordering::SeqCst);
        }
        // Finished blocks plus the current block: its entry offsets, the
        // entry count (u32), and the trailing checksum.
        let block_size = sum_block_size
            + (self.cur_block.len() * core::mem::size_of::<u32>()) as u32
            + 4
            + Checksum::ENCODED_SIZE as u32;
        // Add 4 bytes for the index length plus the accumulated offset sizes.
        let estimated_size = block_size + 4 + self.len_offsets;
        (estimated_size as u64) > self.opts.table_capacity()
    }
fn build(mut self) -> Result<Option<BuildData>> {
self.finish_block(false);
        #[cfg(feature = "std")]
        {
            // Dropping the sender closes the channel so the background block
            // processors can drain; then wait for them to finish.
            if self.block_tx.take().is_some() {
                self.wg.take().unwrap().wait();
            }
        }
if self.block_list.is_empty() {
return Ok(None);
}
let key_hashes_len = self.key_hashes.len();
let mut tbi = TableIndex::new();
if self.opts.bloom_ratio() > 0.0 {
let bits = bloom_bits_per_key(key_hashes_len, self.opts.bloom_ratio());
tbi.bloom_filter = Filter::new(self.key_hashes.as_slice(), bits).into_bytes();
}
let mut num_entries = 0;
let mut data_sz = 0;
tbi.offsets = self
.block_list
.iter()
.map(|bblk| {
num_entries += bblk.len();
let end = bblk.end() as u32;
let bo = BlockOffset {
key: bblk.base_key().deref().clone(),
offset: data_sz,
len: end,
};
data_sz += end;
bo
})
.collect();
self.on_disk_size += data_sz;
tbi.uncompressed_size = self.uncompressed_size.load(Ordering::SeqCst);
tbi.key_count = key_hashes_len as u32;
tbi.max_version = self.max_version;
tbi.stale_data_size = self.stale_data_size as u32;
let data = tbi.marshal();
let encryption = self.opts.encryption();
if encryption.is_some() {
let index = data.as_slice().encrypt_to_vec(
encryption.secret(),
&random_iv(),
encryption.algorithm(),
)?;
let cks = index
.as_slice()
.checksum(Options::checksum(&self.opts))
.marshal();
            let cks_size = cks.len();
            // 4 bytes for the index length and 4 bytes for the checksum length.
            let size = data_sz + (index.len() + cks_size + 4 + 4) as u32;
Ok(Some(BuildData {
alloc: self.alloc,
opts: self.opts,
block_list: self.block_list,
index,
checksum: cks,
checksum_size: cks_size as u32,
size,
}))
} else {
let cks = data
.as_slice()
.checksum(Options::checksum(&self.opts))
.marshal();
            let cks_size = cks.len();
            // 4 bytes for the index length and 4 bytes for the checksum length.
            let size = data_sz + (data.len() + cks_size + 4 + 4) as u32;
Ok(Some(BuildData {
alloc: self.alloc,
opts: self.opts,
block_list: self.block_list,
index: data,
checksum: cks,
checksum_size: cks_size as u32,
size,
}))
}
}
}
impl Builder {
#[inline]
    fn insert_in(&mut self, key: &Key, val: &Value, value_len: u32, is_stale: bool) {
        let val_encoded_size = val.encoded_size();
        if self.should_finish_block(key.len(), val_encoded_size) {
            if is_stale {
                // This key will be part of the index entry for the finished
                // block: 4 bytes for the entry offset and 4 bytes for the header.
                self.stale_data_size += key.len() + 4 + 4;
            }
            self.finish_block(true);
        }
        self.insert_helper(key, val, value_len, val_encoded_size)
    }
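    /// Seals the current block by appending, after the entry data already
    /// written: the entry offsets (big-endian `u32`s), the entry count
    /// (`u32`), the block checksum, and the checksum length (`u32`). The
    /// sealed block is pushed onto `block_list` and either sent to the
    /// background processor (`std`) or processed inline (`no_std`).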
fn finish_block(&mut self, start_new: bool) {
let entries = self.cur_block.entry_offsets();
let entries_len = entries.len();
if entries_len == 0 {
return;
}
let cur_block_end = self.cur_block.end();
self.append(u32_slice_to_bytes(entries));
self.append(&(entries_len as u32).to_be_bytes());
let checksum = (&self.cur_block.data().as_slice()[..cur_block_end])
.checksum(Options::checksum(&self.opts))
.marshal();
self.append(&checksum);
self.append(&(checksum.len() as u32).to_be_bytes());
self.uncompressed_size
.fetch_add(cur_block_end as u32, Ordering::SeqCst);
        // Round the base key length up to a multiple of 4, plus 40 bytes of
        // overhead for this block's `BlockOffset` entry in the index.
        self.len_offsets +=
            ((self.cur_block.base_key().len() as f64 / 4f64).ceil() * 4f64) as u32 + 40;
self.block_list.push(self.cur_block.clone());
let old_block = if start_new {
core::mem::replace(
&mut self.cur_block,
BBlock::new(
self.alloc
.allocate_unchecked((self.opts.block_size() + PADDING) as u64),
),
)
} else {
core::mem::take(&mut self.cur_block)
};
#[cfg(feature = "std")]
{
            if let Some(ref tx) = self.block_tx {
                if let Err(e) = tx.send(old_block) {
                    #[cfg(feature = "tracing")]
                    tracing::error!(target: "table_builder", info = "failed to send the block to the processor", err = ?e);
                    panic!("failed to send the block to the processor: {e}");
                }
            }
}
}
#[cfg(not(feature = "std"))]
{
if self.opts.compression().is_some() || self.opts.encryption().is_some() {
Self::process_block(
&self.alloc,
old_block,
&self.compressed_size,
self.opts.compression(),
self.opts.encryption(),
)
}
}
}
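    /// Estimates whether appending an entry with the given key and encoded
    /// value sizes would push the current block past `Options::block_size`,
    /// counting the entry header, the offsets array, the checksum, and
    /// encryption padding.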
    fn should_finish_block(&self, key_size: usize, val_encoded_size: u32) -> bool {
        let current_block_end = self.cur_block.end() as u32;
        let len = self.cur_block.len();
        if len == 0 {
            return false;
        }
        // Overflow sanity check for the arithmetic below.
        assert!(((len as u32) + 1) * 4 + 4 + (Checksum::ENCODED_SIZE as u32) < u32::MAX);
        // One u32 offset per existing entry plus the incoming one, the entry
        // count (u32), and the trailing checksum.
        let entries_offsets_size = ((len as u32) + 1) * 4 + 4 + (Checksum::ENCODED_SIZE as u32);
        let estimated_size = current_block_end
            + 6 // header size for entry
            + key_size as u32
            + val_encoded_size
            + entries_offsets_size
            + self.opts.encryption().block_size() as u32;
        assert!((current_block_end as u64) + (estimated_size as u64) < u32::MAX as u64);
        estimated_size > self.opts.block_size() as u32
    }
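    /// Returns the suffix of `new_key` after its longest common prefix with
    /// the current block's base key. For example, with base key `b"apple"`
    /// and new key `b"apricot"`, the common prefix is `b"ap"` and the diff
    /// is `b"ricot"`.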
#[inline]
fn key_diff<'a>(&self, new_key: &'a [u8]) -> &'a [u8] {
let base_key = self.cur_block.base_key();
let (new_key_len, base_key_len) = (new_key.len(), base_key.len());
let mut idx = 0;
while idx < new_key_len && idx < base_key_len {
if new_key[idx] != base_key[idx] {
break;
}
idx += 1;
}
&new_key[idx..]
}
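    /// Appends a single entry: records the key hash for the bloom filter,
    /// tracks the maximum version, and writes the entry header, the key
    /// diff, and the encoded value into the current block.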
fn insert_helper(&mut self, key: &Key, val: &Value, vplen: u32, val_encoded_size: u32) {
self.key_hashes.push(hash(key.parse_key()));
let version = key.parse_timestamp();
if version > self.max_version {
self.max_version = version;
}
let base_key = self.cur_block.base_key();
let diff_key = if base_key.is_empty() {
self.cur_block.set_base_key(key.clone());
key
} else {
self.key_diff(key)
};
let key_len = key.len();
let diff_key_len = diff_key.len();
assert!(key_len - diff_key_len <= u16::MAX as usize);
assert!(diff_key_len <= u16::MAX as usize);
let header = Header {
overlap: (key_len - diff_key_len) as u16,
diff: diff_key_len as u16,
};
self.cur_block
.push_entry_offset(self.cur_block.end() as u32);
self.append(&header.encode());
self.append(diff_key);
let dst = self.allocate(val_encoded_size as usize);
val.encode(dst.as_mut_slice());
self.on_disk_size += vplen;
}
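    /// Reserves `need` bytes at the end of the current block, growing the
    /// backing buffer when necessary, and returns the reserved slice.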
    fn allocate(&self, need: usize) -> Buffer {
        let data = self.cur_block.data();
        let prev_end = self.cur_block.end();
        if data.as_ref()[prev_end..].len() < need {
            // Out of room: roughly double the buffer (capped at the
            // allocator's maximum) while guaranteeing space for this write,
            // then copy the existing contents over.
            let sz = (2 * data.len() as u64)
                .min(zallocator::Zallocator::MAX_ALLOC)
                .max((prev_end + need) as u64);
            let tmp = self.alloc.allocate_unchecked(sz);
            unsafe {
                core::ptr::copy_nonoverlapping(
                    data.as_ref()[..prev_end].as_ptr(),
                    tmp.as_mut_ptr(),
                    prev_end,
                );
            }
            self.cur_block.set_data(tmp);
        }
        self.cur_block.increase_end(need);
        self.cur_block.data().slice(prev_end..prev_end + need)
    }
fn append(&self, data: &[u8]) {
let dst = self.allocate(data.len());
Buffer::copy_from_slice(&dst, data)
}
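    /// Compresses and/or encrypts a sealed block and records its final size
    /// in `compressed_size`.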
#[inline(always)]
fn process_block(
alloc: &Allocator,
item: BBlock,
compressed_size: &AtomicU32,
compression: Compression,
encryption: &Encryption,
) {
let end = item.end();
let mut block_buf = item.data().slice(..end);
block_buf = Self::compress_data(alloc, block_buf, compression);
block_buf = Self::encrypt_data(alloc, block_buf, encryption);
let allocated_space = vpb::compression::max_encoded_len(compression, end) + PADDING + 1;
let cap = block_buf.capacity();
assert!(cap <= allocated_space);
item.set_data(block_buf);
item.set_end(cap);
compressed_size.fetch_add(cap as u32, Ordering::SeqCst);
}
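    /// Compresses `data` into a freshly allocated buffer, returning the
    /// input unchanged when compression is disabled.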
#[inline(always)]
fn compress_data(alloc: &Allocator, data: Buffer, compression_algo: Compression) -> Buffer {
if compression_algo.is_none() {
return data;
}
let buffer = alloc.allocate_unchecked(data.max_encoded_len(compression_algo) as u64);
let end = data
.compress_to(buffer.as_mut_slice(), compression_algo)
.map_err(|e| {
#[cfg(feature = "tracing")]
{
tracing::error!(target: "table_builder", err=%e, info = "error while compressing block in table builder.");
}
e
})
.unwrap();
buffer.slice(..end)
}
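    /// Encrypts `data`, appending the random IV after the ciphertext, and
    /// returns the input unchanged when encryption is disabled.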
#[inline(always)]
fn encrypt_data(_alloc: &Allocator, data: Buffer, encryption: &Encryption) -> Buffer {
let algo = encryption.algorithm();
match algo {
#[cfg(any(feature = "aes", feature = "aes-std"))]
vpb::EncryptionAlgorithm::Aes => {
let iv = vpb::encrypt::random_iv();
let key = encryption.secret();
let buffer = _alloc.allocate_unchecked((data.capacity() + iv.len()) as u64);
let slice = buffer.as_mut_slice();
data.encrypt_to(&mut slice[..data.capacity()], key, &iv, algo)
.map_err(|e| {
#[cfg(feature = "tracing")]
{
tracing::error!(target: "table_builder", err=%e, info = "error while encrypting block in table builder.");
}
e
})
.unwrap();
slice[data.capacity()..].copy_from_slice(&iv);
buffer
}
_ => data,
}
}
}