use std::cmp::Ordering;
use std::sync::Arc;
use ruc::*;
use crate::error::{Error, Result};
pub fn decode_varint(data: &[u8]) -> Result<(u32, usize)> {
let mut result: u32 = 0;
let mut shift = 0;
for (i, &byte) in data.iter().enumerate() {
if i >= 5 {
return Err(eg!(Error::Corruption("varint too long".to_string())));
}
result |= ((byte & 0x7F) as u32) << shift;
if byte & 0x80 == 0 {
return Ok((result, i + 1));
}
shift += 7;
}
Err(eg!(Error::Corruption("unterminated varint".to_string())))
}
/// Writes `value` into `buf` as a base-128 varint and returns the number of
/// bytes written (between 1 and 5).
///
/// # Panics
///
/// Panics if `buf` is shorter than the encoded length.
pub fn encode_varint(buf: &mut [u8], mut value: u32) -> usize {
    let mut written = 0;
    loop {
        let low7 = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            // Final group: continuation bit stays clear.
            buf[written] = low7;
            return written + 1;
        }
        buf[written] = low7 | 0x80;
        written += 1;
    }
}
/// Appends the varint encoding of `value` to the end of `buf`.
pub fn encode_varint_vec(buf: &mut Vec<u8>, value: u32) {
    // Five bytes is the longest encoding a u32 can need.
    let mut scratch = [0u8; 5];
    let len = encode_varint(&mut scratch, value);
    let (encoded, _unused) = scratch.split_at(len);
    buf.extend_from_slice(encoded);
}
/// A read-only view over an encoded block.
///
/// As parsed by `Block::new`, the buffer ends with an array of
/// `num_restarts` little-endian `u32` restart offsets followed by a
/// little-endian `u32` holding `num_restarts`; everything before that array
/// is entry data.
pub struct Block {
/// The complete encoded block, shared by reference count.
data: Arc<Vec<u8>>,
/// Byte offset where the restart array begins, i.e. where entry data ends.
restart_offset: usize,
/// Number of restart offsets, read from the last four bytes of `data`.
num_restarts: u32,
}
impl Block {
/// Parses the trailer of an encoded block and wraps the buffer.
///
/// The last four bytes are read as a little-endian restart count; the
/// restart array of `count * 4` bytes sits immediately before them.
///
/// # Errors
///
/// Returns `Error::Corruption` if `data` is shorter than the four-byte
/// trailer ("block too short") or if the restart array implied by the
/// count does not fit inside `data` ("bad restart count").
pub fn new(data: Arc<Vec<u8>>) -> Result<Self> {
    const TRAILER_LEN: usize = 4;
    const RESTART_ENTRY_LEN: usize = 4;

    if data.len() < TRAILER_LEN {
        return Err(eg!(Error::Corruption("block too short".to_string())));
    }
    let num_restarts =
        u32::from_le_bytes(data[data.len() - TRAILER_LEN..].try_into().unwrap());

    // Checked arithmetic: on 32-bit targets a huge count would otherwise
    // wrap around and slip past the size check below.
    let restarts_size = (num_restarts as usize)
        .checked_mul(RESTART_ENTRY_LEN)
        .and_then(|bytes| bytes.checked_add(TRAILER_LEN))
        .filter(|&bytes| bytes <= data.len());
    let restart_offset = match restarts_size {
        Some(bytes) => data.len() - bytes,
        None => {
            return Err(eg!(Error::Corruption("bad restart count".to_string())));
        }
    };

    Ok(Self {
        data,
        restart_offset,
        num_restarts,
    })
}
pub fn from_vec(data: Vec<u8>) -> Result<Self> {
Self::new(Arc::new(data))
}
/// Reads the `i`-th little-endian `u32` from the restart array.
///
/// Panics if the four bytes for slot `i` lie outside the buffer.
fn restart_point(&self, i: u32) -> u32 {
    let start = self.restart_offset + (i as usize) * 4;
    let end = start + 4;
    let mut raw = [0u8; 4];
    raw.copy_from_slice(&self.data[start..end]);
    u32::from_le_bytes(raw)
}
/// Returns an iterator positioned at offset 0 that yields every entry of
/// the block as owned `(key, value)` pairs.
pub fn iter(&self) -> BlockIterator<'_> {
    BlockIterator {
        block: self,
        // No previous key yet, so the first entry must not share a prefix.
        key: Vec::new(),
        offset: 0,
        value_len: 0,
        value_start: 0,
    }
}
/// Returns the first entry whose key does not compare `Less` than `target`
/// under `compare`, or `None` if every key compares `Less`.
///
/// `compare(key, target)` is called with the entry key first. The binary
/// search over restart points presumes the stored keys are ordered
/// consistently with `compare`.
///
/// Decoding failures are not reported: an undecodable restart entry is
/// treated as "not less than target" during the binary search, and an
/// undecodable entry during the linear scan ends the scan with `None`.
pub fn seek_by<F: Fn(&[u8], &[u8]) -> Ordering>(
    &self,
    target: &[u8],
    compare: F,
) -> Option<(Vec<u8>, Vec<u8>)> {
    // No entry bytes precede the restart array, so there is nothing to find.
    if self.restart_offset == 0 {
        return None;
    }

    // Binary search: `left` ends up as the number of leading restart keys
    // that compare Less than the target.
    let mut left = 0u32;
    let mut right = self.num_restarts;
    while left < right {
        let mid = left + (right - left) / 2;
        let rp = self.restart_point(mid) as usize;
        match decode_entry_at(&self.data, rp, &[]) {
            Some((key, _, _)) if compare(&key, target) == Ordering::Less => {
                left = mid + 1;
            }
            _ => right = mid,
        }
    }

    // The answer can sit anywhere after the last restart key that is Less
    // than the target, so the linear scan starts at that restart point.
    let mut offset = if left > 0 {
        self.restart_point(left - 1) as usize
    } else {
        0
    };
    let mut current_key = Vec::new();
    while offset < self.restart_offset {
        match decode_entry_at(&self.data, offset, &current_key) {
            Some((key, value, next_off)) => {
                if compare(&key, target) != Ordering::Less {
                    return Some((key, value));
                }
                current_key = key;
                offset = next_off;
            }
            None => break,
        }
    }
    None
}
/// Returns the last entry whose key does not compare `Greater` than
/// `target` under `compare`, or `None` if the first scanned key already
/// compares `Greater`.
///
/// `compare(key, target)` is called with the entry key first. The binary
/// search over restart points presumes the stored keys are ordered
/// consistently with `compare`.
///
/// Decoding failures are not reported: an undecodable restart entry is
/// treated as "greater than target" during the binary search, and an
/// undecodable entry during the linear scan ends the scan, returning the
/// best match found so far.
pub fn seek_for_prev_by<F: Fn(&[u8], &[u8]) -> Ordering>(
    &self,
    target: &[u8],
    compare: F,
) -> Option<(Vec<u8>, Vec<u8>)> {
    // No entry bytes precede the restart array, so there is nothing to find.
    if self.restart_offset == 0 {
        return None;
    }

    // Binary search: `left` ends up as the number of leading restart keys
    // that are not Greater than the target.
    let mut left = 0u32;
    let mut right = self.num_restarts;
    while left < right {
        let mid = left + (right - left) / 2;
        let rp = self.restart_point(mid) as usize;
        match decode_entry_at(&self.data, rp, &[]) {
            Some((key, _, _)) if compare(&key, target) != Ordering::Greater => {
                left = mid + 1;
            }
            _ => right = mid,
        }
    }

    // Scan forward from the last restart point whose key is <= target.
    let start_restart = left.saturating_sub(1);
    let mut offset = self.restart_point(start_restart) as usize;

    // Whenever a match is recorded, `current_key` is that match's key, so
    // only the value needs to be remembered separately. This avoids cloning
    // the key for every scanned entry.
    let mut current_key = Vec::new();
    let mut best_value: Option<Vec<u8>> = None;
    while offset < self.restart_offset {
        match decode_entry_at(&self.data, offset, &current_key) {
            Some((key, value, next_off)) if compare(&key, target) != Ordering::Greater => {
                best_value = Some(value);
                current_key = key;
                offset = next_off;
            }
            // Either the first key past the target or an undecodable entry.
            _ => break,
        }
    }
    best_value.map(|value| (current_key, value))
}
/// Decodes every entry from restart point `restart_index` up to the end of
/// the entry data and returns them as owned `(key, value)` pairs.
///
/// Returns an empty vector when `restart_index` is out of range. Decoding
/// stops silently at the first entry that cannot be decoded.
pub fn iter_from_restart(&self, restart_index: u32) -> Vec<(Vec<u8>, Vec<u8>)> {
    if restart_index >= self.num_restarts {
        return Vec::new();
    }
    let mut offset = self.restart_point(restart_index) as usize;
    let mut entries: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
    while offset < self.restart_offset {
        // The previously decoded key is the last one pushed, so borrow it
        // from there instead of keeping a cloned copy alongside.
        let prev_key: &[u8] = match entries.last() {
            Some((key, _)) => key.as_slice(),
            None => &[],
        };
        match decode_entry_at(&self.data, offset, prev_key) {
            Some((key, value, next_off)) => {
                entries.push((key, value));
                offset = next_off;
            }
            None => break,
        }
    }
    entries
}
/// Decodes the entries that start at restart point `restart_index` and lie
/// before the next restart point (or before the end of the entry data for
/// the last restart point), returning them as owned `(key, value)` pairs.
///
/// Returns an empty vector when `restart_index` is out of range. Decoding
/// stops silently at the first entry that cannot be decoded.
pub fn iter_restart_segment(&self, restart_index: u32) -> Vec<(Vec<u8>, Vec<u8>)> {
    if restart_index >= self.num_restarts {
        return Vec::new();
    }
    let mut offset = self.restart_point(restart_index) as usize;
    // `restart_index < num_restarts`, so `restart_index + 1` cannot overflow.
    let end = if restart_index + 1 < self.num_restarts {
        self.restart_point(restart_index + 1) as usize
    } else {
        self.restart_offset
    };
    let mut entries: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
    while offset < end {
        // The previously decoded key is the last one pushed, so borrow it
        // from there instead of keeping a cloned copy alongside.
        let prev_key: &[u8] = match entries.last() {
            Some((key, _)) => key.as_slice(),
            None => &[],
        };
        match decode_entry_at(&self.data, offset, prev_key) {
            Some((key, value, next_off)) => {
                entries.push((key, value));
                offset = next_off;
            }
            None => break,
        }
    }
    entries
}
/// Returns the index of the last restart point whose stored offset is less
/// than or equal to `offset`.
///
/// Returns 0 when no restart point qualifies (including when the block has
/// no restart points at all).
pub fn restart_index_for_offset(&self, offset: usize) -> u32 {
    // Restart offsets are stored as u32, so any larger offset is past all of
    // them. Clamp rather than truncate, which would wrap to a small value.
    let offset = offset.min(u32::MAX as usize) as u32;

    // Binary search for the number of restart points at or before `offset`.
    let mut left = 0u32;
    let mut right = self.num_restarts;
    while left < right {
        let mid = left + (right - left) / 2;
        if self.restart_point(mid) <= offset {
            left = mid + 1;
        } else {
            right = mid;
        }
    }
    left.saturating_sub(1)
}
/// Returns the key of the entry stored at restart point `restart_index`.
///
/// Yields `None` when the index is out of range or the entry cannot be
/// decoded.
pub fn first_key_at_restart(&self, restart_index: u32) -> Option<Vec<u8>> {
    if restart_index < self.num_restarts {
        let entry_start = self.restart_point(restart_index) as usize;
        // Decoded with an empty previous key, so the entry must be self-contained.
        let (first_key, _value, _next) = decode_entry_at(&self.data, entry_start, &[])?;
        Some(first_key)
    } else {
        None
    }
}
/// Returns the restart count read from the block trailer.
pub fn num_restarts(&self) -> u32 {
self.num_restarts
}
/// Returns the byte offset at which entry data ends and the restart array
/// begins.
pub fn data_end_offset(&self) -> usize {
self.restart_offset
}
/// Borrows the complete encoded block, including the restart array and
/// trailer.
pub fn data(&self) -> &[u8] {
    self.data.as_slice()
}
/// Borrows the reference-counted handle to the encoded block; callers can
/// clone it to share the buffer without copying the bytes.
pub fn data_arc(&self) -> &Arc<Vec<u8>> {
&self.data
}
pub fn seek(&self, target: &[u8]) -> Option<(Vec<u8>, Vec<u8>)> {
if self.restart_offset == 0 {
return None;
}
let mut left = 0u32;
let mut right = self.num_restarts;
while left < right {
let mid = left + (right - left) / 2;
let rp = self.restart_point(mid) as usize;
match decode_entry_at(&self.data, rp, &[]) {
Some((key, _, next_off)) => {
let _ = next_off;
if key.as_slice() < target {
left = mid + 1;
} else {
right = mid;
}
}
None => {
right = mid;
}
}
}
let start = if left > 0 {
self.restart_point(left - 1) as usize
} else {
0
};
let mut offset = start;
let mut current_key = Vec::new();
while offset < self.restart_offset {
match decode_entry_at(&self.data, offset, ¤t_key) {
Some((key, value, next_off)) => {
if key.as_slice() >= target {
return Some((key, value));
}
current_key = key;
offset = next_off;
}
None => break,
}
}
None
}
}
/// Decodes the entry that starts at `offset` in `data`.
///
/// An entry is three varints (shared key length, unshared key length,
/// value length) followed by the unshared key bytes and the value bytes.
/// The full key is the first `shared` bytes of `prev_key` followed by the
/// unshared bytes.
///
/// Returns `(key, value, next_offset)`, where `next_offset` is the position
/// just past the value. Returns `None` if `offset` is out of range, a varint
/// is malformed, the declared lengths run past the end of `data`, or
/// `prev_key` is shorter than the shared length.
fn decode_entry_at(
    data: &[u8],
    offset: usize,
    prev_key: &[u8],
) -> Option<(Vec<u8>, Vec<u8>, usize)> {
    let mut pos = offset;
    // `get` instead of indexing: `offset` may come from restart values stored
    // in the block itself, so an out-of-range offset must not panic.
    let (shared_len, n) = decode_varint(data.get(pos..)?).ok()?;
    pos += n;
    let (unshared_len, n) = decode_varint(data.get(pos..)?).ok()?;
    pos += n;
    let (value_len, n) = decode_varint(data.get(pos..)?).ok()?;
    pos += n;

    let shared = shared_len as usize;
    let unshared = unshared_len as usize;
    let vlen = value_len as usize;

    // Checked sums so oversized lengths cannot wrap around the bounds check.
    let value_start = pos.checked_add(unshared)?;
    let next_offset = value_start.checked_add(vlen)?;
    let key_suffix = data.get(pos..value_start)?;
    let value_bytes = data.get(value_start..next_offset)?;
    let key_prefix = prev_key.get(..shared)?;

    let mut key = Vec::with_capacity(shared + unshared);
    key.extend_from_slice(key_prefix);
    key.extend_from_slice(key_suffix);
    Some((key, value_bytes.to_vec(), next_offset))
}
/// Decodes the entry that starts at `offset` in `data`, rebuilding the key
/// in place inside `key_buf`.
///
/// On entry `key_buf` must hold the previous key; on success it is truncated
/// to the shared prefix length and extended with the entry's unshared key
/// bytes, so it then holds the full key of this entry.
///
/// Returns `(value_start, value_len, next_offset)` describing where the
/// value lives inside `data`. Returns `None`, leaving `key_buf` untouched,
/// if `offset` is out of range, a varint is malformed, the declared lengths
/// run past the end of `data`, or `key_buf` is shorter than the shared
/// length.
pub(crate) fn decode_entry_reuse(
    data: &[u8],
    offset: usize,
    key_buf: &mut Vec<u8>,
) -> Option<(usize, usize, usize)> {
    let mut pos = offset;
    // `get` instead of indexing so an out-of-range offset yields `None`
    // rather than a panic.
    let (shared_len, n) = decode_varint(data.get(pos..)?).ok()?;
    pos += n;
    let (unshared_len, n) = decode_varint(data.get(pos..)?).ok()?;
    pos += n;
    let (value_len, n) = decode_varint(data.get(pos..)?).ok()?;
    pos += n;

    let shared = shared_len as usize;
    let unshared = unshared_len as usize;
    let vlen = value_len as usize;

    // Checked sums so oversized lengths cannot wrap around the bounds check.
    let value_start = pos.checked_add(unshared)?;
    let next_offset = value_start.checked_add(vlen)?;
    if next_offset > data.len() || shared > key_buf.len() {
        return None;
    }

    // All validation is done; only now is the caller's buffer modified.
    let key_suffix = data.get(pos..value_start)?;
    key_buf.truncate(shared);
    key_buf.extend_from_slice(key_suffix);
    Some((value_start, vlen, next_offset))
}
/// Forward iterator over the entries of a `Block`, yielding owned
/// `(key, value)` pairs.
pub struct BlockIterator<'a> {
/// The block being iterated.
block: &'a Block,
/// Offset of the next entry to decode.
offset: usize,
/// Key of the most recently yielded entry; used as the prefix source when
/// decoding the next entry.
key: Vec<u8>,
/// Offset of the most recently yielded value inside the block data.
value_start: usize,
/// Length in bytes of the most recently yielded value.
value_len: usize,
}
impl<'a> Iterator for BlockIterator<'a> {
    type Item = (Vec<u8>, Vec<u8>);

    /// Decodes and returns the next entry, or `None` once the entry data is
    /// exhausted or an entry cannot be decoded.
    fn next(&mut self) -> Option<Self::Item> {
        let block = self.block;
        if self.offset >= block.restart_offset {
            return None;
        }
        // A failed decode leaves the iterator state untouched.
        let (key, value, next_off) = decode_entry_at(&block.data, self.offset, &self.key)?;
        self.value_len = value.len();
        self.value_start = next_off - self.value_len;
        self.offset = next_off;
        // Keep a copy of the key: the next entry may share a prefix with it.
        self.key = key.clone();
        Some((key, value))
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sst::block_builder::BlockBuilder;
#[test]
fn test_varint_encode_decode() {
    // Round-trip a spread of values, including both ends of the u32 range.
    let samples: [u32; 9] = [0, 1, 127, 128, 255, 300, 16383, 16384, 0xFFFFFFFF_u32];
    for &val in samples.iter() {
        let mut buf = [0u8; 5];
        let written = encode_varint(&mut buf, val);
        let (decoded, consumed) = decode_varint(&buf[..written]).unwrap();
        assert_eq!(decoded, val);
        assert_eq!(written, consumed);
    }
}
#[test]
fn test_block_build_and_read() {
    let pairs: Vec<(Vec<u8>, Vec<u8>)> = vec![
        (b"aaa".to_vec(), b"val_a".to_vec()),
        (b"aab".to_vec(), b"val_ab".to_vec()),
        (b"abc".to_vec(), b"val_abc".to_vec()),
        (b"abd".to_vec(), b"val_abd".to_vec()),
        (b"xyz".to_vec(), b"val_xyz".to_vec()),
    ];

    let mut builder = BlockBuilder::new(16);
    for (k, v) in &pairs {
        builder.add(k, v);
    }
    let block = Block::from_vec(builder.finish()).unwrap();

    // Iteration must return exactly what was written, in the same order.
    let entries: Vec<_> = block.iter().collect();
    assert_eq!(entries.len(), pairs.len());
    for (actual, expected) in entries.iter().zip(pairs.iter()) {
        assert_eq!(&actual.0, &expected.0);
        assert_eq!(&actual.1, &expected.1);
    }
}
#[test]
fn test_block_seek() {
    let mut builder = BlockBuilder::new(4);
    for i in 0..20 {
        let (key, val) = (format!("key_{:04}", i), format!("val_{}", i));
        builder.add(key.as_bytes(), val.as_bytes());
    }
    let block = Block::from_vec(builder.finish()).unwrap();

    // Exact hit.
    let (hit_key, hit_val) = block.seek(b"key_0010").unwrap();
    assert_eq!(hit_key, b"key_0010");
    assert_eq!(hit_val, b"val_10");

    // A target between two keys lands on the next stored key.
    let (next_key, _) = block.seek(b"key_0005x").unwrap();
    assert_eq!(next_key, b"key_0006");

    // A target past the last key finds nothing.
    assert!(block.seek(b"zzz").is_none());
}
#[test]
fn test_empty_block() {
    // A builder that never received an entry must still yield a valid block.
    let block = Block::from_vec(BlockBuilder::new(16).finish()).unwrap();
    assert!(block.iter().next().is_none());
    assert!(block.seek(b"anything").is_none());
}
#[test]
fn test_single_entry_block() {
    let mut builder = BlockBuilder::new(16);
    builder.add(b"only_key", b"only_value");
    let block = Block::from_vec(builder.finish()).unwrap();

    let entries: Vec<_> = block.iter().collect();
    assert_eq!(entries.len(), 1);
    assert_eq!(entries[0].0, b"only_key");
    assert_eq!(entries[0].1, b"only_value");

    // Exact match.
    let (found_key, found_val) = block.seek(b"only_key").unwrap();
    assert_eq!(found_key, b"only_key");
    assert_eq!(found_val, b"only_value");

    // A smaller target lands on the only entry; a larger one finds nothing.
    let (found_key, _) = block.seek(b"a").unwrap();
    assert_eq!(found_key, b"only_key");
    assert!(block.seek(b"zzz").is_none());
}
#[test]
fn test_large_value_block() {
    // One byte more than 1 MiB.
    const LARGE_LEN: usize = 1024 * 1024 + 1;
    let large_value = vec![0xAB_u8; LARGE_LEN];

    let mut builder = BlockBuilder::new(16);
    builder.add(b"big", &large_value);
    builder.add(b"small", b"tiny");
    let block = Block::from_vec(builder.finish()).unwrap();

    let entries: Vec<_> = block.iter().collect();
    assert_eq!(entries.len(), 2);
    assert_eq!(entries[0].0, b"big");
    assert_eq!(entries[0].1, large_value);
    assert_eq!(entries[1].0, b"small");
    assert_eq!(entries[1].1, b"tiny");

    let (found_key, found_val) = block.seek(b"big").unwrap();
    assert_eq!(found_key, b"big");
    assert_eq!(found_val.len(), LARGE_LEN);
}
#[test]
fn test_varint_boundary_values() {
    // Values on either side of the 1-to-2 and 2-to-3 byte boundaries.
    for &val in [127u32, 128, 16383, 16384].iter() {
        let mut buf = [0u8; 5];
        let n = encode_varint(&mut buf, val);
        let (decoded, m) = decode_varint(&buf[..n]).unwrap();
        assert_eq!(decoded, val, "varint round-trip failed for {}", val);
        assert_eq!(n, m, "varint byte count mismatch for {}", val);
        if val <= 127 {
            assert_eq!(n, 1, "value {} should be 1 byte", val);
        } else if val <= 16383 {
            assert_eq!(n, 2, "value {} should be 2 bytes", val);
        } else if val <= 2097151 {
            assert_eq!(n, 3, "value {} should be 3 bytes", val);
        }
    }

    // Key and value lengths that sit on varint size boundaries must survive
    // a trip through a block.
    let key_128 = vec![b'k'; 128];
    let val_16384 = vec![b'v'; 16384];
    let mut builder = BlockBuilder::new(1);
    builder.add(&key_128, &val_16384);
    let block = Block::from_vec(builder.finish()).unwrap();

    let entries: Vec<_> = block.iter().collect();
    assert_eq!(entries.len(), 1);
    assert_eq!(entries[0].0.len(), 128);
    assert_eq!(entries[0].1.len(), 16384);
}
#[test]
fn test_seek_for_prev_by() {
    // Only even-numbered keys are stored, so odd targets fall between entries.
    let mut builder = BlockBuilder::new(4);
    for i in 0..20 {
        let (key, val) = (format!("key_{:04}", i * 2), format!("val_{}", i));
        builder.add(key.as_bytes(), val.as_bytes());
    }
    let block = Block::from_vec(builder.finish()).unwrap();
    let bytewise = |lhs: &[u8], rhs: &[u8]| lhs.cmp(rhs);

    // Exact match.
    let (found_key, found_val) = block.seek_for_prev_by(b"key_0010", bytewise).unwrap();
    assert_eq!(found_key, b"key_0010");
    assert_eq!(found_val, b"val_5");

    // Between two keys: the lower neighbour is returned.
    let (found_key, _) = block.seek_for_prev_by(b"key_0011", bytewise).unwrap();
    assert_eq!(found_key, b"key_0010");

    // Past the end: the last key is returned.
    let (found_key, _) = block.seek_for_prev_by(b"zzz", bytewise).unwrap();
    assert_eq!(found_key, b"key_0038");

    // Before the first key: nothing qualifies.
    assert!(block.seek_for_prev_by(b"aaa", bytewise).is_none());

    // Exactly the first key.
    let (found_key, found_val) = block.seek_for_prev_by(b"key_0000", bytewise).unwrap();
    assert_eq!(found_key, b"key_0000");
    assert_eq!(found_val, b"val_0");

    // A strict prefix of the first key sorts before it.
    assert!(block.seek_for_prev_by(b"key_", bytewise).is_none());
}
#[test]
fn test_iter_from_restart() {
    let mut builder = BlockBuilder::new(4);
    for i in 0..12 {
        let (key, val) = (format!("key_{:04}", i), format!("val_{}", i));
        builder.add(key.as_bytes(), val.as_bytes());
    }
    let block = Block::from_vec(builder.finish()).unwrap();

    // (restart index, expected entry count, expected first key)
    let expectations: [(u32, usize, &[u8]); 3] = [
        (0, 12, &b"key_0000"[..]),
        (1, 8, &b"key_0004"[..]),
        (2, 4, &b"key_0008"[..]),
    ];
    for &(restart_index, expected_len, first_key) in expectations.iter() {
        let entries = block.iter_from_restart(restart_index);
        assert_eq!(entries.len(), expected_len);
        assert_eq!(entries[0].0, first_key);
    }

    // An out-of-range restart index yields nothing.
    assert!(block.iter_from_restart(100).is_empty());
}
}