use std::{cmp::Ordering, marker::PhantomData};
use bytes::{BufMut, Bytes, BytesMut};
use tempest_core::utils::ByteSize;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, LittleEndian, Ref, U32};
use crate::base::{Comparer, InternalKey, KeyKind, KeyTrailer};
/// Fixed-size header that precedes every entry in a block.
///
/// Entry layout: `[header][unshared key bytes][key trailer][value bytes]`. Keys are
/// prefix-compressed against the previous entry's key: only the bytes after the first
/// `shared_key_len` are stored. All three lengths are little-endian `u32`s.
#[derive(FromBytes, IntoBytes, Immutable, KnownLayout)]
#[repr(C)]
pub struct BlockEntryHeader {
    /// Number of leading bytes this key shares with the previous entry's key.
    shared_key_len: U32<LittleEndian>,
    /// Number of key bytes physically stored right after this header.
    unshared_key_len: U32<LittleEndian>,
    /// Length of the value stored after the key trailer.
    value_len: U32<LittleEndian>,
}
/// Fill level of a `BlockBuilder`, as reported by `BlockBuilder::get_status`.
///
/// The enum carries no data, so it is `Copy`: a status can be stored and compared
/// freely without re-querying the builder.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockBuilderStatus {
    /// Nothing has been written to the block yet.
    Empty,
    /// Entries have been written and the block is still below its target size.
    Ok,
    /// The encoded entries have reached or exceeded the target size.
    Full,
}
/// Trailer occupying the last bytes of every block.
///
/// It is preceded by `restart_count` restart offsets (decoded by readers as
/// little-endian `u32`s), which in turn follow the entries section.
#[derive(FromBytes, IntoBytes, Immutable, KnownLayout)]
#[repr(C)]
pub struct BlockFooter {
    /// Number of restart offsets stored immediately before this footer.
    restart_count: U32<LittleEndian>,
}
/// Incrementally encodes key/value entries into a single prefix-compressed block.
///
/// Callers are expected to add entries in key order; `BlockReader::get` binary-searches
/// the restart points and relies on that ordering (the builder itself does not check it).
pub struct BlockBuilder {
    /// Encoded entries written so far.
    buf: BytesMut,
    /// Byte size at which the builder starts reporting `BlockBuilderStatus::Full`.
    target_size: usize,
    /// Number of entries between consecutive restart points.
    restart_interval: u32,
    /// Offsets into `buf` of every entry whose key is stored uncompressed.
    restart_offsets: Vec<u32>,
    /// Number of entries written so far.
    entry_count: u32,
    /// Full key of the most recently written entry; the base for prefix compression.
    last_key: BytesMut,
}
impl BlockBuilder {
pub fn new(target_size: usize, restart_interval: u32) -> Self {
Self {
buf: BytesMut::with_capacity(target_size * 2),
entry_count: 0,
last_key: BytesMut::with_capacity(64),
restart_offsets: Vec::new(),
target_size,
restart_interval,
}
}
#[instrument(skip_all, level = "trace", fields(pos=self.buf.len(), entry_count=self.entry_count))]
pub fn write_entry(
&mut self,
key: impl AsRef<[u8]>,
trailer: KeyTrailer,
value: impl AsRef<[u8]>,
) -> BlockBuilderStatus {
let key = key.as_ref();
let value = value.as_ref();
trace!("writing entry");
let restart_point_reached = self.entry_count % self.restart_interval == 0;
if restart_point_reached {
trace!("reached restart point");
self.last_key.clear();
self.restart_offsets.push(self.buf.len() as u32);
}
let shared_key_len = self.shared_key_len(key);
let unshared_key_len = key.len() as u32 - shared_key_len;
let value_len = value.len() as u32;
let header = BlockEntryHeader {
shared_key_len: shared_key_len.into(),
unshared_key_len: unshared_key_len.into(),
value_len: value_len.into(),
};
self.buf.put(header.as_bytes());
self.buf.put(&key[shared_key_len as usize..]);
self.buf.put(trailer.as_bytes());
self.buf.put(value.as_ref());
self.last_key.truncate(shared_key_len as usize);
self.last_key
.extend_from_slice(&key[shared_key_len as usize..]);
self.entry_count += 1;
self.get_status()
}
fn shared_key_len(&self, new_key: &[u8]) -> u32 {
let mut pos = 0;
let limit = self.last_key.len().min(new_key.len());
while pos < limit {
if self.last_key[pos] == new_key[pos] {
pos += 1;
} else {
break;
}
}
pos as u32
}
pub fn get_status(&self) -> BlockBuilderStatus {
if self.buf.is_empty() {
BlockBuilderStatus::Empty
} else if self.buf.len() >= self.target_size {
BlockBuilderStatus::Full
} else {
BlockBuilderStatus::Ok
}
}
pub fn finalize(mut self) -> BytesMut {
self.buf.put(self.restart_offsets.as_slice().as_bytes());
let restart_count = self.restart_offsets.len() as u32;
let footer = BlockFooter {
restart_count: restart_count.into(),
};
self.buf.put(footer.as_bytes());
self.buf
}
}
/// Splits an encoded block into its entries section and its restart-offset array.
///
/// Layout as read here: `[entries][restart offsets: u32 LE x count][BlockFooter]`, where
/// `count` is the footer's `restart_count`.
///
/// # Panics
///
/// Panics with a descriptive message if `buf` is too short to hold the footer, or the
/// restart-offset array the footer announces. Previously, unchecked subtraction either
/// overflowed (debug) or wrapped into a meaningless slice index (release).
fn parse_block(buf: &[u8]) -> (&[u8], &[U32<LittleEndian>]) {
    let footer_start = buf
        .len()
        .checked_sub(size_of::<BlockFooter>())
        .expect("block too short to contain its footer");
    let count = BlockFooter::read_from_bytes(&buf[footer_start..])
        .expect("footer slice is exactly size_of::<BlockFooter>() bytes")
        .restart_count
        .get() as usize;
    let offsets_start = count
        .checked_mul(size_of::<U32<LittleEndian>>())
        .and_then(|offsets_len| footer_start.checked_sub(offsets_len))
        .expect("block too short for the restart-offset array announced by its footer");
    let (entries, offsets) = buf[..footer_start].split_at(offsets_start);
    let offsets = <[U32<LittleEndian>]>::ref_from_bytes(offsets)
        .expect("restart-offset array length is a multiple of the offset size");
    (entries, offsets)
}
/// Returns the restart-offset array of `buf`, given the end of the entries section.
///
/// The array spans from `entries_end` up to, but not including, the trailing
/// `BlockFooter`.
///
/// NOTE(review): not called anywhere in the visible source; `parse_block` derives the
/// same slice from the footer's restart count instead.
fn parse_offsets(buf: &[u8], entries_end: usize) -> &[U32<LittleEndian>] {
    let offsets_end = buf.len() - size_of::<BlockFooter>();
    let raw_offsets = &buf[entries_end..offsets_end];
    <[U32<LittleEndian>]>::ref_from_bytes(raw_offsets).unwrap()
}
/// Sequential iterator over every entry of an encoded block, in stored order.
///
/// Yields each entry's full key (rebuilt from the prefix-compressed encoding) paired
/// with its value, which is a zero-copy slice of the block buffer.
pub struct BlockIterator<C: Comparer> {
    /// The whole encoded block.
    buf: Bytes,
    /// End of the entries section, i.e. where the restart-offset array begins.
    entries_end: usize,
    /// Number of restart points in the block (currently not read by `next`).
    restart_count: usize,
    /// Byte offset of the next entry to decode.
    pos: usize,
    /// Full key of the most recently decoded entry; base for the next prefix.
    last_key: Vec<u8>,
    _marker: PhantomData<C>,
}
impl<C: Comparer> BlockIterator<C> {
    /// Creates an iterator positioned at the first entry of the encoded block `buf`.
    pub fn new(buf: Bytes) -> Self {
        // Only the section sizes are kept; entries are decoded lazily by `next`.
        let (entries_end, restart_count) = {
            let (entries, offsets) = parse_block(&buf);
            (entries.len(), offsets.len())
        };
        Self {
            buf,
            entries_end,
            restart_count,
            pos: 0,
            last_key: Vec::new(),
            _marker: PhantomData,
        }
    }
}
impl<C: Comparer> Iterator for BlockIterator<C> {
    type Item = (InternalKey<C>, Bytes);

    /// Decodes the entry at the current position and advances past it.
    ///
    /// Returns `None` once the cursor reaches the restart-offset array.
    fn next(&mut self) -> Option<Self::Item> {
        if self.pos >= self.entries_end {
            return None;
        }
        let entries = &self.buf[..self.entries_end];
        let mut cursor = self.pos;

        let (header, _) = Ref::<_, BlockEntryHeader>::from_prefix(&entries[cursor..]).unwrap();
        let (shared, unshared, value_len) = (
            header.shared_key_len.get() as usize,
            header.unshared_key_len.get() as usize,
            header.value_len.get() as usize,
        );
        cursor += size_of::<BlockEntryHeader>();

        // Keep the prefix shared with the previous key, then append the stored suffix.
        let suffix_end = cursor + unshared;
        self.last_key.truncate(shared);
        self.last_key.extend_from_slice(&entries[cursor..suffix_end]);
        cursor = suffix_end;

        let trailer_end = cursor + size_of::<KeyTrailer>();
        let trailer_bytes: [u8; 8] = entries[cursor..trailer_end].try_into().unwrap();
        let trailer =
            KeyTrailer::try_from(trailer_bytes).expect("invalid key trailer in CRC-verified block");
        cursor = trailer_end;

        // `entries` starts at offset 0 of `buf`, so the same offsets slice the value.
        let value_end = cursor + value_len;
        let value = self.buf.slice(cursor..value_end);
        self.pos = value_end;

        Some((
            InternalKey::new(Bytes::copy_from_slice(&self.last_key), trailer),
            value,
        ))
    }
}
/// Point-lookup reader over a single encoded block.
///
/// `C` is the comparer used to order user keys during lookups.
pub struct BlockReader<C: Comparer> {
    /// The whole encoded block, as produced by `BlockBuilder::finalize`.
    buf: Bytes,
    _marker: PhantomData<C>,
}
impl<C: Comparer> BlockReader<C> {
pub fn new(buf: Bytes) -> Self {
Self {
buf,
_marker: PhantomData,
}
}
#[instrument(skip(self), level = "trace")]
pub fn get(&self, search_key: &InternalKey<C, &[u8]>) -> Option<(KeyKind, Bytes)> {
let (entries, offsets) = parse_block(&self.buf);
trace!(
entries_section_size = ?ByteSize(entries.len() as u64),
offsets_count = offsets.len(),
"parsed block"
);
let restart_key = |offset: usize| -> InternalKey<C, &[u8]> {
let (header, rest) =
Ref::<_, BlockEntryHeader>::from_prefix(&entries[offset..]).unwrap();
let unshared = header.unshared_key_len.get() as usize;
let key = &rest[..unshared];
let trailer_bytes: [u8; 8] = (&rest[unshared..unshared + size_of::<KeyTrailer>()])
.try_into()
.unwrap();
let trailer = KeyTrailer::try_from(trailer_bytes)
.expect("invalid key trailer in CRC-verified block");
let key = InternalKey::new(key, trailer);
trace!(offset, ?key, "parsed restart key");
key
};
trace!("starting binary search");
let start_i = match offsets.binary_search_by(|offset| {
let restart_key = restart_key(offset.get() as usize);
trace!("binary search: compare logical");
let cmp = match restart_key.compare_logical(&search_key) {
Ordering::Equal => {
trace!("binary search: logical equal, compare seqnum");
search_key.trailer().cmp(&restart_key.trailer())
}
other => other,
};
match cmp {
Ordering::Less => trace!(?cmp, "binary search: jumping forth"),
Ordering::Equal => trace!(?cmp, "binary search: found"),
Ordering::Greater => trace!(?cmp, "binary search: jumping back"),
}
cmp
}) {
Ok(i) => i,
Err(i) => i.saturating_sub(1),
};
let start_offset = offsets[start_i].get() as usize;
let end_offset = entries.len();
let mut pos = start_offset;
let mut last_key: Vec<u8> = Vec::new();
trace!("starting linear search");
while pos < end_offset {
let (header, _) = Ref::<_, BlockEntryHeader>::from_prefix(&entries[pos..]).unwrap();
let shared = header.shared_key_len.get() as usize;
let unshared = header.unshared_key_len.get() as usize;
let value_len = header.value_len.get() as usize;
pos += size_of::<BlockEntryHeader>();
last_key.truncate(shared);
last_key.extend_from_slice(&entries[pos..pos + unshared]);
pos += unshared;
let trailer_bytes: [u8; 8] = (&entries[pos..pos + size_of::<KeyTrailer>()])
.try_into()
.unwrap();
let trailer = KeyTrailer::try_from(trailer_bytes)
.expect("invalid key trailer in CRC-verified block");
pos += size_of::<KeyTrailer>();
let current = InternalKey::<C, &[u8]>::new(&last_key, trailer);
trace!(found=?current, ?search_key, "linear search: compare logical");
match current.compare_logical(&search_key) {
Ordering::Less => pos += value_len,
Ordering::Equal => {
if current.trailer().seqnum() <= search_key.trailer().seqnum() {
return Some((
current.trailer().kind(),
self.buf.slice(pos..pos + value_len),
));
}
trace!(pos, "value too new, ignoring");
pos += value_len
}
Ordering::Greater => break,
}
}
trace!("key not found during linear search");
None
}
pub fn iter(&self) -> BlockIterator<C> {
BlockIterator::new(self.buf.clone())
}
}
#[cfg(test)]
mod tests {
    // Unit tests for block encoding (`BlockBuilder`), point lookups (`BlockReader::get`)
    // and sequential decoding (`BlockIterator`).
    use super::*;
    use crate::{
        base::{DefaultComparer, InternalKey, KeyKind, KeyTrailer, SeqNum},
        config::StorageConfig,
    };
    use bytes::Bytes;
    use tempest_core::test_utils::setup_tracing;

    /// Builds a trailer from a raw sequence number; the tests pass small, valid numbers.
    fn make_trailer(seqnum: u64, kind: KeyKind) -> KeyTrailer {
        KeyTrailer::new(unsafe { SeqNum::new_unchecked(seqnum) }, kind)
    }

    /// Builds a `Put` search key borrowing `s`.
    fn make_key(s: &str, seqnum: u64) -> InternalKey<DefaultComparer, &[u8]> {
        InternalKey::new(s.as_ref(), make_trailer(seqnum, KeyKind::Put))
    }

    /// Encodes `(key, seqnum, value)` triples as `Put` entries using the testing config.
    fn build_block<K: AsRef<[u8]>, V: AsRef<[u8]>>(entries: &[(K, u64, V)]) -> Bytes {
        let entries = entries
            .iter()
            .map(|(k, seqnum, v)| (k.as_ref(), *seqnum, v.as_ref()));
        let config = StorageConfig::for_testing().sst.write;
        let mut builder =
            BlockBuilder::new(config.block_target_size, config.block_restart_interval);
        for (key, seqnum, value) in entries {
            let trailer = make_trailer(seqnum, KeyKind::Put);
            builder.write_entry(key, trailer, value);
        }
        builder.finalize().freeze()
    }

    #[test]
    fn test_get_existing_key() {
        let buf = build_block(&[
            ("apple", 1, "fruit"),
            ("banana", 1, "yellow"),
            ("cherry", 1, "red"),
        ]);
        let reader = BlockReader::<DefaultComparer>::new(buf);
        let result = reader.get(&make_key("banana", 1));
        assert_eq!(result.unwrap().1, "yellow");
    }

    #[test]
    fn test_get_missing_key() {
        let buf = build_block(&[("apple", 1, "fruit"), ("cherry", 1, "red")]);
        let reader = BlockReader::<DefaultComparer>::new(buf);
        assert!(reader.get(&make_key("banana", 1)).is_none());
    }

    #[test]
    fn test_get_key_before_first() {
        let buf = build_block(&[("banana", 1, "yellow"), ("cherry", 1, "red")]);
        let reader = BlockReader::<DefaultComparer>::new(buf);
        assert!(reader.get(&make_key("apple", 1)).is_none());
    }

    #[test]
    fn test_get_key_after_last() {
        let buf = build_block(&[("apple", 1, "fruit"), ("banana", 1, "yellow")]);
        let reader = BlockReader::<DefaultComparer>::new(buf);
        assert!(reader.get(&make_key("cherry", 1)).is_none());
    }

    /// Lookups must work on both sides of a restart point, where keys switch from
    /// prefix-compressed back to full.
    #[test]
    fn test_prefix_compression_across_restart_interval() {
        setup_tracing();
        let mut entries: Vec<(String, u64, String)> = Vec::new();
        let restart_interval = StorageConfig::for_testing()
            .sst
            .write
            .block_restart_interval;
        let count = restart_interval + restart_interval / 2;
        info!(count, "filling block with entries");
        for i in 0..count {
            entries.push((format!("prefix:key:{:04}", i), 1, format!("value:{}", i)));
        }
        let entries_ref: Vec<(&str, u64, &str)> = entries
            .iter()
            .map(|(k, s, v)| (k.as_str(), *s, v.as_str()))
            .collect();
        let buf = build_block(&entries_ref);
        let reader = BlockReader::<DefaultComparer>::new(buf);
        let spots = [
            0,
            restart_interval / 2,
            restart_interval - 1,
            restart_interval,
            count - 1,
        ];
        info!(?spots, "checking a few entries within block");
        for i in spots {
            let key = format!("prefix:key:{:04}", i);
            let expected = format!("value:{}", i);
            let result = reader.get(&make_key(&key, 1));
            assert_eq!(result.unwrap().1, expected, "failed at index {}", i);
        }
    }

    /// `finalize` must record one restart offset per restart point, encoded so that
    /// readers decoding little-endian offsets land on uncompressed restart entries.
    #[test]
    fn test_restart_offsets_written_in_finalize() {
        let target_size = 4096;
        let restart_interval = 4;
        let mut builder = BlockBuilder::new(target_size, restart_interval);
        for i in 0..=restart_interval {
            let k = Bytes::copy_from_slice(format!("key:{:04}", i).as_bytes());
            let v = Bytes::from("v");
            builder.write_entry(&k, make_trailer(1, KeyKind::Put), &v);
        }
        let buf = builder.finalize().freeze();
        let (_, footer) = Ref::<_, BlockFooter>::from_suffix(buf.as_ref()).unwrap();
        assert_eq!(footer.restart_count.get(), 2);

        let (entries, offsets) = parse_block(&buf);
        assert_eq!(offsets.len(), 2);
        assert_eq!(offsets[0].get(), 0);
        for offset in offsets {
            let offset = offset.get() as usize;
            assert!(offset < entries.len(), "restart offset {offset} out of range");
            let (header, _) =
                Ref::<_, BlockEntryHeader>::from_prefix(&entries[offset..]).unwrap();
            // Restart keys ("key:0000", "key:0004") are stored in full: 8 bytes, no prefix.
            assert_eq!(header.shared_key_len.get(), 0);
            assert_eq!(header.unshared_key_len.get(), 8);
        }
    }

    #[test]
    fn test_get_first_and_last_entry() {
        let buf = build_block(&[
            ("aaa", 1, "first"),
            ("mmm", 1, "middle"),
            ("zzz", 1, "last"),
        ]);
        let reader = BlockReader::<DefaultComparer>::new(buf);
        assert_eq!(reader.get(&make_key("aaa", 1)).unwrap().1, "first");
        assert_eq!(reader.get(&make_key("zzz", 1)).unwrap().1, "last");
    }

    #[test]
    fn test_empty_value() {
        let buf = build_block(&[("key", 1, "")]);
        let reader = BlockReader::<DefaultComparer>::new(buf);
        assert_eq!(reader.get(&make_key("key", 1)).unwrap().1, "");
    }

    #[test]
    fn test_iterator_all_entries() {
        let buf = build_block(&[
            ("apple", 1, "fruit"),
            ("banana", 1, "yellow"),
            ("cherry", 1, "red"),
        ]);
        let iter = BlockIterator::<DefaultComparer>::new(buf);
        let results: Vec<_> = iter.map(|(k, v)| (k.key().clone(), v)).collect();
        assert_eq!(results[0], (Bytes::from("apple"), Bytes::from("fruit")));
        assert_eq!(results[1], (Bytes::from("banana"), Bytes::from("yellow")));
        assert_eq!(results[2], (Bytes::from("cherry"), Bytes::from("red")));
        assert_eq!(results.len(), 3);
    }

    /// A block finalized without entries decodes to an empty iteration.
    #[test]
    fn test_iterator_empty_block() {
        let buf = BlockBuilder::new(4096, 4).finalize().freeze();
        let mut iter = BlockIterator::<DefaultComparer>::new(buf);
        assert!(iter.next().is_none());
    }

    #[test]
    fn test_iterator_prefix_compression() {
        let buf = build_block(&[
            ("tempest:key:0001", 1, "a"),
            ("tempest:key:0002", 1, "b"),
            ("tempest:key:0003", 1, "c"),
        ]);
        let iter = BlockIterator::<DefaultComparer>::new(buf);
        let keys: Vec<_> = iter.map(|(k, _)| k.key().clone()).collect();
        assert_eq!(keys[0], "tempest:key:0001");
        assert_eq!(keys[1], "tempest:key:0002");
        assert_eq!(keys[2], "tempest:key:0003");
    }

    #[test]
    fn test_iterator_across_restart_boundary() {
        let mut entries: Vec<(String, u64, String)> = Vec::new();
        for i in 0..40u32 {
            entries.push((format!("prefix:key:{:04}", i), 1, format!("val:{}", i)));
        }
        let buf = build_block(&entries);
        let results: Vec<_> = BlockIterator::<DefaultComparer>::new(buf)
            .map(|(k, v)| (k.key().clone(), v))
            .collect();
        assert_eq!(results.len(), 40);
        for i in 0..40usize {
            assert_eq!(
                results[i].0,
                Bytes::copy_from_slice(format!("prefix:key:{:04}", i).as_bytes())
            );
            assert_eq!(
                results[i].1,
                Bytes::copy_from_slice(format!("val:{}", i).as_bytes())
            );
        }
    }

    #[test]
    fn test_iterator_trailers_preserved() {
        let config = StorageConfig::for_testing().sst.write;
        let mut builder =
            BlockBuilder::new(config.block_target_size, config.block_restart_interval);
        let k = Bytes::from("key");
        let v = Bytes::from("val");
        let trailer = make_trailer(42, KeyKind::Put);
        builder.write_entry(&k, trailer, &v);
        let buf = builder.finalize().freeze();
        let mut iter = BlockIterator::<DefaultComparer>::new(buf);
        let (key, _) = iter.next().unwrap();
        assert_eq!(key.trailer().seqnum().get(), 42);
        assert_eq!(key.trailer().kind(), KeyKind::Put);
        assert!(iter.next().is_none());
    }

    #[test]
    fn test_iterator_empty_values() {
        let buf = build_block(&[("a", 1, ""), ("b", 1, ""), ("c", 1, "")]);
        let results: Vec<_> = BlockIterator::<DefaultComparer>::new(buf)
            .map(|(k, v)| (k.key().clone(), v))
            .collect();
        assert_eq!(results.len(), 3);
        for (_, v) in &results {
            assert_eq!(*v, Bytes::new());
        }
    }

    /// Every entry produced by the iterator must be retrievable by a point lookup.
    #[test]
    fn test_iterator_matches_get() {
        let mut entries: Vec<(String, u64, String)> = Vec::new();
        for i in 0..20u32 {
            entries.push((format!("key:{:04}", i), 1, format!("val:{}", i)));
        }
        let entries_ref: Vec<(&str, u64, &str)> = entries
            .iter()
            .map(|(k, s, v)| (k.as_str(), *s, v.as_str()))
            .collect();
        let buf = build_block(&entries_ref);
        let reader = BlockReader::<DefaultComparer>::new(buf.clone());
        for (key, value) in BlockIterator::<DefaultComparer>::new(buf) {
            let found = reader.get(&key.slice_key());
            assert_eq!(found.unwrap().1, value);
        }
    }

    /// Versions are stored newest-first; a lookup sees the newest version whose seqnum
    /// does not exceed the search seqnum.
    #[test]
    fn test_get_returns_newest_eligible_version() {
        setup_tracing();
        let buf = build_block(&[
            ("apple", 5, "v3"),
            ("apple", 3, "v2"),
            ("apple", 1, "v1"),
            ("banana", 2, "yellow"),
        ]);
        let reader = BlockReader::<DefaultComparer>::new(buf);
        assert_eq!(reader.get(&make_key("apple", 5)).unwrap().1, "v3");
        assert_eq!(reader.get(&make_key("apple", 4)).unwrap().1, "v2");
        assert_eq!(reader.get(&make_key("apple", 3)).unwrap().1, "v2");
        assert_eq!(reader.get(&make_key("apple", 2)).unwrap().1, "v1");
        assert_eq!(reader.get(&make_key("apple", 1)).unwrap().1, "v1");
        assert!(reader.get(&make_key("apple", 0)).is_none());
        assert_eq!(reader.get(&make_key("banana", 2)).unwrap().1, "yellow");
    }

    /// The versions of "zzz" straddle a restart point: the last version is the restart
    /// entry, so lookups must start scanning from the previous restart.
    #[test]
    fn test_get_snapshot_across_restart_boundary() {
        setup_tracing();
        let config = StorageConfig::for_testing().sst.write;
        let restart_interval = config.block_restart_interval;
        let mut builder = BlockBuilder::new(config.block_target_size, restart_interval);
        for i in 0..(restart_interval - 2) {
            let k = Bytes::copy_from_slice(format!("aaa:{:04}", i).as_bytes());
            let v = Bytes::from("x");
            builder.write_entry(&k, make_trailer(1, KeyKind::Put), &v);
        }
        let k = Bytes::from("zzz");
        builder.write_entry(&k, make_trailer(5, KeyKind::Put), "new");
        builder.write_entry(&k, make_trailer(3, KeyKind::Put), "mid");
        builder.write_entry(&k, make_trailer(1, KeyKind::Put), "old");
        let buf = builder.finalize().freeze();
        let reader = BlockReader::<DefaultComparer>::new(buf);
        assert_eq!(reader.get(&make_key("zzz", 5)).unwrap().1, "new");
        assert_eq!(reader.get(&make_key("zzz", 4)).unwrap().1, "mid");
        assert_eq!(reader.get(&make_key("zzz", 3)).unwrap().1, "mid");
        assert_eq!(reader.get(&make_key("zzz", 2)).unwrap().1, "old");
        assert_eq!(reader.get(&make_key("zzz", 1)).unwrap().1, "old");
        assert!(reader.get(&make_key("zzz", 0)).is_none());
    }

    #[test]
    fn test_get_does_not_bleed_into_next_user_key() {
        let buf = build_block(&[("apple", 5, "fruit"), ("cherry", 5, "red")]);
        let reader = BlockReader::<DefaultComparer>::new(buf);
        assert!(reader.get(&make_key("banana", 5)).is_none());
    }
}