use crate::btree::block::BlockHandle;
use std::io;
pub const BTREE_MAGIC_NUMBER: u32 = 0x50425449;
pub const BTREE_CURRENT_VERSION: u32 = 1;
pub const BTREE_FOOTER_ENCODED_LENGTH: usize = 52;
#[derive(Debug, Clone, Copy)]
pub struct BloomFilterHandle {
pub offset: u64,
pub size: u32,
pub expected_entries: u64,
}
#[derive(Debug, Clone)]
pub struct BTreeFileFooter {
pub version: u32,
pub bloom_filter_handle: Option<BloomFilterHandle>,
pub index_block_handle: BlockHandle,
pub null_bitmap_handle: Option<BlockHandle>,
}
impl BTreeFileFooter {
pub fn new(
bloom_filter_handle: Option<BloomFilterHandle>,
index_block_handle: BlockHandle,
null_bitmap_handle: Option<BlockHandle>,
) -> Self {
Self {
version: BTREE_CURRENT_VERSION,
bloom_filter_handle,
index_block_handle,
null_bitmap_handle,
}
}
pub fn write_footer(&self) -> [u8; BTREE_FOOTER_ENCODED_LENGTH] {
let mut buf = [0u8; BTREE_FOOTER_ENCODED_LENGTH];
let mut pos = 0;
match &self.bloom_filter_handle {
Some(bf) => {
buf[pos..pos + 8].copy_from_slice(&(bf.offset as i64).to_le_bytes());
pos += 8;
buf[pos..pos + 4].copy_from_slice(&(bf.size as i32).to_le_bytes());
pos += 4;
buf[pos..pos + 8].copy_from_slice(&(bf.expected_entries as i64).to_le_bytes());
pos += 8;
}
None => {
pos += 20;
}
}
buf[pos..pos + 8].copy_from_slice(&(self.index_block_handle.offset as i64).to_le_bytes());
pos += 8;
buf[pos..pos + 4].copy_from_slice(&(self.index_block_handle.size as i32).to_le_bytes());
pos += 4;
match &self.null_bitmap_handle {
Some(nb) => {
buf[pos..pos + 8].copy_from_slice(&(nb.offset as i64).to_le_bytes());
pos += 8;
buf[pos..pos + 4].copy_from_slice(&(nb.size as i32).to_le_bytes());
pos += 4;
}
None => {
pos += 12;
}
}
buf[pos..pos + 4].copy_from_slice(&(self.version as i32).to_le_bytes());
pos += 4;
buf[pos..pos + 4].copy_from_slice(&(BTREE_MAGIC_NUMBER as i32).to_le_bytes());
buf
}
pub fn read_footer(data: &[u8]) -> io::Result<Self> {
if data.len() < BTREE_FOOTER_ENCODED_LENGTH {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"Footer data too short: {} < {}",
data.len(),
BTREE_FOOTER_ENCODED_LENGTH
),
));
}
let version = i32::from_le_bytes([data[44], data[45], data[46], data[47]]) as u32;
let magic = i32::from_le_bytes([data[48], data[49], data[50], data[51]]) as u32;
if magic != BTREE_MAGIC_NUMBER {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"Bad magic number: expected 0x{:08X}, got 0x{:08X}",
BTREE_MAGIC_NUMBER, magic
),
));
}
let mut pos = 0;
let bf_offset = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()) as u64;
pos += 8;
let bf_size = i32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as u32;
pos += 4;
let bf_expected = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()) as u64;
pos += 8;
let bloom_filter_handle = if bf_offset == 0 && bf_size == 0 && bf_expected == 0 {
None
} else {
Some(BloomFilterHandle {
offset: bf_offset,
size: bf_size,
expected_entries: bf_expected,
})
};
let idx_offset = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()) as u64;
pos += 8;
let idx_size = i32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as u32;
pos += 4;
let index_block_handle = BlockHandle::new(idx_offset, idx_size);
let nb_offset = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()) as u64;
pos += 8;
let nb_size = i32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as u32;
let null_bitmap_handle = if nb_offset == 0 && nb_size == 0 {
None
} else {
Some(BlockHandle::new(nb_offset, nb_size))
};
Ok(Self {
version,
bloom_filter_handle,
index_block_handle,
null_bitmap_handle,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_footer_roundtrip() {
let footer = BTreeFileFooter::new(
None,
BlockHandle::new(1000, 200),
Some(BlockHandle::new(500, 100)),
);
let encoded = footer.write_footer();
assert_eq!(encoded.len(), BTREE_FOOTER_ENCODED_LENGTH);
let decoded = BTreeFileFooter::read_footer(&encoded).unwrap();
assert_eq!(decoded.version, BTREE_CURRENT_VERSION);
assert!(decoded.bloom_filter_handle.is_none());
assert_eq!(decoded.index_block_handle.offset, 1000);
assert_eq!(decoded.index_block_handle.size, 200);
assert!(decoded.null_bitmap_handle.is_some());
let nb = decoded.null_bitmap_handle.unwrap();
assert_eq!(nb.offset, 500);
assert_eq!(nb.size, 100);
}
#[test]
fn test_footer_with_bloom_filter() {
let footer = BTreeFileFooter::new(
Some(BloomFilterHandle {
offset: 2000,
size: 512,
expected_entries: 10000,
}),
BlockHandle::new(3000, 300),
None,
);
let encoded = footer.write_footer();
let decoded = BTreeFileFooter::read_footer(&encoded).unwrap();
let bf = decoded.bloom_filter_handle.unwrap();
assert_eq!(bf.offset, 2000);
assert_eq!(bf.size, 512);
assert_eq!(bf.expected_entries, 10000);
assert!(decoded.null_bitmap_handle.is_none());
}
#[test]
fn test_footer_bad_magic() {
let mut encoded = BTreeFileFooter::new(None, BlockHandle::new(0, 0), None).write_footer();
encoded[51] = 0xFF;
assert!(BTreeFileFooter::read_footer(&encoded).is_err());
}
}