use super::varint;
// Number of entries `BlockBuilder::add` writes before it records a new restart
// point (an entry stored with its full key, i.e. a shared prefix length of 0).
const RESTART_INTERVAL: usize = crate::constants::SSTABLE_RESTART_INTERVAL;
/// Incrementally serializes key/value entries into a single block.
///
/// Each entry stores only the part of its key that differs from the previous
/// key; the byte offsets of entries that store their full key ("restart
/// points") are tracked separately so they can be appended as a trailer.
#[derive(Debug)]
pub struct BlockBuilder {
    /// Serialized entries (and, once the trailer has been written, the restart array).
    buffer: Vec<u8>,
    /// Byte offsets into `buffer` of every restart entry.
    restarts: Vec<u32>,
    /// Number of entries written since the most recent restart point.
    counter: usize,
    /// The key of the most recently added entry.
    last_key: Vec<u8>,
}
impl BlockBuilder {
    /// Creates an empty builder whose first entry will be a restart point at offset 0.
    pub fn new() -> Self {
        Self {
            buffer: Vec::new(),
            restarts: vec![0],
            counter: 0,
            last_key: Vec::new(),
        }
    }

    /// Appends one entry to the block.
    ///
    /// The entry is laid out as three varints (shared key length, unshared key
    /// length, value length) followed by the unshared key bytes and the value
    /// bytes. Every `RESTART_INTERVAL` entries the shared length is forced to 0
    /// and the entry's offset is recorded as a restart point.
    ///
    /// NOTE(review): `BlockReader` compares keys while searching, so callers are
    /// presumably expected to add keys in ascending order; this is not checked here.
    ///
    /// # Panics
    ///
    /// Panics if a key length, value length or the current buffer offset does
    /// not fit in a `u32`, because the block format cannot represent it.
    pub fn add(&mut self, key: &[u8], value: &[u8]) {
        let shared_length = if self.counter == RESTART_INTERVAL {
            // Start a new restart run: this entry stores its whole key.
            self.restarts
                .push(Self::checked_u32(self.buffer.len(), "restart offset"));
            self.counter = 0;
            0
        } else {
            self.last_key
                .iter()
                .zip(key)
                .take_while(|(a, b)| a == b)
                .count()
        };
        let unshared = &key[shared_length..];

        // Convert every length before touching the buffer so a failed
        // conversion cannot leave a half-written entry behind.
        let shared_u32 = Self::checked_u32(shared_length, "shared key length");
        let unshared_u32 = Self::checked_u32(unshared.len(), "unshared key length");
        let value_u32 = Self::checked_u32(value.len(), "value length");

        varint::encode_u32(shared_u32, &mut self.buffer);
        varint::encode_u32(unshared_u32, &mut self.buffer);
        varint::encode_u32(value_u32, &mut self.buffer);
        self.buffer.extend_from_slice(unshared);
        self.buffer.extend_from_slice(value);
        self.counter += 1;

        // `last_key[..shared_length]` already equals `key[..shared_length]`, so
        // only the differing tail has to be copied; this reuses the allocation.
        self.last_key.truncate(shared_length);
        self.last_key.extend_from_slice(unshared);
    }

    /// Appends the restart trailer and returns the complete block bytes.
    ///
    /// The trailer is every restart offset as a little-endian `u32`, followed
    /// by the number of restart offsets as a little-endian `u32`.
    ///
    /// Every call appends another trailer to the internal buffer, so this
    /// should be called once, after the last `add`.
    ///
    /// # Panics
    ///
    /// Panics if the number of restart points does not fit in a `u32`.
    pub fn finish(&mut self) -> &[u8] {
        let num_restarts = Self::checked_u32(self.restarts.len(), "restart count");
        self.buffer
            .reserve((self.restarts.len() + 1) * std::mem::size_of::<u32>());
        for offset in &self.restarts {
            self.buffer.extend_from_slice(&offset.to_le_bytes());
        }
        self.buffer.extend_from_slice(&num_restarts.to_le_bytes());
        &self.buffer
    }

    /// Returns `true` once the buffered bytes reach `SSTABLE_BLOCK_SIZE`.
    pub fn is_block_maxed(&self) -> bool {
        self.buffer.len() >= crate::constants::SSTABLE_BLOCK_SIZE
    }

    /// Returns a copy of the most recently added key (empty if nothing was added).
    pub fn last_key(&self) -> Vec<u8> {
        self.last_key.clone()
    }

    /// Returns the number of bytes currently held in the internal buffer.
    pub fn buffer_len(&self) -> usize {
        self.buffer.len()
    }

    /// Converts a length or offset to the `u32` the block format stores,
    /// panicking instead of silently truncating an oversized value.
    fn checked_u32(value: usize, what: &str) -> u32 {
        u32::try_from(value)
            .unwrap_or_else(|_| panic!("{} ({}) does not fit in a u32", what, value))
    }
}
/// Read-only view over one serialized block.
///
/// The view borrows the block bytes; values returned from lookups borrow from
/// the same underlying data.
#[derive(Debug, Clone, Copy)]
pub struct BlockReader<'a> {
    /// The complete block: entries followed by the restart trailer.
    pub(crate) data: &'a [u8],
    /// Byte offset in `data` at which the array of restart offsets begins.
    pub(crate) restarts_offset: usize,
    /// Number of restart offsets stored in the trailer.
    pub num_restarts: usize,
}
impl<'a> BlockReader<'a> {
    /// Width in bytes of each fixed-size `u32` stored in the restart trailer.
    const WORD: usize = std::mem::size_of::<u32>();

    /// Wraps `data`, locating the restart array from the trailer.
    ///
    /// The last four bytes hold the number of restart offsets (little-endian
    /// `u32`); the offsets themselves sit directly before that count.
    ///
    /// # Panics
    ///
    /// Panics if `data` is too short to contain the trailer it announces.
    /// Use [`BlockReader::try_new`] to handle that case without panicking.
    pub fn new(data: &'a [u8]) -> Self {
        Self::try_new(data).expect("malformed block: restart trailer does not fit in the data")
    }

    /// Like [`BlockReader::new`], but returns `None` instead of panicking when
    /// `data` is shorter than the restart trailer it announces.
    pub fn try_new(data: &'a [u8]) -> Option<Self> {
        let count_offset = data.len().checked_sub(Self::WORD)?;
        let count_bytes: [u8; 4] = data.get(count_offset..)?.try_into().ok()?;
        let num_restarts = u32::from_le_bytes(count_bytes) as usize;
        let restarts_offset = count_offset.checked_sub(num_restarts.checked_mul(Self::WORD)?)?;
        Some(Self {
            data,
            restarts_offset,
            num_restarts,
        })
    }

    /// Returns the value stored under exactly `search_key`.
    ///
    /// Binary-searches the restart points for the last one whose key is not
    /// greater than `search_key`, then scans entries forward from there.
    /// Returns `None` when the key is absent or the block cannot be decoded.
    pub fn get(&self, search_key: &[u8]) -> Option<&'a [u8]> {
        let restart = self.seek_restart(search_key, true)?;
        let mut ptr = self.checked_restart_offset(restart)?;
        let mut current_key = Vec::new();
        while ptr < self.restarts_offset {
            let (value, next) = self.read_entry(ptr, &mut current_key)?;
            if current_key.as_slice() == search_key {
                return Some(value);
            }
            if current_key.as_slice() > search_key {
                // Scanned past where the key would have been.
                return None;
            }
            ptr = next;
        }
        None
    }

    /// Returns the value of the first entry whose key is greater than or equal
    /// to `search_key`.
    ///
    /// Binary-searches the restart points for the last one whose key is
    /// strictly less than `search_key`, then scans entries forward from there.
    /// Returns `None` when no such entry exists or the block cannot be decoded.
    pub fn lookup(&self, search_key: &[u8]) -> Option<&'a [u8]> {
        let restart = self.seek_restart(search_key, false)?;
        let mut ptr = self.checked_restart_offset(restart)?;
        let mut current_key = Vec::new();
        while ptr < self.restarts_offset {
            let (value, next) = self.read_entry(ptr, &mut current_key)?;
            if current_key.as_slice() >= search_key {
                return Some(value);
            }
            ptr = next;
        }
        None
    }

    /// Binary-searches the restart points for the one to start scanning from.
    ///
    /// Yields the index of the last probed restart whose key is less than
    /// `search_key`, or 0 if none was. When `stop_on_equal` is set, a restart
    /// whose key equals `search_key` is returned immediately.
    /// Returns `None` for a block without restart points or with undecodable data.
    fn seek_restart(&self, search_key: &[u8], stop_on_equal: bool) -> Option<usize> {
        let mut left = 0;
        let mut right = self.num_restarts.checked_sub(1)?;
        let mut best = 0;
        while left <= right {
            let mid = left + (right - left) / 2;
            let key_at_mid = self.restart_key(mid)?;
            if key_at_mid < search_key {
                best = mid;
                left = mid + 1;
            } else if stop_on_equal && key_at_mid == search_key {
                return Some(mid);
            } else {
                if mid == 0 {
                    // `mid - 1` would underflow; nothing lies further left.
                    break;
                }
                right = mid - 1;
            }
        }
        Some(best)
    }

    /// Returns the key stored at restart point `index`.
    ///
    /// This reads only the unshared key bytes, which is the whole key when the
    /// entry was written with a shared length of 0.
    fn restart_key(&self, index: usize) -> Option<&'a [u8]> {
        let entries = self.entries()?;
        let mut ptr = self.checked_restart_offset(index)?;
        Self::read_varint(entries, &mut ptr)?; // shared length, not needed here
        let unshared_len = Self::read_varint(entries, &mut ptr)?;
        Self::read_varint(entries, &mut ptr)?; // value length, not needed here
        entries.get(ptr..ptr.checked_add(unshared_len)?)
    }

    /// Decodes the entry starting at `offset`, rebuilding its full key in
    /// `current_key` from the previous entry's key.
    ///
    /// Returns the entry's value bytes and the offset of the following entry,
    /// or `None` if the entry does not fit inside the entry region or claims
    /// to share more bytes than the previous key has.
    fn read_entry(&self, offset: usize, current_key: &mut Vec<u8>) -> Option<(&'a [u8], usize)> {
        let entries = self.entries()?;
        let mut ptr = offset;
        let shared_len = Self::read_varint(entries, &mut ptr)?;
        let unshared_len = Self::read_varint(entries, &mut ptr)?;
        let value_len = Self::read_varint(entries, &mut ptr)?;
        let key_end = ptr.checked_add(unshared_len)?;
        let value_end = key_end.checked_add(value_len)?;
        let unshared = entries.get(ptr..key_end)?;
        let value = entries.get(key_end..value_end)?;
        if shared_len > current_key.len() {
            return None;
        }
        current_key.truncate(shared_len);
        current_key.extend_from_slice(unshared);
        Some((value, value_end))
    }

    /// Returns the part of the block that holds entries (everything before the
    /// restart array), or `None` if `restarts_offset` lies outside the data.
    fn entries(&self) -> Option<&'a [u8]> {
        self.data.get(..self.restarts_offset)
    }

    /// Decodes one varint at `*ptr` and advances `*ptr` past it.
    fn read_varint(entries: &[u8], ptr: &mut usize) -> Option<usize> {
        let (value, consumed) = varint::decode_u32(entries.get(*ptr..)?)?;
        *ptr = ptr.checked_add(consumed)?;
        Some(value as usize)
    }

    /// Bounds-checked wrapper around `read_restart_offset`: returns `None`
    /// instead of panicking when slot `index` is not inside the data.
    fn checked_restart_offset(&self, index: usize) -> Option<usize> {
        if index >= self.num_restarts {
            return None;
        }
        let start = index
            .checked_mul(Self::WORD)?
            .checked_add(self.restarts_offset)?;
        let end = start.checked_add(Self::WORD)?;
        if end > self.data.len() {
            return None;
        }
        Some(self.read_restart_offset(index) as usize)
    }

    /// Reads the little-endian `u32` stored in slot `index` of the restart array.
    ///
    /// # Panics
    ///
    /// Panics if the slot lies outside `data`.
    fn read_restart_offset(&self, index: usize) -> u32 {
        let start = self.restarts_offset + index * Self::WORD;
        let bytes: [u8; 4] = self.data[start..start + Self::WORD]
            .try_into()
            .expect("a four-byte slice always converts to [u8; 4]");
        u32::from_le_bytes(bytes)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a finished block of `total` entries with zero-padded, ascending
    /// keys (`key00000000`, ...) and matching values (`val00000000`, ...).
    fn numbered_block(total: usize) -> Vec<u8> {
        let mut builder = BlockBuilder::new();
        for i in 0..total {
            let key = format!("key{:08}", i);
            let value = format!("val{:08}", i);
            builder.add(key.as_bytes(), value.as_bytes());
        }
        builder.finish().to_vec()
    }

    /// Builds the small five-entry block shared by the `get`/`lookup` tests.
    fn fruit_block() -> Vec<u8> {
        let mut builder = BlockBuilder::new();
        builder.add(b"apple", b"val_apple");
        builder.add(b"appstore", b"val_appstore");
        builder.add(b"banana", b"val_banana");
        builder.add(b"bear", b"val_bear");
        builder.add(b"cat", b"val_cat");
        builder.finish().to_vec()
    }

    #[test]
    fn test_block_builder_add() {
        let mut builder = BlockBuilder::new();
        builder.add(b"apple", b"val1");
        builder.add(b"appstore", b"val2");
        assert_eq!(builder.counter, 2);
        assert_eq!(builder.last_key, b"appstore");
        assert!(!builder.buffer.is_empty());
    }

    #[test]
    fn test_block_builder_restarts() {
        let mut builder = BlockBuilder::new();
        // One entry more than a full interval forces exactly one extra restart.
        for i in 0..=RESTART_INTERVAL {
            let key = format!("key{:02}", i);
            builder.add(key.as_bytes(), b"val");
        }
        assert_eq!(builder.restarts.len(), 2);
        assert_eq!(builder.restarts[0], 0);
        assert!(builder.restarts[1] > 0);
        assert_eq!(builder.counter, 1);
        assert_eq!(
            builder.last_key,
            format!("key{:02}", RESTART_INTERVAL).as_bytes()
        );
    }

    #[test]
    fn test_block_builder_finish() {
        let mut builder = BlockBuilder::new();
        builder.add(b"apple", b"val1");
        builder.add(b"appstore", b"val2");
        let data = builder.finish();
        let len = data.len();
        let num_restarts_bytes = &data[len - 4..];
        let num_restarts = u32::from_le_bytes(num_restarts_bytes.try_into().unwrap());
        assert_eq!(num_restarts, 1);
        let first_restart_bytes = &data[len - 8..len - 4];
        let first_restart = u32::from_le_bytes(first_restart_bytes.try_into().unwrap());
        assert_eq!(first_restart, 0);
    }

    #[test]
    fn test_block_reader_init() {
        let mut builder = BlockBuilder::new();
        builder.add(b"apple", b"val1");
        builder.add(b"appstore", b"val2");
        let data = builder.finish();
        let reader = BlockReader::new(data);
        assert_eq!(reader.num_restarts, 1);
        // Two entries of 3 header bytes + 5 key bytes + 4 value bytes each.
        assert_eq!(reader.restarts_offset, 24);
    }

    #[test]
    fn test_block_reader_restart_offset() {
        let data = numbered_block(RESTART_INTERVAL + 1);
        let reader = BlockReader::new(&data);
        assert_eq!(reader.read_restart_offset(0), 0);
        assert!(reader.read_restart_offset(1) > 0);
    }

    #[test]
    fn test_block_reader_get() {
        let data = fruit_block();
        let reader = BlockReader::new(&data);
        assert_eq!(reader.get(b"apple"), Some(b"val_apple".as_slice()));
        assert_eq!(reader.get(b"appstore"), Some(b"val_appstore".as_slice()));
        assert_eq!(reader.get(b"cat"), Some(b"val_cat".as_slice()));
        assert_eq!(reader.get(b"a"), None);
        assert_eq!(reader.get(b"bat"), None);
        assert_eq!(reader.get(b"zebra"), None);
    }

    #[test]
    fn test_block_reader_lookup() {
        let data = fruit_block();
        let reader = BlockReader::new(&data);
        // Exact matches return their own value.
        assert_eq!(reader.lookup(b"apple"), Some(b"val_apple".as_slice()));
        assert_eq!(reader.lookup(b"cat"), Some(b"val_cat".as_slice()));
        // Missing keys resolve to the next greater key.
        assert_eq!(reader.lookup(b"a"), Some(b"val_apple".as_slice()));
        assert_eq!(reader.lookup(b"bat"), Some(b"val_bear".as_slice()));
        // Nothing is greater than or equal to a key past the end.
        assert_eq!(reader.lookup(b"zebra"), None);
    }

    #[test]
    fn test_block_reader_get_across_restarts() {
        let total = RESTART_INTERVAL * 3 + 1;
        let data = numbered_block(total);
        let reader = BlockReader::new(&data);
        assert_eq!(reader.num_restarts, 4);
        for i in 0..total {
            let key = format!("key{:08}", i);
            let value = format!("val{:08}", i);
            assert_eq!(reader.get(key.as_bytes()), Some(value.as_bytes()));
        }
        assert_eq!(reader.get(b"key"), None);
        assert_eq!(reader.get(b"key00000003x"), None);
        assert_eq!(reader.get(b"zzz"), None);
    }

    #[test]
    fn test_block_reader_lookup_across_restarts() {
        let total = RESTART_INTERVAL * 3 + 1;
        let data = numbered_block(total);
        let reader = BlockReader::new(&data);

        // A probe just after the last key of the first run must resolve to the
        // restart entry that opens the second run.
        let probe = format!("key{:08}x", RESTART_INTERVAL - 1);
        let expected = format!("val{:08}", RESTART_INTERVAL);
        assert_eq!(reader.lookup(probe.as_bytes()), Some(expected.as_bytes()));

        // A probe equal to a restart key resolves to that same entry.
        let restart_key = format!("key{:08}", RESTART_INTERVAL);
        assert_eq!(
            reader.lookup(restart_key.as_bytes()),
            Some(expected.as_bytes())
        );

        // A probe past the final key has no successor.
        let past_end = format!("key{:08}x", total - 1);
        assert_eq!(reader.lookup(past_end.as_bytes()), None);
        assert_eq!(reader.lookup(b"zzz"), None);
    }
}