use std::sync::Arc;
use crate::storage::page::{Page, PageHeader, PageId, PageType, PAGE_HEADER_LEN, PAGE_SIZE};
use crate::storage::v4::store::PageStore;
use crate::{Error, Result};
/// Byte offset of the u32 entry count on a catalog page, placed
/// immediately after the common page header.
const ENTRY_COUNT_OFFSET: usize = PAGE_HEADER_LEN;
/// Byte offset of the u64 next-page pointer; a value of 0 terminates
/// the page chain.
const NEXT_PAGE_OFFSET: usize = ENTRY_COUNT_OFFSET + 4;
/// Byte offset at which the packed entries begin.
const ENTRIES_OFFSET: usize = NEXT_PAGE_OFFSET + 8;
/// Size of the fixed portion of an encoded entry:
/// id (4) + name length (2) + leaf_head (8) + bloom_root (8)
/// + record_count (8) + flags (1) + zero padding (3).
/// The variable-length name bytes sit between the length prefix and
/// `leaf_head` (see `encode_entry`/`decode_entry`).
const ENTRY_FIXED_BYTES: usize = 4 + 2 + 8 + 8 + 8 + 1 + 3;
/// Flag bit marking an entry as logically deleted (tombstoned).
pub(crate) const ENTRY_FLAG_TOMBSTONE: u8 = 1 << 0;
/// A single namespace record as stored in the on-disk catalog.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct CatalogEntry {
    /// Namespace identifier; id 0 is the default namespace.
    pub(crate) id: u32,
    /// Namespace name (empty string for the default namespace).
    pub(crate) name: String,
    /// Head page of this namespace's leaf chain; initialised to 0.
    pub(crate) leaf_head: u64,
    /// Root page of this namespace's bloom structure; initialised to 0.
    pub(crate) bloom_root: u64,
    /// Cached record count for this namespace; initialised to 0.
    pub(crate) record_count: u64,
    /// Bit flags; see `ENTRY_FLAG_TOMBSTONE`.
    pub(crate) flags: u8,
}
impl CatalogEntry {
    /// Builds an entry for `id` named `name` with all storage metadata
    /// (leaf head, bloom root, record count, flags) zeroed.
    #[must_use]
    pub(crate) fn new(id: u32, name: impl Into<String>) -> Self {
        let name = name.into();
        Self {
            id,
            name,
            leaf_head: 0,
            bloom_root: 0,
            record_count: 0,
            flags: 0,
        }
    }

    /// Reports whether the tombstone flag bit is set on this entry.
    #[must_use]
    pub(crate) const fn is_tombstoned(&self) -> bool {
        (self.flags & ENTRY_FLAG_TOMBSTONE) == ENTRY_FLAG_TOMBSTONE
    }

    /// Number of bytes this entry occupies once encoded on a page
    /// (fixed layout plus the variable-length name).
    fn encoded_len(&self) -> usize {
        self.name.len() + ENTRY_FIXED_BYTES
    }
}
/// In-memory catalog of namespaces, persisted as a chain of pages.
#[derive(Debug, Clone, Default)]
pub(crate) struct Catalog {
    /// All entries — live and tombstoned — in insertion order.
    entries: Vec<CatalogEntry>,
    /// Root page of the most recently loaded or saved on-disk chain;
    /// `None` until the catalog has been persisted or loaded.
    root_page: Option<PageId>,
}
impl Catalog {
#[must_use]
pub(crate) fn fresh() -> Self {
let mut catalog = Self::default();
catalog.entries.push(CatalogEntry::new(0, ""));
catalog
}
pub(crate) fn load(store: &PageStore, root: PageId) -> Result<Self> {
if root.get() == 0 {
return Ok(Self::fresh());
}
let mut entries: Vec<CatalogEntry> = Vec::new();
let mut current = root;
while current.get() != 0 {
let page = store.read_page(current)?;
page.validate_crc()?;
let bytes = page.as_bytes();
let header = page.header()?;
if header.page_type != PageType::LeafSlotted {
return Err(Error::Corrupted {
offset: 0,
reason: "catalog page has unexpected page type",
});
}
let entry_count = read_u32(bytes, ENTRY_COUNT_OFFSET) as usize;
let next_page = read_u64(bytes, NEXT_PAGE_OFFSET);
let mut cursor = ENTRIES_OFFSET;
for _ in 0..entry_count {
let entry = decode_entry(bytes, &mut cursor)?;
entries.push(entry);
}
current = PageId::new(next_page);
}
if !entries.iter().any(|entry| entry.id == 0) {
entries.insert(0, CatalogEntry::new(0, ""));
}
Ok(Self {
entries,
root_page: Some(root),
})
}
pub(crate) fn save(&mut self, store: &PageStore) -> Result<PageId> {
if self.entries.is_empty() {
return Ok(PageId::new(0));
}
let mut groups: Vec<Vec<&CatalogEntry>> = Vec::new();
let mut current_group: Vec<&CatalogEntry> = Vec::new();
let mut current_bytes = ENTRIES_OFFSET;
for entry in &self.entries {
let entry_len = entry.encoded_len();
if entry_len > PAGE_SIZE - ENTRIES_OFFSET {
return Err(Error::InvalidConfig(
"catalog entry encoded length exceeds single-page capacity",
));
}
if current_bytes + entry_len > PAGE_SIZE {
groups.push(std::mem::take(&mut current_group));
current_bytes = ENTRIES_OFFSET;
}
current_group.push(entry);
current_bytes += entry_len;
}
if !current_group.is_empty() {
groups.push(current_group);
}
let mut page_ids: Vec<PageId> = Vec::with_capacity(groups.len());
for _ in 0..groups.len() {
page_ids.push(store.allocate_page()?);
}
for (idx, group) in groups.iter().enumerate() {
let mut page = Page::new(PageHeader::new(PageType::LeafSlotted));
{
let bytes = page.as_mut_bytes();
write_u32(bytes, ENTRY_COUNT_OFFSET, group.len() as u32);
let next = page_ids
.get(idx + 1)
.copied()
.unwrap_or_else(|| PageId::new(0));
write_u64(bytes, NEXT_PAGE_OFFSET, next.get());
let mut cursor = ENTRIES_OFFSET;
for entry in group {
encode_entry(bytes, &mut cursor, entry);
}
}
let _crc = page.refresh_crc()?;
store.write_page(page_ids[idx], Arc::new(page))?;
}
let root = page_ids[0];
self.root_page = Some(root);
Ok(root)
}
#[must_use]
pub(crate) fn live_count(&self) -> usize {
self.entries.iter().filter(|e| !e.is_tombstoned()).count()
}
#[must_use]
pub(crate) fn raw_count(&self) -> usize {
self.entries.len()
}
pub(crate) fn live_entries(&self) -> impl Iterator<Item = &CatalogEntry> {
self.entries.iter().filter(|e| !e.is_tombstoned())
}
#[must_use]
pub(crate) fn find_by_id(&self, id: u32) -> Option<&CatalogEntry> {
self.entries
.iter()
.find(|e| e.id == id && !e.is_tombstoned())
}
#[must_use]
pub(crate) fn find_by_name(&self, name: &str) -> Option<&CatalogEntry> {
self.entries
.iter()
.find(|e| e.name == name && !e.is_tombstoned())
}
pub(crate) fn create(&mut self, name: impl Into<String>) -> Result<u32> {
let name = name.into();
if self.find_by_name(&name).is_some() {
return Err(Error::InvalidConfig("namespace already exists"));
}
let next_id = self
.entries
.iter()
.map(|e| e.id)
.max()
.unwrap_or(0)
.checked_add(1)
.ok_or(Error::InvalidConfig("namespace id space exhausted"))?;
self.entries.push(CatalogEntry::new(next_id, name));
Ok(next_id)
}
pub(crate) fn tombstone(&mut self, id: u32) -> Result<bool> {
if id == 0 {
return Err(Error::InvalidConfig(
"default namespace cannot be tombstoned",
));
}
for entry in self.entries.iter_mut() {
if entry.id == id && !entry.is_tombstoned() {
entry.flags |= ENTRY_FLAG_TOMBSTONE;
return Ok(true);
}
}
Ok(false)
}
pub(crate) fn set_leaf_head(&mut self, id: u32, head: u64) {
for entry in self.entries.iter_mut() {
if entry.id == id && !entry.is_tombstoned() {
entry.leaf_head = head;
return;
}
}
}
pub(crate) fn set_bloom_root(&mut self, id: u32, root: u64) {
for entry in self.entries.iter_mut() {
if entry.id == id && !entry.is_tombstoned() {
entry.bloom_root = root;
return;
}
}
}
pub(crate) fn set_record_count(&mut self, id: u32, count: u64) {
for entry in self.entries.iter_mut() {
if entry.id == id && !entry.is_tombstoned() {
entry.record_count = count;
return;
}
}
}
pub(crate) fn tombstoned_entries(&self) -> impl Iterator<Item = &CatalogEntry> {
self.entries.iter().filter(|e| e.is_tombstoned())
}
pub(crate) fn remove_tombstoned(&mut self, id: u32) -> Result<bool> {
let position = self.entries.iter().position(|e| e.id == id);
let Some(idx) = position else {
return Ok(false);
};
if !self.entries[idx].is_tombstoned() {
return Err(Error::InvalidConfig(
"remove_tombstoned called on a live entry",
));
}
let _removed = self.entries.remove(idx);
Ok(true)
}
}
/// Serialises `entry` into `out` starting at `*cursor`, advancing the
/// cursor past the encoded bytes. The caller must guarantee the slice
/// has room for `entry.encoded_len()` bytes; all integers are
/// little-endian.
fn encode_entry(out: &mut [u8], cursor: &mut usize, entry: &CatalogEntry) {
    let mut at = *cursor;
    write_u32(out, at, entry.id);
    at += 4;
    write_u16(out, at, entry.name.len() as u16);
    at += 2;
    let name = entry.name.as_bytes();
    out[at..at + name.len()].copy_from_slice(name);
    at += name.len();
    for value in [entry.leaf_head, entry.bloom_root, entry.record_count] {
        write_u64(out, at, value);
        at += 8;
    }
    out[at] = entry.flags;
    at += 1;
    // Three zero bytes of padding keep the fixed layout size stable.
    out[at..at + 3].fill(0);
    at += 3;
    *cursor = at;
}
/// Deserialises one catalog entry from `bytes` at `*cursor`, advancing
/// the cursor past the entry on success.
///
/// # Errors
/// Returns `Error::Corrupted` when the entry runs past the end of the
/// slice or the namespace name is not valid UTF-8.
fn decode_entry(bytes: &[u8], cursor: &mut usize) -> Result<CatalogEntry> {
    let start = *cursor;
    if start + ENTRY_FIXED_BYTES > bytes.len() {
        return Err(Error::Corrupted {
            offset: start as u64,
            reason: "catalog entry truncated",
        });
    }
    let id = read_u32(bytes, start);
    let mut at = start + 4;
    let name_len = read_u16(bytes, at) as usize;
    at += 2;
    // `ENTRY_FIXED_BYTES - 6` is what remains of the fixed layout after
    // the id and name-length prefix already consumed above.
    if at + name_len + (ENTRY_FIXED_BYTES - 6) > bytes.len() {
        return Err(Error::Corrupted {
            offset: at as u64,
            reason: "catalog entry name extends past page",
        });
    }
    let Ok(name) = std::str::from_utf8(&bytes[at..at + name_len]) else {
        return Err(Error::Corrupted {
            offset: at as u64,
            reason: "catalog namespace name is not valid utf-8",
        });
    };
    let name = name.to_string();
    at += name_len;
    let leaf_head = read_u64(bytes, at);
    at += 8;
    let bloom_root = read_u64(bytes, at);
    at += 8;
    let record_count = read_u64(bytes, at);
    at += 8;
    let flags = bytes[at];
    // Skip the flags byte plus three padding bytes.
    at += 1 + 3;
    *cursor = at;
    Ok(CatalogEntry {
        id,
        name,
        leaf_head,
        bloom_root,
        record_count,
        flags,
    })
}
/// Reads a little-endian u16 at `offset`; panics if out of bounds.
fn read_u16(bytes: &[u8], offset: usize) -> u16 {
    let raw: [u8; 2] = bytes[offset..offset + 2]
        .try_into()
        .expect("slice length is exactly 2");
    u16::from_le_bytes(raw)
}
/// Reads a little-endian u32 at `offset`; panics if out of bounds.
fn read_u32(bytes: &[u8], offset: usize) -> u32 {
    let raw: [u8; 4] = bytes[offset..offset + 4]
        .try_into()
        .expect("slice length is exactly 4");
    u32::from_le_bytes(raw)
}
/// Reads a little-endian u64 at `offset`; panics if out of bounds.
fn read_u64(bytes: &[u8], offset: usize) -> u64 {
    let raw: [u8; 8] = bytes[offset..offset + 8]
        .try_into()
        .expect("slice length is exactly 8");
    u64::from_le_bytes(raw)
}
/// Writes `value` as little-endian at `offset`; panics if out of bounds.
fn write_u16(out: &mut [u8], offset: usize, value: u16) {
    let le = value.to_le_bytes();
    out[offset..offset + le.len()].copy_from_slice(&le);
}
/// Writes `value` as little-endian at `offset`; panics if out of bounds.
fn write_u32(out: &mut [u8], offset: usize, value: u32) {
    let le = value.to_le_bytes();
    out[offset..offset + le.len()].copy_from_slice(&le);
}
/// Writes `value` as little-endian at `offset`; panics if out of bounds.
fn write_u64(out: &mut [u8], offset: usize, value: u64) {
    let le = value.to_le_bytes();
    out[offset..offset + le.len()].copy_from_slice(&le);
}
#[cfg(test)]
mod tests {
    use super::{Catalog, CatalogEntry};
    use crate::page_cache::PageCache;
    use crate::storage::page::PageId;
    use crate::storage::v4::store::PageStore;
    use std::sync::Arc;

    /// Builds a unique temp-file path so concurrently running tests do
    /// not collide on the same backing file.
    fn tmp_path(name: &str) -> std::path::PathBuf {
        let mut p = std::env::temp_dir();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0_u128, |d| d.as_nanos());
        p.push(format!("emdb-v4-catalog-{name}-{nanos}.emdb"));
        p
    }

    /// Opens a fresh `PageStore` backed by a unique temp file; the
    /// caller is responsible for removing the file afterwards.
    fn open_store(name: &str) -> (PageStore, std::path::PathBuf) {
        let path = tmp_path(name);
        let cache = Arc::new(PageCache::with_default_capacity());
        let store = match PageStore::open(path.clone(), 0, cache) {
            Ok(s) => s,
            Err(err) => panic!("open should succeed: {err}"),
        };
        (store, path)
    }

    #[test]
    fn fresh_catalog_only_has_default_namespace() {
        let catalog = Catalog::fresh();
        assert_eq!(catalog.live_count(), 1);
        let default = match catalog.find_by_id(0) {
            Some(e) => e,
            None => panic!("default namespace should be present"),
        };
        assert_eq!(default.name, "");
    }

    #[test]
    fn create_assigns_monotonic_ids() {
        let mut catalog = Catalog::fresh();
        let users_id = match catalog.create("users") {
            Ok(id) => id,
            Err(err) => panic!("create should succeed: {err}"),
        };
        let sessions_id = match catalog.create("sessions") {
            Ok(id) => id,
            Err(err) => panic!("create should succeed: {err}"),
        };
        assert_eq!(users_id, 1);
        assert_eq!(sessions_id, 2);
    }

    #[test]
    fn create_rejects_duplicate_names() {
        let mut catalog = Catalog::fresh();
        let _ = catalog.create("users");
        let dup = catalog.create("users");
        assert!(dup.is_err());
    }

    #[test]
    fn cannot_tombstone_default_namespace() {
        let mut catalog = Catalog::fresh();
        let result = catalog.tombstone(0);
        assert!(result.is_err());
    }

    #[test]
    fn tombstone_hides_entry_from_lookups() {
        let mut catalog = Catalog::fresh();
        let id = match catalog.create("temp") {
            Ok(id) => id,
            Err(err) => panic!("create should succeed: {err}"),
        };
        let by_name = catalog.find_by_name("temp");
        assert!(by_name.is_some());
        assert!(matches!(catalog.tombstone(id), Ok(true)));
        let by_name = catalog.find_by_name("temp");
        assert!(by_name.is_none());
        let by_id = catalog.find_by_id(id);
        assert!(by_id.is_none());
    }

    #[test]
    fn tombstone_unknown_id_reports_false() {
        let mut catalog = Catalog::fresh();
        assert!(matches!(catalog.tombstone(99), Ok(false)));
    }

    #[test]
    fn save_then_load_round_trips_via_page_store() {
        let (store, path) = open_store("round-trip");
        let mut catalog = Catalog::fresh();
        let _ = catalog.create("users");
        let _ = catalog.create("sessions");
        catalog.set_leaf_head(1, 42);
        catalog.set_bloom_root(2, 7);
        catalog.set_record_count(1, 1000);
        let root = match catalog.save(&store) {
            Ok(r) => r,
            Err(err) => panic!("save should succeed: {err}"),
        };
        assert_ne!(root.get(), 0);
        let _ = store.flush();
        let reloaded = match Catalog::load(&store, root) {
            Ok(c) => c,
            Err(err) => panic!("load should succeed: {err}"),
        };
        assert_eq!(reloaded.live_count(), 3);
        let users = match reloaded.find_by_name("users") {
            Some(e) => e,
            None => panic!("users entry missing after reload"),
        };
        assert_eq!(users.id, 1);
        assert_eq!(users.leaf_head, 42);
        assert_eq!(users.record_count, 1000);
        let sessions = match reloaded.find_by_name("sessions") {
            Some(e) => e,
            None => panic!("sessions entry missing after reload"),
        };
        assert_eq!(sessions.id, 2);
        assert_eq!(sessions.bloom_root, 7);
        let _removed = std::fs::remove_file(&path);
    }

    #[test]
    fn save_with_zero_namespaces_returns_zero_root() {
        let (store, path) = open_store("empty");
        let mut empty = Catalog::default();
        let root = match empty.save(&store) {
            Ok(r) => r,
            Err(err) => panic!("save should succeed: {err}"),
        };
        assert_eq!(root.get(), 0);
        let _removed = std::fs::remove_file(&path);
    }

    #[test]
    fn load_from_zero_root_returns_fresh_catalog() {
        let (store, path) = open_store("from-zero");
        let catalog = match Catalog::load(&store, PageId::new(0)) {
            Ok(c) => c,
            Err(err) => panic!("load should succeed: {err}"),
        };
        assert_eq!(catalog.live_count(), 1);
        let _removed = std::fs::remove_file(&path);
    }

    #[test]
    fn live_entries_skips_tombstones() {
        let mut catalog = Catalog::fresh();
        let temp_id = match catalog.create("temp") {
            Ok(id) => id,
            Err(err) => panic!("create should succeed: {err}"),
        };
        let _ = catalog.create("keep");
        let _ = catalog.tombstone(temp_id);
        let names: Vec<String> = catalog.live_entries().map(|e| e.name.clone()).collect();
        assert_eq!(names.len(), 2);
        assert!(names.iter().any(|n| n.is_empty()));
        assert!(names.iter().any(|n| n == "keep"));
    }

    #[test]
    fn save_chains_pages_when_catalog_overflows_one_page() {
        let (store, path) = open_store("chain");
        let mut catalog = Catalog::fresh();
        // 30 entries with ~200-byte names overflow a single page, forcing
        // the next-page chaining path in `save`/`load`.
        let big_name = "n".repeat(200);
        for i in 0..30_u32 {
            let _ = catalog.create(format!("{}{i}", big_name));
        }
        let root = match catalog.save(&store) {
            Ok(r) => r,
            Err(err) => panic!("save should succeed: {err}"),
        };
        let _ = store.flush();
        let reloaded = match Catalog::load(&store, root) {
            Ok(c) => c,
            Err(err) => panic!("load should succeed: {err}"),
        };
        assert_eq!(reloaded.live_count(), 31);
        let _removed = std::fs::remove_file(&path);
    }

    #[test]
    fn entry_new_constructor_initialises_zero_metadata() {
        let entry = CatalogEntry::new(7, "thing");
        assert_eq!(entry.id, 7);
        assert_eq!(entry.name, "thing");
        assert_eq!(entry.leaf_head, 0);
        assert_eq!(entry.bloom_root, 0);
        assert_eq!(entry.record_count, 0);
        assert_eq!(entry.flags, 0);
        assert!(!entry.is_tombstoned());
    }
}