use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use std::cmp::Ordering;
use std::io::{Cursor, Read};
/// Restart interval used by `BlockBuilder::default()`.
pub const DEFAULT_RESTART_INTERVAL: usize = 16;
/// Factor multiplied with a block's entry count to size its hash-index bucket array.
pub const DEFAULT_HASH_BUCKET_RATIO: f64 = 0.75;
/// One-byte block encoding tag; the variants are named after compression codecs.
///
/// `#[repr(u8)]` together with the explicit discriminants fixes the numeric
/// value of every variant (`0` through `3`).
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockType {
    /// Tag byte `0`.
    Uncompressed = 0,
    /// Tag byte `1`.
    Snappy = 1,
    /// Tag byte `2`.
    Lz4 = 2,
    /// Tag byte `3`.
    Zstd = 3,
}

impl TryFrom<u8> for BlockType {
    type Error = ();

    /// Converts a raw tag byte into its variant.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` for any byte outside `0..=3`.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        // Indexed by discriminant, so position N holds the variant whose tag is N.
        const BY_TAG: [BlockType; 4] = [
            BlockType::Uncompressed,
            BlockType::Snappy,
            BlockType::Lz4,
            BlockType::Zstd,
        ];
        BY_TAG.get(usize::from(value)).copied().ok_or(())
    }
}

impl BlockType {
    /// Lenient counterpart of `try_from`: unknown tag bytes fall back to
    /// [`BlockType::Uncompressed`] instead of producing an error.
    pub fn from_u8(value: u8) -> Self {
        match Self::try_from(value) {
            Ok(kind) => kind,
            Err(()) => BlockType::Uncompressed,
        }
    }
}
/// A pair of `u64` values, `offset` and `size`, that identifies a block.
///
/// NOTE(review): presumably the block's byte offset and byte length inside a
/// containing file — confirm against the callers that produce handles.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BlockHandle {
/// First component of the handle; encoded first by `BlockHandle::encode`.
pub offset: u64,
/// Second component of the handle; encoded after `offset`.
pub size: u64,
}
impl BlockHandle {
pub fn new(offset: u64, size: u64) -> Self {
Self { offset, size }
}
pub fn offset(&self) -> u64 {
self.offset
}
pub fn size(&self) -> u64 {
self.size
}
pub fn encode(&self) -> Vec<u8> {
let mut buf = Vec::with_capacity(20);
encode_varint(&mut buf, self.offset);
encode_varint(&mut buf, self.size);
buf
}
pub fn decode(data: &[u8]) -> Option<(Self, usize)> {
let mut cursor = Cursor::new(data);
let offset = decode_varint(&mut cursor)?;
let size = decode_varint(&mut cursor)?;
Some((Self { offset, size }, cursor.position() as usize))
}
}
/// Wrapper around a single `u32` offset.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads
/// this type (`BlockBuilder` keeps restart offsets as plain `u32`s) — confirm
/// whether it is still needed.
#[derive(Debug, Clone)]
struct RestartPoint {
offset: u32,
}
/// Wrapper around a single `u8` restart index.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads
/// this type (the hash index is built from raw `u8` bytes) — confirm whether
/// it is still needed.
#[derive(Debug, Clone)]
struct HashBucket {
restart_index: u8,
}
/// Incrementally serializes key/value entries, added in ascending key order,
/// into one block buffer with shared-prefix key compression and restart points.
pub struct BlockBuilder {
/// Encoded entries written so far.
buffer: Vec<u8>,
/// Offsets into `buffer` of the entries written with no shared prefix; starts as `[0]`.
restarts: Vec<u32>,
/// Entries added since the most recent restart point.
entries_since_restart: usize,
/// Once this many entries follow a restart point, the next entry starts a new one.
restart_interval: usize,
/// Copy of the most recently added key, used to compute the shared prefix.
last_key: Vec<u8>,
/// Total entries added since construction or the last `reset`.
entry_count: usize,
/// Whether `finish` appends a hash index after the entries.
use_hash_index: bool,
/// Full copies of every added key; only filled when `use_hash_index` is set.
keys_for_hash: Vec<Vec<u8>>,
}
impl BlockBuilder {
    /// Bucket byte meaning "no restart index stored here".
    const EMPTY_HASH_BUCKET: u8 = 0xFF;

    /// Creates an empty builder that does not emit a hash index.
    ///
    /// `restart_interval` is the number of entries written after a restart
    /// point before the next entry starts a new one. An interval of `0` is
    /// treated as `1` (every entry is a restart point): a literal zero would
    /// record a duplicate restart at offset 0 and make the hash-index
    /// computation divide by zero.
    pub fn new(restart_interval: usize) -> Self {
        Self {
            buffer: Vec::with_capacity(4096),
            // The first entry always lives at offset 0 and is always a restart point.
            restarts: vec![0],
            entries_since_restart: 0,
            restart_interval: restart_interval.max(1),
            last_key: Vec::new(),
            entry_count: 0,
            use_hash_index: false,
            keys_for_hash: Vec::new(),
        }
    }

    /// Like [`BlockBuilder::new`], but `finish` also appends a hash index.
    pub fn with_hash_index(restart_interval: usize) -> Self {
        Self {
            use_hash_index: true,
            ..Self::new(restart_interval)
        }
    }

    /// Appends one entry.
    ///
    /// Each entry is written as three varints (`shared`, `non_shared`,
    /// `value_len`) followed by the non-shared key suffix and the value.
    /// `shared` is the length of the prefix the key has in common with the
    /// previously added key, and is forced to `0` at restart points.
    ///
    /// Keys must be strictly ascending; this is only checked in debug builds.
    pub fn add(&mut self, key: &[u8], value: &[u8]) {
        debug_assert!(
            self.entry_count == 0 || key > self.last_key.as_slice(),
            "Keys must be added in sorted order"
        );
        let shared = if self.entries_since_restart >= self.restart_interval {
            // Restart offsets are stored as u32, so a block must stay below 4 GiB.
            debug_assert!(
                self.buffer.len() <= u32::MAX as usize,
                "block exceeds the u32 restart offset range"
            );
            self.restarts.push(self.buffer.len() as u32);
            self.entries_since_restart = 0;
            0
        } else {
            self.shared_prefix_len(&self.last_key, key)
        };
        let non_shared = key.len() - shared;

        encode_varint(&mut self.buffer, shared as u64);
        encode_varint(&mut self.buffer, non_shared as u64);
        encode_varint(&mut self.buffer, value.len() as u64);
        self.buffer.extend_from_slice(&key[shared..]);
        self.buffer.extend_from_slice(value);

        self.last_key.clear();
        self.last_key.extend_from_slice(key);
        self.entries_since_restart += 1;
        self.entry_count += 1;
        if self.use_hash_index {
            self.keys_for_hash.push(key.to_vec());
        }
    }

    /// Length of the longest common prefix of `a` and `b`.
    fn shared_prefix_len(&self, a: &[u8], b: &[u8]) -> usize {
        a.iter().zip(b).take_while(|(x, y)| x == y).count()
    }

    /// Produces the serialized block and leaves the entry buffer empty.
    ///
    /// Layout: entries, then (optionally) the hash index, then every restart
    /// offset as a little-endian `u32`, then the restart count as a
    /// little-endian `u32`.
    ///
    /// Only the entry buffer is taken; counters, restart offsets and the last
    /// key are left untouched, so call [`BlockBuilder::reset`] before reusing
    /// the builder for another block.
    pub fn finish(&mut self) -> Vec<u8> {
        let mut result = std::mem::take(&mut self.buffer);
        if self.hash_bucket_count() > 0 {
            self.build_hash_index(&mut result);
        }
        result.reserve(self.restarts.len() * 4 + 4);
        for &restart in &self.restarts {
            result
                .write_u32::<LittleEndian>(restart)
                .expect("writing to a Vec<u8> cannot fail");
        }
        result
            .write_u32::<LittleEndian>(self.restarts.len() as u32)
            .expect("writing to a Vec<u8> cannot fail");
        result
    }

    /// Number of hash buckets `finish` will emit, or `0` when it emits no
    /// hash index at all.
    ///
    /// A bucket is one byte holding a restart index, with `0xFF` reserved for
    /// "empty", so only restart indices `0..=0xFE` are representable. When the
    /// block has more restart points than that, the index is skipped entirely
    /// rather than written with wrapped (wrong) restart indices.
    fn hash_bucket_count(&self) -> usize {
        let indexable = self.restarts.len() <= usize::from(Self::EMPTY_HASH_BUCKET);
        if !self.use_hash_index || self.entry_count == 0 || !indexable {
            return 0;
        }
        ((self.entry_count as f64 * DEFAULT_HASH_BUCKET_RATIO) as usize).max(1)
    }

    /// Appends the hash index to `data`: one byte per bucket followed by the
    /// bucket count as a little-endian `u32`.
    ///
    /// Every key is hashed to a home bucket and stored with linear probing.
    /// The stored byte is the index of the restart interval containing the
    /// key. Because there are fewer buckets than keys for most blocks, keys
    /// that find no free bucket are simply left out of the index.
    fn build_hash_index(&self, data: &mut Vec<u8>) {
        let num_buckets = self.hash_bucket_count();
        if num_buckets == 0 {
            return;
        }
        // `new` clamps the interval to at least 1; guard again so a zero can never divide.
        let interval = self.restart_interval.max(1);
        let mut buckets = vec![Self::EMPTY_HASH_BUCKET; num_buckets];
        for (key_idx, key) in self.keys_for_hash.iter().enumerate() {
            let restart_idx = key_idx / interval;
            if restart_idx >= usize::from(Self::EMPTY_HASH_BUCKET) {
                // Not representable in one byte without colliding with the sentinel.
                continue;
            }
            let home = Self::hash_key(key) as usize % num_buckets;
            let free_slot = (0..num_buckets)
                .map(|step| (home + step) % num_buckets)
                .find(|&slot| buckets[slot] == Self::EMPTY_HASH_BUCKET);
            if let Some(slot) = free_slot {
                buckets[slot] = restart_idx as u8;
            }
        }
        data.extend_from_slice(&buckets);
        data.write_u32::<LittleEndian>(num_buckets as u32)
            .expect("writing to a Vec<u8> cannot fail");
    }

    /// Low 32 bits of the 64-bit XXH3 hash of `key`.
    fn hash_key(key: &[u8]) -> u32 {
        twox_hash::xxh3::hash64(key) as u32
    }

    /// Returns `true` when no entry has been added since construction or the
    /// last `reset`.
    pub fn is_empty(&self) -> bool {
        self.entry_count == 0
    }

    /// Size in bytes that `finish` would currently produce: entries, hash
    /// index (when one will be written), restart array and restart count.
    pub fn estimated_size(&self) -> usize {
        let hash_index = match self.hash_bucket_count() {
            0 => 0,
            // Bucket bytes plus the trailing u32 bucket count.
            buckets => buckets + 4,
        };
        self.buffer.len() + hash_index + self.restarts.len() * 4 + 4
    }

    /// Clears all per-block state so the builder can start a new block.
    ///
    /// The restart interval and the hash-index setting are kept.
    pub fn reset(&mut self) {
        self.buffer.clear();
        self.restarts.clear();
        self.restarts.push(0);
        self.entries_since_restart = 0;
        self.last_key.clear();
        self.entry_count = 0;
        self.keys_for_hash.clear();
    }
}
impl Default for BlockBuilder {
fn default() -> Self {
Self::new(DEFAULT_RESTART_INTERVAL)
}
}
/// Read-only view over a serialized block as produced by `BlockBuilder::finish`.
pub struct Block {
/// The complete block bytes exactly as passed to `Block::new`.
data: Vec<u8>,
/// Index in `data` where the array of little-endian `u32` restart offsets begins.
restarts_offset: usize,
/// Number of restart offsets, read from the last four bytes of `data`.
num_restarts: usize,
/// Bucket count reported by `detect_hash_index`; `0` when no hash index was detected.
num_hash_buckets: usize,
/// Start of the hash bucket bytes, or equal to `restarts_offset` when there is no
/// hash index. Entry parsing treats this as the end of the entry region.
hash_index_offset: usize,
}
impl Block {
pub fn new(data: Vec<u8>) -> Option<Self> {
if data.len() < 4 {
return None;
}
let num_restarts = {
let mut cursor = Cursor::new(&data[data.len() - 4..]);
cursor.read_u32::<LittleEndian>().ok()? as usize
};
if num_restarts == 0 || data.len() < 4 + num_restarts * 4 {
return None;
}
let restarts_offset = data.len() - 4 - num_restarts * 4;
let (num_hash_buckets, hash_index_offset) =
Self::detect_hash_index(&data, restarts_offset, num_restarts);
Some(Self {
data,
restarts_offset,
num_restarts,
num_hash_buckets,
hash_index_offset,
})
}
fn detect_hash_index(
data: &[u8],
restarts_offset: usize,
num_restarts: usize,
) -> (usize, usize) {
if restarts_offset < 4 {
return (0, restarts_offset);
}
let nb_offset = restarts_offset - 4;
let candidate = u32::from_le_bytes([
data[nb_offset],
data[nb_offset + 1],
data[nb_offset + 2],
data[nb_offset + 3],
]) as usize;
if candidate == 0 || candidate > nb_offset {
return (0, restarts_offset);
}
let hash_start = nb_offset - candidate;
let all_valid = data[hash_start..nb_offset]
.iter()
.all(|&b| b == 0xFF || (b as usize) < num_restarts);
if all_valid {
(candidate, hash_start)
} else {
(0, restarts_offset)
}
}
fn restart_offset(&self, index: usize) -> u32 {
debug_assert!(index < self.num_restarts);
let offset = self.restarts_offset + index * 4;
let mut cursor = Cursor::new(&self.data[offset..offset + 4]);
cursor.read_u32::<LittleEndian>().unwrap()
}
pub fn seek(&self, target: &[u8]) -> BlockIterator<'_> {
let mut left = 0;
let mut right = self.num_restarts;
while left < right {
let mid = left + (right - left) / 2;
let offset = self.restart_offset(mid) as usize;
let key = self.read_key_at(offset);
match key.as_slice().cmp(target) {
Ordering::Less => left = mid + 1,
Ordering::Greater => right = mid,
Ordering::Equal => {
return BlockIterator::new(self, offset);
}
}
}
let start_restart = if left > 0 { left - 1 } else { 0 };
let start_offset = self.restart_offset(start_restart) as usize;
let mut iter = BlockIterator::new(self, start_offset);
while iter.valid() {
if iter.key() >= target {
break;
}
iter.next();
}
iter
}
pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
let iter = self.seek(key);
if iter.valid() && iter.key() == key {
Some(iter.value().to_vec())
} else {
None
}
}
fn read_key_at(&self, offset: usize) -> Vec<u8> {
let mut cursor = Cursor::new(&self.data[offset..self.hash_index_offset]);
let shared = decode_varint(&mut cursor).unwrap_or(0) as usize;
let non_shared = decode_varint(&mut cursor).unwrap_or(0) as usize;
let _value_len = decode_varint(&mut cursor);
debug_assert_eq!(shared, 0, "Expected restart point (shared = 0)");
let pos = cursor.position() as usize;
self.data[offset + pos..offset + pos + non_shared].to_vec()
}
pub fn iter(&self) -> BlockIterator<'_> {
BlockIterator::new(self, 0)
}
pub fn data(&self) -> &[u8] {
&self.data
}
}
/// Cursor over the entries of a `Block`, rebuilding each prefix-compressed key as it advances.
pub struct BlockIterator<'a> {
/// The block being iterated.
block: &'a Block,
/// Offset in the block data where the current entry's header starts.
offset: usize,
/// Full key of the current entry, rebuilt from the previous key plus the stored suffix.
key: Vec<u8>,
/// Offset in the block data of the current entry's value.
value_start: usize,
/// Length in bytes of the current entry's value.
value_len: usize,
/// Whether the most recent parse produced an entry.
valid: bool,
}
impl<'a> BlockIterator<'a> {
pub fn new(block: &'a Block, offset: usize) -> Self {
let mut iter = Self {
block,
offset,
key: Vec::new(),
value_start: 0,
value_len: 0,
valid: false,
};
iter.parse_entry();
iter
}
pub fn valid(&self) -> bool {
self.valid
}
pub fn key(&self) -> &[u8] {
&self.key
}
pub fn value(&self) -> &[u8] {
&self.block.data[self.value_start..self.value_start + self.value_len]
}
pub fn next(&mut self) {
if !self.valid {
return;
}
self.offset = self.value_start + self.value_len;
self.parse_entry();
}
fn parse_entry(&mut self) {
let entries_end = self.block.hash_index_offset;
if self.offset >= entries_end {
self.valid = false;
return;
}
let mut cursor = Cursor::new(&self.block.data[self.offset..entries_end]);
let shared = match decode_varint(&mut cursor) {
Some(v) => v as usize,
None => {
self.valid = false;
return;
}
};
let non_shared = match decode_varint(&mut cursor) {
Some(v) => v as usize,
None => {
self.valid = false;
return;
}
};
let value_len = match decode_varint(&mut cursor) {
Some(v) => v as usize,
None => {
self.valid = false;
return;
}
};
let header_len = cursor.position() as usize;
let data_start = self.offset + header_len;
if data_start + non_shared + value_len > entries_end {
self.valid = false;
return;
}
self.key.truncate(shared);
self.key
.extend_from_slice(&self.block.data[data_start..data_start + non_shared]);
self.value_start = data_start + non_shared;
self.value_len = value_len;
self.valid = true;
}
pub fn seek(&mut self, target: &[u8]) {
let new_iter = self.block.seek(target);
self.offset = new_iter.offset;
self.key = new_iter.key;
self.value_start = new_iter.value_start;
self.value_len = new_iter.value_len;
self.valid = new_iter.valid;
}
pub fn seek_to_first(&mut self) {
self.offset = 0;
self.key.clear();
self.parse_entry();
}
}
/// Appends `value` to `buf` as a little-endian base-128 varint.
///
/// Every byte carries seven payload bits, least-significant group first; the
/// high bit is set on every byte except the last.
fn encode_varint(buf: &mut Vec<u8>, mut value: u64) {
    loop {
        let low7 = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            // Final group: continuation bit stays clear.
            buf.push(low7);
            return;
        }
        buf.push(low7 | 0x80);
    }
}
/// Reads one little-endian base-128 varint from `reader`.
///
/// Returns `None` when the input ends before the terminating byte, when the
/// encoding runs past ten bytes, or when the tenth byte carries bits that do
/// not fit in a `u64`.
fn decode_varint<R: Read>(reader: &mut R) -> Option<u64> {
    let mut result: u64 = 0;
    let mut shift: u32 = 0;
    loop {
        let mut byte = [0u8; 1];
        reader.read_exact(&mut byte).ok()?;
        let payload = u64::from(byte[0] & 0x7F);
        // The tenth byte is shifted by 63, so only its lowest bit fits; anything
        // above that would be silently dropped and decode to a wrong value.
        if shift == 63 && payload > 1 {
            return None;
        }
        result |= payload << shift;
        if byte[0] & 0x80 == 0 {
            return Some(result);
        }
        shift += 7;
        if shift >= 64 {
            // A continuation bit on the tenth byte: longer than any u64 encoding.
            return None;
        }
    }
}
// Unit tests covering the builder/reader round trip, iteration, seeking,
// prefix compression, varint encoding and block handles.
#[cfg(test)]
mod tests {
use super::*;
// A single entry can be read back, and a missing key yields `None`.
#[test]
fn test_block_builder_single_entry() {
let mut builder = BlockBuilder::new(16);
builder.add(b"key1", b"value1");
let data = builder.finish();
let block = Block::new(data).unwrap();
assert_eq!(block.get(b"key1"), Some(b"value1".to_vec()));
assert_eq!(block.get(b"key2"), None);
}
// Twenty entries with a restart interval of 4 span several restart points;
// every key must still be found.
#[test]
fn test_block_builder_multiple_entries() {
let mut builder = BlockBuilder::new(4);
for i in 0..20 {
let key = format!("key{:02}", i);
let value = format!("value{:02}", i);
builder.add(key.as_bytes(), value.as_bytes());
}
let data = builder.finish();
let block = Block::new(data).unwrap();
for i in 0..20 {
let key = format!("key{:02}", i);
let expected_value = format!("value{:02}", i);
assert_eq!(block.get(key.as_bytes()), Some(expected_value.into_bytes()));
}
}
// Iterating from the start visits every entry exactly once.
#[test]
fn test_block_iterator() {
let mut builder = BlockBuilder::new(4);
builder.add(b"apple", b"1");
builder.add(b"banana", b"2");
builder.add(b"cherry", b"3");
builder.add(b"date", b"4");
let data = builder.finish();
let block = Block::new(data).unwrap();
let mut iter = block.iter();
let mut count = 0;
while iter.valid() {
count += 1;
iter.next();
}
assert_eq!(count, 4);
}
// `seek` lands on an exact match, on the next larger key when the target is
// absent, and is invalid when the target is past the last key.
#[test]
fn test_block_seek() {
let mut builder = BlockBuilder::new(2);
builder.add(b"a", b"1");
builder.add(b"c", b"2");
builder.add(b"e", b"3");
builder.add(b"g", b"4");
let data = builder.finish();
let block = Block::new(data).unwrap();
let iter = block.seek(b"c");
assert!(iter.valid());
assert_eq!(iter.key(), b"c");
let iter = block.seek(b"d");
assert!(iter.valid());
assert_eq!(iter.key(), b"e");
let iter = block.seek(b"z");
assert!(!iter.valid());
}
// Keys with long common prefixes round-trip, and the encoded block stays
// within 50 bytes of the raw key/value payload size.
#[test]
fn test_prefix_compression() {
let mut builder = BlockBuilder::new(16);
builder.add(b"user:1000:age", b"30");
builder.add(b"user:1000:email", b"alice@example.com");
builder.add(b"user:1000:name", b"Alice");
builder.add(b"user:1001:name", b"Bob");
let data = builder.finish();
let uncompressed_size = b"user:1000:age".len()
+ b"30".len()
+ b"user:1000:email".len()
+ b"alice@example.com".len()
+ b"user:1000:name".len()
+ b"Alice".len()
+ b"user:1001:name".len()
+ b"Bob".len();
assert!(data.len() < uncompressed_size + 50);
let block = Block::new(data).unwrap();
assert_eq!(block.get(b"user:1000:age"), Some(b"30".to_vec()));
assert_eq!(
block.get(b"user:1000:email"),
Some(b"alice@example.com".to_vec())
);
assert_eq!(block.get(b"user:1000:name"), Some(b"Alice".to_vec()));
assert_eq!(block.get(b"user:1001:name"), Some(b"Bob".to_vec()));
}
// Varints round-trip at the one/two/three-byte boundaries and at `u64::MAX`.
#[test]
fn test_varint_encoding() {
let test_values = [0, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];
for &value in &test_values {
let mut buf = Vec::new();
encode_varint(&mut buf, value);
let mut cursor = Cursor::new(&buf);
let decoded = decode_varint(&mut cursor).unwrap();
assert_eq!(value, decoded, "Failed for value {}", value);
}
}
// A handle round-trips, and `decode` reports consuming the whole encoding.
#[test]
fn test_block_handle() {
let handle = BlockHandle::new(12345, 67890);
let encoded = handle.encode();
let (decoded, len) = BlockHandle::decode(&encoded).unwrap();
assert_eq!(handle, decoded);
assert_eq!(len, encoded.len());
}
}