use std::cmp;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::rc::Rc;
use integer_encoding::{FixedInt, VarInt};
use crate::config::SIZE_OF_U32_BYTES;
use crate::key::{InternalKey, RainDbKeyType};
use crate::tables::errors::BuilderError;
/// Builds a single table block using LevelDB-style prefix compression.
///
/// Keys are delta-encoded against the previously added key; every
/// `prefix_compression_restart_interval` entries a full (uncompressed) key is
/// written and its offset recorded as a restart point so readers can seek.
pub(crate) struct BlockBuilder<K>
where
    K: RainDbKeyType,
{
    /// Number of consecutive prefix-compressed entries allowed before a new
    /// restart point (full key) is emitted.
    prefix_compression_restart_interval: usize,

    /// The serialized entries accumulated so far (and, after `finalize`, the
    /// restart-point trailer as well).
    buffer: Vec<u8>,

    /// Byte offsets into `buffer` where full (uncompressed) keys start.
    restart_points: Vec<u32>,

    /// Number of entries compressed since the most recent restart point.
    curr_compressed_count: usize,

    /// Set by `finalize`; guards against adding entries to a finished block.
    block_finalized: bool,

    /// The raw bytes of the most recently added key, used both for ordering
    /// checks and for computing the shared prefix of the next key.
    last_key_bytes: Vec<u8>,

    /// Marker tying the builder to its key type without storing a key.
    key_type_marker: PhantomData<K>,
}
impl<K> BlockBuilder<K>
where
    K: RainDbKeyType,
{
    /// Create a new [`BlockBuilder`].
    ///
    /// # Panics
    ///
    /// Panics if `prefix_compression_restart_interval` is zero. Any value
    /// `>= 1` is accepted (an interval of 1 makes every entry a restart point).
    pub(crate) fn new(prefix_compression_restart_interval: usize) -> Self {
        // NOTE: the message previously said "> 1", contradicting both this
        // check and existing callers that pass 1.
        assert!(
            prefix_compression_restart_interval > 0,
            "Attempted to create a block builder with a prefix compression restart interval of 0. Only values >= 1 are accepted."
        );

        // The first entry of a block is always stored uncompressed, so offset
        // zero is always a restart point.
        let restart_points = vec![0];

        Self {
            prefix_compression_restart_interval,
            buffer: vec![],
            restart_points,
            curr_compressed_count: 0,
            block_finalized: false,
            last_key_bytes: vec![],
            key_type_marker: PhantomData,
        }
    }

    /// Append a key-value pair to the block.
    ///
    /// Entries must be added in strictly ascending key order. The key is
    /// delta-encoded against the previous key unless the compression interval
    /// was just exhausted, in which case a new restart point is started and
    /// the full key is written.
    ///
    /// # Panics
    ///
    /// Panics if the block was already finalized or if `key` does not sort
    /// strictly after the previously added key.
    pub(crate) fn add_entry(&mut self, key: Rc<K>, value: &[u8]) {
        let key_bytes = key.as_bytes();

        assert!(
            !self.block_finalized,
            "Attempted to add a key-value pair to a finalized block."
        );
        assert!(
            self.curr_compressed_count <= self.prefix_compression_restart_interval,
            "Attempted to add too many consecutive compressed entries."
        );
        // Ordering is checked via `InternalKey` comparison semantics.
        // NOTE(review): this assumes K's byte encoding is convertible to an
        // `InternalKey`; the `unwrap`s panic otherwise — confirm against
        // `RainDbKeyType`'s contract.
        assert!(
            self.buffer.is_empty()
                || InternalKey::try_from(self.last_key_bytes.clone()).unwrap()
                    < InternalKey::try_from(key_bytes.clone()).unwrap(),
            "{}",
            BuilderError::OutOfOrder
        );

        let mut shared_prefix_size: usize = 0;
        if self.curr_compressed_count < self.prefix_compression_restart_interval {
            // Length of the byte prefix this key shares with the previous key.
            // `zip` bounds the scan by the shorter of the two keys.
            shared_prefix_size = self
                .last_key_bytes
                .iter()
                .zip(key_bytes.iter())
                .take_while(|(previous, current)| previous == current)
                .count();
        } else {
            // Interval exhausted: start a new restart point. The entry is
            // written with a shared prefix of 0, i.e. the full key.
            self.restart_points.push(self.buffer.len() as u32);
            self.curr_compressed_count = 0;
        }

        // Entry layout:
        //   varint(shared len) varint(non-shared len) varint(value len)
        //   key suffix bytes   value bytes
        let num_non_shared_bytes = (key_bytes.len() - shared_prefix_size) as u32;
        self.buffer
            .extend(u32::encode_var_vec(shared_prefix_size as u32));
        self.buffer
            .extend(u32::encode_var_vec(num_non_shared_bytes));
        self.buffer.extend(u32::encode_var_vec(value.len() as u32));
        self.buffer
            .extend_from_slice(&key_bytes[shared_prefix_size..]);
        self.buffer.extend_from_slice(value);

        // Rebuild the tracked "last key" in place: keep the shared prefix and
        // append the new suffix, reusing the existing allocation.
        self.last_key_bytes.truncate(shared_prefix_size);
        self.last_key_bytes
            .extend_from_slice(&key_bytes[shared_prefix_size..]);
        assert!(
            self.last_key_bytes == key_bytes,
            "The reconstructed key was not the same as the key being added."
        );

        self.curr_compressed_count += 1;
    }

    /// Clear all state so the builder can be reused for a fresh block.
    pub(crate) fn reset(&mut self) {
        self.restart_points.clear();
        // Offset zero is always the first restart point of a new block.
        self.restart_points.push(0);
        self.buffer.clear();
        self.curr_compressed_count = 0;
        self.block_finalized = false;
        self.last_key_bytes.clear();
    }

    /// Append the restart-point trailer to the buffer and return the finished
    /// block contents.
    ///
    /// # Panics
    ///
    /// Panics if the block was already finalized; finalizing twice would
    /// append the restart trailer a second time and silently corrupt the
    /// block. Call [`BlockBuilder::reset`] before building another block.
    pub(crate) fn finalize(&mut self) -> Vec<u8> {
        // Mirrors the finalization guard in `add_entry`.
        assert!(
            !self.block_finalized,
            "Attempted to finalize a block that was already finalized."
        );

        // Trailer: each restart offset as a fixed-width u32, then the count.
        for point in self.restart_points.iter() {
            self.buffer.extend(u32::encode_fixed_vec(*point));
        }
        self.buffer
            .extend(u32::encode_fixed_vec(self.restart_points.len() as u32));

        self.block_finalized = true;
        self.buffer.clone()
    }

    /// An estimate of the size of the block once finalized: the data written
    /// so far plus the restart-point trailer.
    ///
    /// NOTE: after `finalize` the buffer already contains the trailer, so this
    /// over-reports by one trailer; it is only meant to be consulted while the
    /// block is still being built.
    pub(crate) fn approximate_size(&self) -> usize {
        let size_of_data = self.buffer.len();
        let size_of_restart_points = self.restart_points.len() * SIZE_OF_U32_BYTES;
        let size_of_restart_points_length = SIZE_OF_U32_BYTES;

        size_of_data + size_of_restart_points + size_of_restart_points_length
    }

    /// `true` if no entries have been added to the block.
    pub(crate) fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }
}
impl<K> Debug for BlockBuilder<K>
where
    K: RainDbKeyType,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BlockBuilder")
            .field(
                "prefix_compression_restart_interval",
                &self.prefix_compression_restart_interval,
            )
            // Summarize the buffer by length; dumping the raw bytes would be
            // noisy. Fix: the original format string was missing the closing
            // bracket, yielding e.g. `buffer[current len=5`.
            .field(
                "buffer",
                &format!("buffer[current len={}]", self.buffer.len()),
            )
            .field("restart_points", &self.restart_points)
            .field("curr_compressed_count", &self.curr_compressed_count)
            .field("block_finalized", &self.block_finalized)
            .field("key_type_marker", &self.key_type_marker)
            // `last_key_bytes` is intentionally omitted.
            .finish_non_exhaustive()
    }
}
#[cfg(test)]
mod tests {
    use pretty_assertions::assert_eq;

    use crate::config::PREFIX_COMPRESSION_RESTART_INTERVAL;
    use crate::key::InternalKey;
    use crate::Operation;

    use super::*;

    #[test]
    fn can_add_entries_as_expected() {
        let mut builder: BlockBuilder<InternalKey> =
            BlockBuilder::new(PREFIX_COMPRESSION_RESTART_INTERVAL);

        // A fresh builder holds no data and exactly one restart point (offset 0).
        assert!(builder.is_empty());
        assert_eq!(builder.restart_points.len(), 1);

        // Fill exactly one compression interval with ascending keys.
        for offset in 0..PREFIX_COMPRESSION_RESTART_INTERVAL {
            let key_num = offset + 100_000;
            let entry_key = InternalKey::new(
                key_num.to_string().as_bytes().to_vec(),
                offset as u64,
                Operation::Put,
            );
            builder.add_entry(Rc::new(entry_key), &u64::encode_fixed_vec(key_num as u64));
        }

        // Still within the first interval, so no extra restart point yet.
        assert_eq!(builder.restart_points.len(), 1);

        // The next entry exceeds the interval and must open a restart point.
        let interval_boundary_key = InternalKey::new(
            200_000_u32.to_string().as_bytes().to_vec(),
            200_u64,
            Operation::Put,
        );
        builder.add_entry(
            Rc::new(interval_boundary_key),
            &u64::encode_fixed_vec(200_u64),
        );
        assert_eq!(
            builder.restart_points.len(),
            2,
            "A new restart point should have been created"
        );

        // One more entry within the new interval must not add another.
        let followup_key = InternalKey::new(
            201_000_u32.to_string().as_bytes().to_vec(),
            201_u64,
            Operation::Put,
        );
        builder.add_entry(Rc::new(followup_key), &u64::encode_fixed_vec(201_u64));
        assert_eq!(
            builder.restart_points.len(),
            2,
            "There should not be another reset point"
        );

        assert!(
            builder.approximate_size() >= builder.buffer.len(),
            "The approximate size should be for the finalized size of the block"
        );
    }

    #[test]
    #[should_panic(expected = "Attempted to add a key but it was out of order.")]
    fn panics_if_entries_are_not_added_in_order() {
        let mut builder: BlockBuilder<InternalKey> =
            BlockBuilder::new(PREFIX_COMPRESSION_RESTART_INTERVAL);

        // "def" sorts after "abc", so adding "abc" second must panic.
        builder.add_entry(
            Rc::new(InternalKey::new(b"def".to_vec(), 399, Operation::Put)),
            b"123",
        );
        builder.add_entry(
            Rc::new(InternalKey::new(b"abc".to_vec(), 400, Operation::Put)),
            b"456",
        );
    }

    #[test]
    fn finalize_works_as_expected() {
        let mut builder: BlockBuilder<InternalKey> =
            BlockBuilder::new(PREFIX_COMPRESSION_RESTART_INTERVAL);

        // Add enough entries to cross one restart-interval boundary.
        for offset in 0..(PREFIX_COMPRESSION_RESTART_INTERVAL + 2) {
            let key_num = offset + 100_000;
            let entry_key = InternalKey::new(
                key_num.to_string().as_bytes().to_vec(),
                offset as u64,
                Operation::Put,
            );
            builder.add_entry(Rc::new(entry_key), &u64::encode_fixed_vec(key_num as u64));
        }

        let finalized_block = builder.finalize();
        assert!(
            finalized_block.len() <= builder.approximate_size(),
            "The buffer should be larger after calling `finalize` because of the addition of the \
            serialized restart points."
        );
    }

    #[test]
    fn given_a_prefix_compression_restart_interval_of_one_can_build_a_block_as_expected() {
        let mut builder: BlockBuilder<InternalKey> = BlockBuilder::new(1);

        for offset in 0..30_usize {
            let key_num = offset + 100_000;
            let entry_key = InternalKey::new(
                key_num.to_string().as_bytes().to_vec(),
                offset as u64,
                Operation::Put,
            );
            builder.add_entry(Rc::new(entry_key), &u64::encode_fixed_vec(key_num as u64));
        }

        // With an interval of 1, every entry after the first starts a restart
        // point: 1 initial + 29 new = 30.
        assert_eq!(builder.restart_points.len(), 30);
    }
}