use crate::*;
#[derive(Clone, Debug, Eq, PartialEq, Default, Serialize, Deserialize)]
pub struct Meta {
inner: BTreeMap<Vec<u8>, PageId>,
}
impl Meta {
pub fn get_root(&self, table: &[u8]) -> Option<PageId> {
self.inner.get(table).cloned()
}
pub fn set_root(&mut self, name: Vec<u8>, pid: PageId) -> Option<PageId> {
self.inner.insert(name, pid)
}
pub fn del_root(&mut self, name: &[u8]) -> Option<PageId> {
self.inner.remove(name)
}
pub fn tenants(&self) -> BTreeMap<Vec<u8>, PageId> {
self.inner.clone()
}
pub(crate) fn size_in_bytes(&self) -> u64 {
self.inner
.iter()
.map(|(k, _pid)| {
k.len() as u64 + std::mem::size_of::<PageId>() as u64
})
.sum()
}
}
pub(crate) fn open_tree(
context: &Context,
name: Vec<u8>,
guard: &Guard,
) -> Result<Tree> {
loop {
match context.pagecache.meta_pid_for_name(&name, guard) {
Ok(root_id) => {
return Ok(Tree(Arc::new(TreeInner {
tree_id: name,
context: context.clone(),
subscriptions: Subscriptions::default(),
root: AtomicU64::new(root_id),
concurrency_control: RwLock::new(()),
merge_operator: RwLock::new(None),
})));
}
Err(Error::CollectionNotFound(_)) => {}
Err(other) => return Err(other),
}
let leaf = Frag::leaf(Data::Leaf(vec![]));
let (leaf_id, leaf_ptr) = context.pagecache.allocate(leaf, guard)?;
trace!(
"allocated pid {} for leaf in new_tree for namespace {:?}",
leaf_id,
name
);
let root_index_vec = vec![(prefix::empty().into(), leaf_id)];
let root = Frag::root(Data::Index(root_index_vec));
let (root_id, root_ptr) = context.pagecache.allocate(root, guard)?;
debug!("allocated pid {} for root of new_tree {:?}", root_id, name);
let res = context.pagecache.cas_root_in_meta(
&name,
None,
Some(root_id),
guard,
)?;
if res.is_err() {
let _ = context
.pagecache
.free(root_id, root_ptr, guard)?
.expect("could not free allocated page");
let _ = context
.pagecache
.free(leaf_id, leaf_ptr, guard)?
.expect("could not free allocated page");
continue;
}
return Ok(Tree(Arc::new(TreeInner {
tree_id: name,
subscriptions: Subscriptions::default(),
context: context.clone(),
root: AtomicU64::new(root_id),
concurrency_control: RwLock::new(()),
merge_operator: RwLock::new(None),
})));
}
}