use ruc::*;
pub const TABLE_MAGIC: u64 = 0x4D4D_4442_5353_5400;
pub const FOOTER_SIZE: usize = 48;
pub const RANGE_DEL_BLOCK_NAME: &str = "rangedelblock";
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BlockHandle {
pub offset: u64,
pub size: u64,
}
impl BlockHandle {
pub fn new(offset: u64, size: u64) -> Self {
Self { offset, size }
}
pub fn encode(&self) -> [u8; 16] {
let mut buf = [0u8; 16];
buf[0..8].copy_from_slice(&self.offset.to_le_bytes());
buf[8..16].copy_from_slice(&self.size.to_le_bytes());
buf
}
pub fn decode(buf: &[u8]) -> Result<Self> {
if buf.len() < 16 {
return Err(eg!("BlockHandle::decode: need 16 bytes, got {}", buf.len()));
}
let offset = u64::from_le_bytes(buf[0..8].try_into().unwrap());
let size = u64::from_le_bytes(buf[8..16].try_into().unwrap());
Ok(Self { offset, size })
}
}
pub const BLOCK_TRAILER_SIZE: usize = 5;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum CompressionType {
None = 0,
Lz4 = 2,
Zstd = 3,
}
impl CompressionType {
pub fn from_u8(v: u8) -> Option<Self> {
match v {
0 => Some(Self::None),
2 => Some(Self::Lz4),
3 => Some(Self::Zstd),
_ => None,
}
}
}
pub fn encode_footer(
metaindex_handle: &BlockHandle,
index_handle: &BlockHandle,
) -> [u8; FOOTER_SIZE] {
let mut footer = [0u8; FOOTER_SIZE];
let meta_enc = metaindex_handle.encode();
let idx_enc = index_handle.encode();
footer[0..16].copy_from_slice(&meta_enc);
footer[16..32].copy_from_slice(&idx_enc);
footer[40..48].copy_from_slice(&TABLE_MAGIC.to_le_bytes());
footer
}
pub fn decode_footer(data: &[u8; FOOTER_SIZE]) -> crate::error::Result<(BlockHandle, BlockHandle)> {
let magic = u64::from_le_bytes(data[40..48].try_into().unwrap());
if magic != TABLE_MAGIC {
return Err(eg!(crate::error::Error::Corruption(format!(
"invalid SST magic: {:#x}",
magic
))));
}
let metaindex_handle = BlockHandle::decode(&data[0..16]).c(d!())?;
let index_handle = BlockHandle::decode(&data[16..32]).c(d!())?;
Ok((metaindex_handle, index_handle))
}
pub fn encode_index_value(handle: &BlockHandle, first_key: &[u8]) -> Vec<u8> {
let mut buf = Vec::with_capacity(20 + first_key.len());
buf.extend_from_slice(&handle.encode());
buf.extend_from_slice(&(first_key.len() as u32).to_le_bytes());
buf.extend_from_slice(first_key);
buf
}
pub fn encode_index_value_with_props(
handle: &BlockHandle,
first_key: &[u8],
properties: &[(&str, &[u8])],
) -> Vec<u8> {
let mut buf = encode_index_value(handle, first_key);
let num_props = properties.len() as u16;
buf.extend_from_slice(&num_props.to_le_bytes());
for (name, data) in properties {
buf.extend_from_slice(&(name.len() as u16).to_le_bytes());
buf.extend_from_slice(name.as_bytes());
buf.extend_from_slice(&(data.len() as u16).to_le_bytes());
buf.extend_from_slice(data);
}
buf
}
pub struct DecodedIndexValue<'a> {
pub handle: BlockHandle,
pub first_key: Option<&'a [u8]>,
pub properties: Vec<(&'a [u8], &'a [u8])>,
}
pub fn decode_index_value_with_props(data: &[u8]) -> crate::error::Result<DecodedIndexValue<'_>> {
let handle = BlockHandle::decode(data).c(d!())?;
let mut first_key: Option<&[u8]> = None;
let mut props = Vec::new();
if data.len() <= 16 {
return Ok(DecodedIndexValue {
handle,
first_key: None,
properties: props,
});
}
let mut offset = 16;
if data.len() >= offset + 4 {
let fk_len = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize;
offset += 4;
if fk_len > 0 && data.len() >= offset + fk_len {
first_key = Some(&data[offset..offset + fk_len]);
offset += fk_len;
}
}
if data.len() >= offset + 2 {
let num_props = u16::from_le_bytes(data[offset..offset + 2].try_into().unwrap()) as usize;
offset += 2;
for _ in 0..num_props {
if data.len() < offset + 2 {
break;
}
let name_len =
u16::from_le_bytes(data[offset..offset + 2].try_into().unwrap()) as usize;
offset += 2;
if data.len() < offset + name_len {
break;
}
let name = &data[offset..offset + name_len];
offset += name_len;
if data.len() < offset + 2 {
break;
}
let data_len =
u16::from_le_bytes(data[offset..offset + 2].try_into().unwrap()) as usize;
offset += 2;
if data.len() < offset + data_len {
break;
}
let prop_data = &data[offset..offset + data_len];
offset += data_len;
props.push((name, prop_data));
}
}
Ok(DecodedIndexValue {
handle,
first_key,
properties: props,
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_block_handle_encode_decode() {
let bh = BlockHandle::new(12345, 6789);
let encoded = bh.encode();
let decoded = BlockHandle::decode(&encoded).unwrap();
assert_eq!(bh, decoded);
}
#[test]
fn test_footer_encode_decode() {
let meta = BlockHandle::new(100, 200);
let idx = BlockHandle::new(300, 400);
let footer = encode_footer(&meta, &idx);
let (m2, i2) = decode_footer(&footer).unwrap();
assert_eq!(meta, m2);
assert_eq!(idx, i2);
}
#[test]
fn test_footer_bad_magic() {
let mut footer = [0u8; FOOTER_SIZE];
footer[40..48].copy_from_slice(&0xDEADBEEF_u64.to_le_bytes());
assert!(decode_footer(&footer).is_err());
}
}