#![forbid(unsafe_code)]
use serde::{Deserialize, Serialize};
use crate::btree::node::{decode_node, NodeKind};
use crate::btree::{choose_child, BTree, MAX_BTREE_DEPTH};
use crate::error::{Error, Result};
use crate::id::{bump_next_id, Id};
use crate::index::{IndexKind, IndexSpec};
use crate::pager::page::PageId;
use crate::pager::{Pager, ReaderSnapshot};
use crate::platform::{FileBackend, FileHandle};
use heapless::Vec as HeaplessVec;
/// Upper bound on the number of catalog rows `Catalog::list_collections`
/// scans before it fails with `Error::BTreeScanLimitExceeded`.
pub const MAX_COLLECTIONS: usize = 1 << 20;
/// Catalog B-tree key under which the next-collection-id watermark is stored.
///
/// The key is empty and `validate_name` rejects empty collection names, so
/// this row cannot collide with a row belonging to a real collection.
const RESERVED_NEXT_ID_KEY: &[u8] = b"";
/// Catalog row describing one collection.
///
/// `Catalog::insert` stores it in the catalog B-tree as a postcard-encoded
/// value keyed by the collection name.
///
/// NOTE(review): postcard is not a self-describing format, so the field order
/// here is presumably part of the stored representation — confirm before
/// reordering or inserting fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CollectionDescriptor {
/// Identifier assigned by `Catalog::insert`, which overwrites whatever value
/// the caller supplied.
pub collection_id: u32,
/// Root of the collection's primary tree. The tests in this file pass
/// `BTree::root().get()`, so this is presumably a page id — TODO confirm.
pub primary_root: u64,
/// Caller-supplied version number; this file stores it but never interprets it.
pub type_version: u32,
/// Watermark that `Catalog::next_id` passes to `bump_next_id`; starts at 1.
pub next_id: u64,
/// Descriptors of the indexes declared on this collection, including ones
/// that `Catalog::drop_index` has marked `DroppedPending`.
pub indexes: Vec<IndexDescriptor>,
}
impl CollectionDescriptor {
    /// Builds a descriptor that has no indexes and whose `next_id` watermark
    /// starts at 1.
    ///
    /// The three arguments are stored verbatim in the fields of the same name.
    #[must_use]
    pub const fn new(collection_id: u32, primary_root: u64, type_version: u32) -> Self {
        /// First value of the per-collection `next_id` watermark.
        const INITIAL_NEXT_ID: u64 = 1;
        Self {
            indexes: Vec::new(),
            next_id: INITIAL_NEXT_ID,
            type_version,
            primary_root,
            collection_id,
        }
    }
}
/// Catalog record for one index declared on a collection.
///
/// Instances are created by `Catalog::declare_index` and stored inside the
/// owning `CollectionDescriptor::indexes` list.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct IndexDescriptor {
/// Identifier chosen by `next_index_id`: one more than the largest id already
/// present in the collection's index list.
pub index_id: u32,
/// Index name; `Catalog::declare_index` and `Catalog::drop_index` look
/// entries up by this field.
pub name: String,
/// Index kind, copied from the declaring `IndexSpec`.
pub kind: IndexKind,
/// Key paths, copied from the declaring `IndexSpec`.
pub key_paths: Vec<String>,
/// Root page of the B-tree that `Catalog::declare_index` created for this index.
pub root_page_id: u64,
/// Whether the index is live or has been marked as dropped.
pub status: IndexStatus,
}
/// Lifecycle state of an index recorded in the catalog.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[repr(u8)]
pub enum IndexStatus {
/// Status given to every index created by `Catalog::declare_index`.
Active = 0,
/// Status set by `Catalog::drop_index`. The descriptor stays in the
/// collection's index list; nothing in this file removes it afterwards —
/// presumably reclamation happens elsewhere (TODO confirm).
DroppedPending = 1,
}
/// Handle to the collection catalog: a B-tree mapping collection names to
/// postcard-encoded `CollectionDescriptor` rows, plus one reserved row that
/// holds the next-collection-id watermark.
#[derive(Debug)]
pub struct Catalog<F: FileBackend = FileHandle> {
// B-tree holding the catalog rows; its root is published to the pager via
// `Pager::set_root_catalog` after every mutation in this file.
tree: BTree<F>,
// In-memory copy of the persisted watermark: the id that the next
// `Catalog::insert` will assign.
next_collection_id: u32,
}
impl<F: FileBackend> Catalog<F> {
/// Opens the catalog the pager already points at, or creates a new one.
///
/// The pager's `root_catalog()` value is passed through `PageId::new`; when
/// that yields a page id the existing catalog is opened, otherwise a fresh
/// catalog is initialised and registered with the pager.
///
/// # Errors
/// Propagates whatever `open_existing` or `init_fresh` returns.
pub fn open_or_init(pager: &mut Pager<F>) -> Result<Self> {
    match PageId::new(pager.root_catalog()) {
        Some(root) => Self::open_existing(pager, root),
        None => Self::init_fresh(pager),
    }
}
fn open_existing(pager: &mut Pager<F>, root: PageId) -> Result<Self> {
let tree = BTree::<F>::open(pager, root)?;
let watermark = match tree.get(pager, RESERVED_NEXT_ID_KEY)? {
Some(bytes) => postcard::from_bytes::<u32>(&bytes).map_err(Error::from)?,
None => {
return Err(Error::Corruption {
page_id: root.get(),
});
}
};
Ok(Self {
tree,
next_collection_id: watermark,
})
}
/// Creates an empty catalog B-tree, seeds the reserved watermark row with 1,
/// and registers the tree's root with the pager.
///
/// # Errors
/// Propagates B-tree, postcard-encoding and pager errors.
fn init_fresh(pager: &mut Pager<F>) -> Result<Self> {
    /// Id that the first inserted collection will receive.
    const FIRST_COLLECTION_ID: u32 = 1;
    let mut tree = BTree::<F>::empty(pager)?;
    let payload = postcard::to_allocvec(&FIRST_COLLECTION_ID)?;
    tree.insert(pager, RESERVED_NEXT_ID_KEY, &payload)?;
    pager.set_root_catalog(tree.root().get())?;
    Ok(Self {
        tree,
        next_collection_id: FIRST_COLLECTION_ID,
    })
}
pub fn get(&self, pager: &mut Pager<F>, name: &str) -> Result<Option<CollectionDescriptor>> {
validate_name(name)?;
match self.tree.get(pager, name.as_bytes())? {
Some(bytes) => {
let descriptor: CollectionDescriptor =
postcard::from_bytes(&bytes).map_err(Error::from)?;
Ok(Some(descriptor))
}
None => Ok(None),
}
}
pub fn lookup_via_snapshot(
pager: &Pager<F>,
snapshot: &ReaderSnapshot<F>,
name: &str,
) -> Result<Option<CollectionDescriptor>> {
validate_name(name)?;
let Some(root) = PageId::new(snapshot.root_catalog()) else {
return Ok(None);
};
let key = name.as_bytes();
let mut path: HeaplessVec<PageId, MAX_BTREE_DEPTH> = HeaplessVec::new();
let mut current = root;
let leaf_node = loop {
if path.push(current).is_err() {
return Err(Error::BTreeDepthExceeded {
limit: MAX_BTREE_DEPTH,
});
}
let page = snapshot.read_page(pager, current)?;
let decoded = decode_node(page.as_bytes())?;
match decoded.kind {
NodeKind::Leaf => break decoded,
NodeKind::Internal => {
current = choose_child(&decoded, key)?;
}
}
};
for entry in &leaf_node.leaves {
if entry.key.as_slice() == key {
let descriptor: CollectionDescriptor =
postcard::from_bytes(&entry.value).map_err(Error::from)?;
return Ok(Some(descriptor));
}
}
Ok(None)
}
pub fn insert(
&mut self,
pager: &mut Pager<F>,
name: &str,
mut descriptor: CollectionDescriptor,
) -> Result<u32> {
debug_assert!(
pager.in_txn(),
"Catalog::insert must run inside a WAL transaction \
(Pager::begin_txn / WriteTxn::begin)",
);
validate_name(name)?;
if self.tree.get(pager, name.as_bytes())?.is_some() {
return Err(Error::CollectionAlreadyExists {
name: name.to_owned(),
});
}
let assigned = self.next_collection_id;
descriptor.collection_id = assigned;
let new_watermark =
self.next_collection_id
.checked_add(1)
.ok_or_else(|| Error::IdSpaceExhausted {
collection: "<catalog>".to_owned(),
})?;
let encoded = postcard::to_allocvec(&descriptor)?;
self.tree.insert(pager, name.as_bytes(), &encoded)?;
self.persist_next_collection_id(pager, new_watermark)?;
pager.set_root_catalog(self.tree.root().get())?;
self.next_collection_id = new_watermark;
Ok(assigned)
}
pub fn update(
&mut self,
pager: &mut Pager<F>,
name: &str,
descriptor: &CollectionDescriptor,
) -> Result<()> {
debug_assert!(
pager.in_txn(),
"Catalog::update must run inside a WAL transaction",
);
validate_name(name)?;
let existing = self.get(pager, name)?.ok_or(Error::InvalidArgument(
"catalog update: collection not registered",
))?;
if existing.collection_id != descriptor.collection_id {
return Err(Error::Corruption {
page_id: self.tree.root().get(),
});
}
let encoded = postcard::to_allocvec(descriptor)?;
self.tree.delete(pager, name.as_bytes())?;
self.tree.insert(pager, name.as_bytes(), &encoded)?;
pager.set_root_catalog(self.tree.root().get())?;
Ok(())
}
pub fn declare_index(
&mut self,
pager: &mut Pager<F>,
collection: &str,
spec: &IndexSpec,
) -> Result<u32> {
debug_assert!(
pager.in_txn(),
"Catalog::declare_index must run inside a WAL transaction",
);
spec.validate()?;
validate_name(collection)?;
let mut descriptor =
self.get(pager, collection)?
.ok_or_else(|| Error::CollectionNotFound {
name: collection.to_owned(),
})?;
if let Some(existing) = descriptor.indexes.iter().find(|d| d.name == spec.name) {
return Self::reconcile_existing_index(existing, spec);
}
let index_id = next_index_id(&descriptor)?;
let root_page_id = BTree::<F>::empty(pager)?.root().get();
let new_descriptor = IndexDescriptor {
index_id,
name: spec.name.clone(),
kind: spec.kind,
key_paths: spec.key_paths.clone(),
root_page_id,
status: IndexStatus::Active,
};
descriptor.indexes.push(new_descriptor);
self.update(pager, collection, &descriptor)?;
Ok(index_id)
}
fn reconcile_existing_index(existing: &IndexDescriptor, spec: &IndexSpec) -> Result<u32> {
if existing.kind != spec.kind {
return Err(Error::IndexKindMismatch {
name: spec.name.clone(),
expected: spec.kind,
found: existing.kind,
});
}
if existing.key_paths != spec.key_paths {
return Err(Error::IndexKeyPathsMismatch {
name: spec.name.clone(),
});
}
Ok(existing.index_id)
}
pub fn reconcile_indexes(
&mut self,
pager: &mut Pager<F>,
collection: &str,
specs: &[IndexSpec],
) -> Result<Vec<IndexDescriptor>> {
debug_assert!(
pager.in_txn(),
"Catalog::reconcile_indexes must run inside a WAL transaction",
);
validate_name(collection)?;
for spec in specs {
spec.validate()?;
}
for spec in specs {
let _ = self.declare_index(pager, collection, spec)?;
}
let descriptor = self
.get(pager, collection)?
.ok_or_else(|| Error::CollectionNotFound {
name: collection.to_owned(),
})?;
let mut to_drop: Vec<String> = Vec::new();
for d in &descriptor.indexes {
if d.status == IndexStatus::Active && !specs.iter().any(|s| s.name == d.name) {
to_drop.push(d.name.clone());
}
}
for name in to_drop {
self.drop_index(pager, collection, &name)?;
}
let final_descriptor =
self.get(pager, collection)?
.ok_or_else(|| Error::CollectionNotFound {
name: collection.to_owned(),
})?;
Ok(final_descriptor.indexes)
}
/// Marks the index `index_name` on `collection` as
/// `IndexStatus::DroppedPending`.
///
/// The descriptor stays in the collection's index list; only its status
/// changes. Dropping an index that is already `DroppedPending` succeeds
/// without touching the catalog: the stored row would be rewritten with
/// identical content, so the delete/insert/root-update cycle is skipped.
///
/// Debug builds assert that the pager is inside a transaction.
///
/// # Errors
/// - `Error::InvalidArgument` when `collection` fails `validate_name`.
/// - `Error::CollectionNotFound` when `collection` is not registered.
/// - `Error::IndexNotFound` when the collection has no index named
///   `index_name`.
/// - Any error from `get` or `update`.
pub fn drop_index(
    &mut self,
    pager: &mut Pager<F>,
    collection: &str,
    index_name: &str,
) -> Result<()> {
    debug_assert!(
        pager.in_txn(),
        "Catalog::drop_index must run inside a WAL transaction",
    );
    validate_name(collection)?;
    let mut descriptor =
        self.get(pager, collection)?
            .ok_or_else(|| Error::CollectionNotFound {
                name: collection.to_owned(),
            })?;
    let entry = descriptor
        .indexes
        .iter_mut()
        .find(|d| d.name == index_name)
        .ok_or_else(|| Error::IndexNotFound {
            collection: collection.to_owned(),
            name: index_name.to_owned(),
        })?;
    // Exhaustive on purpose: a new status must decide how it reacts to a drop.
    match entry.status {
        // Already dropped: nothing changes, so do not rewrite the row.
        IndexStatus::DroppedPending => return Ok(()),
        IndexStatus::Active => entry.status = IndexStatus::DroppedPending,
    }
    self.update(pager, collection, &descriptor)
}
pub fn next_id(&mut self, pager: &mut Pager<F>, name: &str) -> Result<Id> {
debug_assert!(
pager.in_txn(),
"Catalog::next_id must run inside a WAL transaction",
);
validate_name(name)?;
let mut descriptor = self.get(pager, name)?.ok_or(Error::InvalidArgument(
"catalog next_id: collection not registered",
))?;
let issued = bump_next_id(&mut descriptor.next_id, || name.to_owned())?;
self.update(pager, name, &descriptor)?;
Ok(issued)
}
pub fn list_collections(
&self,
pager: &mut Pager<F>,
) -> Result<Vec<(String, CollectionDescriptor)>> {
let mut out: Vec<(String, CollectionDescriptor)> = Vec::new();
let mut scanned = 0usize;
let iter = self.tree.range(pager, ..)?;
for entry in iter {
scanned += 1;
if scanned > MAX_COLLECTIONS {
return Err(Error::BTreeScanLimitExceeded {
limit: MAX_COLLECTIONS,
});
}
let (key, value) = entry?;
if key.as_slice() == RESERVED_NEXT_ID_KEY {
continue;
}
let name = String::from_utf8(key).map_err(|_| Error::Corruption {
page_id: self.tree.root().get(),
})?;
let descriptor: CollectionDescriptor =
postcard::from_bytes(&value).map_err(Error::from)?;
out.push((name, descriptor));
}
Ok(out)
}
/// Writes `watermark` to the reserved catalog row, replacing the previous
/// value with a delete followed by an insert.
///
/// Only the B-tree is touched; the in-memory `next_collection_id` field is
/// left for the caller to update.
///
/// # Errors
/// Propagates postcard-encoding and B-tree errors.
fn persist_next_collection_id(&mut self, pager: &mut Pager<F>, watermark: u32) -> Result<()> {
    let payload = postcard::to_allocvec(&watermark)?;
    let tree = &mut self.tree;
    tree.delete(pager, RESERVED_NEXT_ID_KEY)?;
    tree.insert(pager, RESERVED_NEXT_ID_KEY, &payload)?;
    Ok(())
}
}
const MAX_COLLECTION_NAME_LEN: usize = 255;
fn validate_name(name: &str) -> Result<()> {
if name.is_empty() {
return Err(Error::InvalidArgument("collection name must be non-empty"));
}
if name.len() > MAX_COLLECTION_NAME_LEN {
return Err(Error::InvalidArgument("collection name exceeds 255 bytes"));
}
if name.chars().any(char::is_control) {
return Err(Error::InvalidArgument(
"collection name must not contain NUL or control characters",
));
}
Ok(())
}
fn next_index_id(descriptor: &CollectionDescriptor) -> Result<u32> {
let max = descriptor
.indexes
.iter()
.map(|d| d.index_id)
.max()
.unwrap_or(0);
max.checked_add(1).ok_or_else(|| Error::IdSpaceExhausted {
collection: format!("<indexes:{}>", descriptor.collection_id),
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::pager::{Config, Pager};
use crate::platform::FileHandle;
/// Builds an in-memory pager with the default configuration for one test.
fn fresh_pager() -> Pager<FileHandle> {
    let config = Config::default();
    Pager::<FileHandle>::memory(config).expect("pager")
}
/// Exercises every `validate_name` rule: accepted names, the length cap on
/// both sides, the empty name, and control characters.
#[test]
fn validate_name_policy() {
    // Accepted: an ordinary name and one sitting exactly on the length cap.
    let longest_ok = "a".repeat(MAX_COLLECTION_NAME_LEN);
    for good in ["users", longest_ok.as_str()] {
        assert!(validate_name(good).is_ok());
    }
    // Rejected: empty, one byte over the cap, and an embedded NUL.
    let over_cap = "a".repeat(MAX_COLLECTION_NAME_LEN + 1);
    for bad in ["", over_cap.as_str(), "ab\0cd"] {
        assert!(matches!(validate_name(bad), Err(Error::InvalidArgument(_))));
    }
    // Rejected: other control characters, reported with the offending input.
    for bad in ["line\nbreak", "tab\tname", "del\u{7f}name"] {
        let outcome = validate_name(bad);
        assert!(
            matches!(outcome, Err(Error::InvalidArgument(_))),
            "expected rejection for {bad:?}"
        );
    }
}
/// A fresh pager reports catalog root 0; initialising the catalog must
/// install a non-zero root.
#[test]
fn open_or_init_on_fresh_pager_creates_catalog() {
    let mut pager = fresh_pager();
    let root_before = pager.root_catalog();
    assert_eq!(root_before, 0);
    let _catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init catalog");
    let root_after = pager.root_catalog();
    assert_ne!(root_after, 0, "catalog root must be installed");
}
/// Inserting a collection assigns id 1 and the stored descriptor reads back
/// with every field intact.
#[test]
fn insert_and_get_round_trip() {
    let mut pager = fresh_pager();
    let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init");
    let root = BTree::<FileHandle>::empty(&mut pager)
        .expect("primary tree")
        .root()
        .get();
    let assigned = catalog
        .insert(&mut pager, "users", CollectionDescriptor::new(0, root, 1))
        .expect("insert users");
    assert_eq!(assigned, 1, "first collection gets id 1");
    let stored = catalog
        .get(&mut pager, "users")
        .expect("get")
        .expect("present");
    // Comparing against a full literal checks all five fields at once.
    let expected = CollectionDescriptor {
        collection_id: assigned,
        primary_root: root,
        type_version: 1,
        next_id: 1,
        indexes: Vec::new(),
    };
    assert_eq!(stored, expected);
}
/// Inserting the same collection name twice fails with
/// `CollectionAlreadyExists` naming the collection.
#[test]
fn duplicate_insert_errors() {
    let mut pager = fresh_pager();
    let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init");
    let root = BTree::<FileHandle>::empty(&mut pager)
        .expect("primary tree")
        .root()
        .get();
    let descriptor = CollectionDescriptor::new(0, root, 1);
    catalog
        .insert(&mut pager, "users", descriptor.clone())
        .expect("first insert");
    let second_attempt = catalog.insert(&mut pager, "users", descriptor);
    let err = second_attempt.expect_err("dup");
    assert!(matches!(err, Error::CollectionAlreadyExists { ref name } if name == "users"));
}
/// Two `next_id` calls issue 1 then 2 and leave the stored watermark at 3.
#[test]
fn next_id_advances_and_persists() {
    let mut pager = fresh_pager();
    let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init");
    let root = BTree::<FileHandle>::empty(&mut pager)
        .expect("primary tree")
        .root()
        .get();
    let _id = catalog
        .insert(&mut pager, "users", CollectionDescriptor::new(0, root, 1))
        .expect("insert");
    let first = catalog.next_id(&mut pager, "users").expect("next 1");
    let second = catalog.next_id(&mut pager, "users").expect("next 2");
    assert_eq!(first.get(), 1);
    assert_eq!(second.get(), 2);
    let stored_watermark = catalog
        .get(&mut pager, "users")
        .expect("get")
        .expect("present")
        .next_id;
    assert_eq!(stored_watermark, 3, "next_id watermark advanced");
}
/// Issuing ids for one collection must not move another collection's
/// `next_id` watermark.
#[test]
fn cross_collection_id_isolation() {
    let mut pager = fresh_pager();
    let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init");
    let p1 = BTree::<FileHandle>::empty(&mut pager).expect("p1").root();
    let p2 = BTree::<FileHandle>::empty(&mut pager).expect("p2").root();
    for (name, root) in [("a", p1), ("b", p2)] {
        catalog
            .insert(&mut pager, name, CollectionDescriptor::new(0, root.get(), 1))
            .expect(name);
    }
    // Advance only collection "a", three times.
    for label in ["a1", "a2", "a3"] {
        let _ = catalog.next_id(&mut pager, "a").expect(label);
    }
    let a = catalog
        .get(&mut pager, "a")
        .expect("get a")
        .expect("present");
    let b = catalog
        .get(&mut pager, "b")
        .expect("get b")
        .expect("present");
    assert_eq!(a.next_id, 4);
    assert_eq!(b.next_id, 1, "b's next_id unchanged");
}
/// Listing returns exactly the inserted collections; the reserved watermark
/// row must not appear.
#[test]
fn list_collections_excludes_reserved_row() {
    let mut pager = fresh_pager();
    let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init");
    let p1 = BTree::<FileHandle>::empty(&mut pager).expect("p1").root();
    let p2 = BTree::<FileHandle>::empty(&mut pager).expect("p2").root();
    for (name, root) in [("alpha", p1), ("beta", p2)] {
        catalog
            .insert(&mut pager, name, CollectionDescriptor::new(0, root.get(), 1))
            .expect(name);
    }
    let listed = catalog.list_collections(&mut pager).expect("list");
    assert_eq!(listed.len(), 2);
    for wanted in ["alpha", "beta"] {
        assert!(listed.iter().any(|(name, _)| name == wanted));
    }
}
/// Re-opening the catalog on the same pager must preserve both kinds of
/// watermark: each collection's `next_id` and the catalog-wide collection-id
/// watermark.
#[test]
fn reopen_preserves_watermark() {
    let mut pager = fresh_pager();
    let p1 = BTree::<FileHandle>::empty(&mut pager).expect("p1").root();
    let p2 = BTree::<FileHandle>::empty(&mut pager).expect("p2").root();
    {
        let mut catalog =
            Catalog::<FileHandle>::open_or_init(&mut pager).expect("init catalog");
        catalog
            .insert(
                &mut pager,
                "users",
                CollectionDescriptor::new(0, p1.get(), 1),
            )
            .expect("users");
        catalog
            .insert(
                &mut pager,
                "posts",
                CollectionDescriptor::new(0, p2.get(), 1),
            )
            .expect("posts");
        let _ = catalog.next_id(&mut pager, "users").expect("u1");
        let _ = catalog.next_id(&mut pager, "users").expect("u2");
    }
    let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("reopen");
    let listed = catalog.list_collections(&mut pager).expect("list");
    assert_eq!(listed.len(), 2);
    let users = listed
        .iter()
        .find(|(n, _)| n == "users")
        .expect("users present");
    assert_eq!(users.1.next_id, 3, "users next_id survived reopen");
    let posts = listed
        .iter()
        .find(|(n, _)| n == "posts")
        .expect("posts present");
    assert_eq!(posts.1.next_id, 1);
    // Ids 1 and 2 were handed out before the reopen, so the reloaded
    // collection-id watermark must make the next insert receive id 3.
    let p3 = BTree::<FileHandle>::empty(&mut pager).expect("p3").root();
    let assigned = catalog
        .insert(
            &mut pager,
            "comments",
            CollectionDescriptor::new(0, p3.get(), 1),
        )
        .expect("comments");
    assert_eq!(assigned, 3, "collection-id watermark survived reopen");
}
/// Inserting under the empty name is refused with `InvalidArgument`, which
/// keeps callers away from the reserved watermark key.
#[test]
fn empty_name_rejected() {
    let mut pager = fresh_pager();
    let mut catalog = Catalog::<FileHandle>::open_or_init(&mut pager).expect("init");
    let root = BTree::<FileHandle>::empty(&mut pager)
        .expect("p1")
        .root()
        .get();
    let outcome = catalog.insert(&mut pager, "", CollectionDescriptor::new(0, root, 1));
    let err = outcome.expect_err("empty name rejected");
    assert!(matches!(err, Error::InvalidArgument(_)));
}
}