use std::collections::HashMap;
use kimberlite_types::{HASH_LENGTH, Hash, Offset};
use crate::btree::BTreeMeta;
use crate::error::StoreError;
use crate::types::{CRC_SIZE, PAGE_SIZE, PageId, TableId};
const SUPERBLOCK_MAGIC: &[u8; 8] = b"VDBSTORE";
const SUPERBLOCK_VERSION: u32 = 1;
#[allow(dead_code)]
const HEADER_SIZE: usize = 8 + 4 + 8 + HASH_LENGTH + 8 + 4;
#[allow(dead_code)]
const TABLE_ENTRY_SIZE: usize = 8 + 8 + 4;
#[derive(Debug, Clone)]
pub struct Superblock {
pub applied_position: Offset,
pub applied_hash: Hash,
pub next_page_id: PageId,
pub tables: HashMap<TableId, BTreeMeta>,
}
impl Superblock {
pub fn new() -> Self {
Self {
applied_position: Offset::ZERO,
applied_hash: Hash::GENESIS,
next_page_id: PageId::new(1), tables: HashMap::new(),
}
}
pub fn serialize(&self) -> [u8; PAGE_SIZE] {
let mut buf = [0u8; PAGE_SIZE];
let mut offset = 0;
buf[offset..offset + 8].copy_from_slice(SUPERBLOCK_MAGIC);
offset += 8;
buf[offset..offset + 4].copy_from_slice(&SUPERBLOCK_VERSION.to_le_bytes());
offset += 4;
buf[offset..offset + 8].copy_from_slice(&self.applied_position.as_u64().to_le_bytes());
offset += 8;
buf[offset..offset + HASH_LENGTH].copy_from_slice(self.applied_hash.as_bytes());
offset += HASH_LENGTH;
buf[offset..offset + 8].copy_from_slice(&self.next_page_id.as_u64().to_le_bytes());
offset += 8;
buf[offset..offset + 4].copy_from_slice(&(self.tables.len() as u32).to_le_bytes());
offset += 4;
for (table_id, meta) in &self.tables {
buf[offset..offset + 8].copy_from_slice(&table_id.as_u64().to_le_bytes());
offset += 8;
let root_id = meta.root.map_or(u64::MAX, PageId::as_u64);
buf[offset..offset + 8].copy_from_slice(&root_id.to_le_bytes());
offset += 8;
buf[offset..offset + 4].copy_from_slice(&(meta.height as u32).to_le_bytes());
offset += 4;
}
let crc = kimberlite_crypto::crc32(&buf[..PAGE_SIZE - CRC_SIZE]);
buf[PAGE_SIZE - CRC_SIZE..].copy_from_slice(&crc.to_le_bytes());
buf
}
pub fn deserialize(buf: &[u8; PAGE_SIZE]) -> Result<Self, StoreError> {
let stored_crc = u32::from_le_bytes(buf[PAGE_SIZE - CRC_SIZE..].try_into().unwrap());
let computed_crc = kimberlite_crypto::crc32(&buf[..PAGE_SIZE - CRC_SIZE]);
if stored_crc != computed_crc {
return Err(StoreError::SuperblockCorrupted);
}
let mut offset = 0;
let magic: [u8; 8] = buf[offset..offset + 8].try_into().unwrap();
if &magic != SUPERBLOCK_MAGIC {
return Err(StoreError::InvalidSuperblockMagic);
}
offset += 8;
let version = u32::from_le_bytes(buf[offset..offset + 4].try_into().unwrap());
if version != SUPERBLOCK_VERSION {
return Err(StoreError::UnsupportedPageVersion(version as u8));
}
offset += 4;
let applied_position = Offset::new(u64::from_le_bytes(
buf[offset..offset + 8].try_into().unwrap(),
));
offset += 8;
let applied_hash = Hash::from_bytes(buf[offset..offset + HASH_LENGTH].try_into().unwrap());
offset += HASH_LENGTH;
let next_page_id = PageId::new(u64::from_le_bytes(
buf[offset..offset + 8].try_into().unwrap(),
));
offset += 8;
let table_count = u32::from_le_bytes(buf[offset..offset + 4].try_into().unwrap()) as usize;
offset += 4;
let mut tables = HashMap::with_capacity(table_count);
for _ in 0..table_count {
let table_id = TableId::new(u64::from_le_bytes(
buf[offset..offset + 8].try_into().unwrap(),
));
offset += 8;
let root_id = u64::from_le_bytes(buf[offset..offset + 8].try_into().unwrap());
let root = if root_id == u64::MAX {
None
} else {
Some(PageId::new(root_id))
};
offset += 8;
let height = u32::from_le_bytes(buf[offset..offset + 4].try_into().unwrap()) as usize;
offset += 4;
let meta = BTreeMeta { root, height };
tables.insert(table_id, meta);
}
Ok(Self {
applied_position,
applied_hash,
next_page_id,
tables,
})
}
#[allow(dead_code)]
pub fn max_tables() -> usize {
(PAGE_SIZE - HEADER_SIZE - CRC_SIZE) / TABLE_ENTRY_SIZE
}
}
impl Default for Superblock {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod superblock_tests {
use super::*;
#[test]
fn test_empty_superblock_roundtrip() {
let sb = Superblock::new();
let bytes = sb.serialize();
let loaded = Superblock::deserialize(&bytes).unwrap();
assert_eq!(loaded.applied_position, Offset::ZERO);
assert_eq!(loaded.applied_hash, Hash::GENESIS);
assert_eq!(loaded.next_page_id, PageId::new(1));
assert!(loaded.tables.is_empty());
}
#[test]
fn test_superblock_with_tables() {
let mut sb = Superblock::new();
sb.applied_position = Offset::new(100);
sb.next_page_id = PageId::new(50);
sb.tables
.insert(TableId::new(1), BTreeMeta::with_root(PageId::new(10), 2));
sb.tables
.insert(TableId::new(2), BTreeMeta::with_root(PageId::new(20), 3));
sb.tables.insert(TableId::new(3), BTreeMeta::new());
let bytes = sb.serialize();
let loaded = Superblock::deserialize(&bytes).unwrap();
assert_eq!(loaded.applied_position, Offset::new(100));
assert_eq!(loaded.next_page_id, PageId::new(50));
assert_eq!(loaded.tables.len(), 3);
let t1 = loaded.tables.get(&TableId::new(1)).unwrap();
assert_eq!(t1.root, Some(PageId::new(10)));
assert_eq!(t1.height, 2);
let t3 = loaded.tables.get(&TableId::new(3)).unwrap();
assert_eq!(t3.root, None);
}
#[test]
fn test_superblock_corruption_detection() {
let sb = Superblock::new();
let mut bytes = sb.serialize();
bytes[50] ^= 0xFF;
let result = Superblock::deserialize(&bytes);
assert!(matches!(result, Err(StoreError::SuperblockCorrupted)));
}
#[test]
fn test_max_tables() {
assert!(Superblock::max_tables() > 100);
}
}