use std::cell::RefCell;
use std::rc::{Rc, Weak};
use std::sync::atomic::{AtomicBool, Ordering};
use crate::bucket::Bucket;
use crate::consts::{Flags, MIN_KEYS_PER_PAGE, PGID};
use crate::errors::Error;
use crate::page::{BranchPageElement, LeafPageElement, Page};
use crate::utils::clamp;
use super::{INode, NodeBuilder};
#[derive(Debug)]
pub(crate) struct NodeInner {
pub(super) bucket: *const Bucket,
pub(super) is_leaf: AtomicBool,
pub(super) unbalanced: AtomicBool,
pub(super) spilled: AtomicBool,
pub(super) key: RefCell<Option<Vec<u8>>>,
pub(super) pgid: RefCell<PGID>,
pub(super) parent: RefCell<WeakNode>,
pub(crate) children: RefCell<Vec<Node>>,
pub(crate) inodes: RefCell<Vec<INode>>,
}
#[derive(Clone, Debug)]
pub(crate) struct Node(pub(crate) Rc<NodeInner>);
impl Node {
pub(crate) fn root(&self) -> Node {
match self.parent() {
None => self.clone(),
Some(ref p) => p.root(),
}
}
#[inline]
pub(crate) fn is_leaf(&self) -> bool {
self.0.is_leaf.load(Ordering::Acquire)
}
#[inline]
pub(crate) fn pgid(&self) -> PGID {
*self.0.pgid.borrow()
}
#[inline]
pub(crate) fn parent(&self) -> Option<Node> {
self.0.parent.borrow().upgrade()
}
#[inline]
fn num_children(&self) -> usize {
self.0.inodes.borrow().len()
}
pub fn child_at(&mut self, index: usize) -> Result<Node, Error> {
if self.is_leaf() {
return Err(format!("invalid childAt({}) on a leaf node", index).into());
}
Ok(self
.bucket_mut()
.unwrap()
.node(self.0.inodes.borrow()[index].pgid, WeakNode::from(self)))
}
fn next_sibling(&self) -> Option<Node> {
match self.parent() {
None => None,
Some(mut parent) => {
let index = parent.child_index(self);
if index >= parent.num_children() as isize - 1 {
return None;
}
parent.child_at((index + 1) as usize).ok()
}
}
}
fn prev_sibling(&self) -> Option<Node> {
match self.parent() {
None => None,
Some(mut parent) => {
let index = parent.child_index(self);
if index == 0 {
return None;
}
parent.child_at((index - 1) as usize).ok()
}
}
}
pub(crate) fn split(&mut self, page_size: usize) -> Result<Option<Node>, Error> {
let mut nodes = vec![self.clone()];
while let Some(n) = nodes.last().unwrap().clone().split_two(page_size)? {
nodes.push(n);
}
if nodes.len() == 1 {
return Ok(None);
}
let parent = match self.parent() {
Some(p) => {
let mut children = p.0.children.borrow_mut();
let index = children.iter().position(|ch| Rc::ptr_eq(&self.0, &ch.0));
debug_assert!(index.is_some());
children.remove(index.unwrap());
for node in nodes {
*node.0.parent.borrow_mut() = WeakNode::from(&p);
children.push(node);
}
drop(children);
p
}
None => {
let parent = NodeBuilder::new(self.0.bucket).children(nodes).build();
for ch in &mut *parent.0.children.borrow_mut() {
*ch.0.parent.borrow_mut() = WeakNode::from(&parent);
}
parent
}
};
Ok(Some(parent))
}
pub(super) fn split_two(&mut self, page_size: usize) -> Result<Option<Node>, Error> {
let split_index = {
if self.0.inodes.borrow().len() <= (MIN_KEYS_PER_PAGE * 2)
|| self.size_less_than(page_size)
{
return Ok(None);
}
let fp = clamp(
self.bucket().ok_or("bucket empty")?.fill_percent,
Bucket::MIN_FILL_PERCENT,
Bucket::MAX_FILL_PERCENT,
);
let threshold = (fp * page_size as f64) as usize;
let (split_index, _) = self.split_index(threshold);
split_index
};
let next = NodeBuilder::new(self.0.bucket)
.is_leaf(self.is_leaf())
.build();
let nodes = self.0.inodes.borrow_mut().drain(split_index..).collect();
*next.0.inodes.borrow_mut() = nodes;
self.bucket_mut()
.ok_or("bucket empty")?
.tx()?
.0
.stats
.lock()
.split += 1;
Ok(Some(next))
}
pub fn spill(&mut self) -> Result<(), Error> {
if self.0.spilled.load(Ordering::Acquire) {
return Ok(());
}
let page_size = self.bucket().unwrap().tx()?.db()?.page_size();
{
let mut children = self.0.children.borrow_mut().clone();
children.sort_by(Node::cmp_by_key);
for child in &mut *children {
child.spill()?;
}
self.0.children.borrow_mut().clear();
}
let mut node_parent = None;
{
let mut nodes = match self.split(page_size)? {
None => vec![self.clone()],
Some(p) => {
node_parent = Some(p.clone());
p.0.children.borrow().clone()
}
};
let bucket = self.bucket_mut().unwrap();
let mut tx = bucket.tx()?;
let db = tx.db()?;
let txid = tx.id();
for node in &mut nodes {
{
let node_pgid = *node.0.pgid.borrow_mut();
if node_pgid > 0 {
db.0.freelist
.try_write()
.unwrap()
.free(txid, unsafe { &*tx.page(node_pgid)? })
.unwrap();
*node.0.pgid.borrow_mut() = 0;
}
}
let page = tx.allocate((node.size() / db.page_size()) as u64 + 1)?;
let mut page = unsafe { &mut *page };
{
let id = page.id;
let txpgid = tx.pgid();
if id >= txpgid {
panic!("pgid ({}) above high water mark ({})", id, txid)
}
*node.0.pgid.borrow_mut() = id;
node.write(&mut page);
node.0.spilled.store(true, Ordering::Release);
}
if let Some(mut p) = node.parent() {
let mut okey = node.0.key.borrow().clone();
let nkey = node.0.inodes.borrow()[0].key.to_vec();
if okey.is_none() {
okey = Some(nkey.clone());
}
let pgid = *node.0.pgid.borrow();
p.put(okey.unwrap().as_slice(), &nkey, vec![], pgid, 0);
*node.0.key.borrow_mut() = Some(nkey);
assert!(
!node.0.key.borrow().as_ref().unwrap().is_empty(),
"spill: zero-length node key"
)
}
tx.0.stats.lock().spill += 1;
}
}
{
let spill_parent = match node_parent {
None => None,
Some(p) => {
let pgid_valid = *p.0.pgid.borrow() == 0;
if pgid_valid {
Some(p)
} else {
None
}
}
};
if let Some(parent) = spill_parent {
self.0.children.borrow_mut().clear();
*self = parent;
return self.spill();
}
}
Ok(())
}
pub fn rebalance(&mut self) {
{
let selfsize = self.size();
if !self.0.unbalanced.load(Ordering::Acquire) {
return;
}
self.0.unbalanced.store(false, Ordering::Release);
let threshold = {
let bucket = self.bucket_mut().unwrap();
let tx = bucket.tx().unwrap();
tx.0.stats.lock().rebalance += 1;
tx.db().unwrap().page_size() / 4
};
if selfsize > threshold && self.0.inodes.borrow().len() > self.min_keys() as usize {
return;
}
if self.parent().is_none() {
let mut inodes = self.0.inodes.borrow_mut();
if !self.is_leaf() && inodes.len() == 1 {
let mut child = self
.bucket_mut()
.unwrap()
.node(inodes[0].pgid, WeakNode::from(self));
self.0.is_leaf.store(child.is_leaf(), Ordering::Release);
*inodes = child.0.inodes.borrow_mut().drain(..).collect();
*self.0.children.borrow_mut() =
child.0.children.borrow_mut().drain(..).collect();
{
let inode_pgids = inodes.iter().map(|i| i.pgid);
let bucket = self.bucket_mut().unwrap();
for pgid in inode_pgids {
if let Some(child) = bucket.nodes.borrow_mut().get_mut(&pgid) {
*child.0.parent.borrow_mut() = WeakNode::from(self);
}
}
}
*child.0.parent.borrow_mut() = WeakNode::new();
self.bucket_mut()
.unwrap()
.nodes
.borrow_mut()
.remove(&child.0.pgid.borrow());
child.free();
}
return;
}
}
if self.num_children() == 0 {
let key = self.0.key.borrow().clone().unwrap();
let pgid = *self.0.pgid.borrow();
let mut parent = self.parent().unwrap();
parent.del(&key);
parent.remove_child(self);
self.bucket_mut().unwrap().nodes.borrow_mut().remove(&pgid);
self.free();
parent.rebalance();
return;
}
assert!(
self.parent().unwrap().num_children() > 1,
"parent must have at least 2 children"
);
let (use_next_sibling, mut target) = {
let use_next_sibling = self.parent().unwrap().child_index(self) == 0;
let target = if use_next_sibling {
self.next_sibling().unwrap()
} else {
self.prev_sibling().unwrap()
};
(use_next_sibling, target)
};
if use_next_sibling {
let bucket = self.bucket_mut().unwrap();
for pgid in target.0.inodes.borrow().iter().map(|i| i.pgid) {
if let Some(child) = bucket.nodes.borrow_mut().get_mut(&pgid) {
child.parent().unwrap().remove_child(child);
*child.0.parent.borrow_mut() = WeakNode::from(self);
child
.parent()
.unwrap()
.0
.children
.borrow_mut()
.push(child.clone());
}
}
self.0
.inodes
.borrow_mut()
.append(&mut *target.0.inodes.borrow_mut());
{
let mut parent = self.parent().unwrap();
parent.del(&target.0.key.borrow().as_ref().unwrap());
parent.remove_child(&target);
}
self.bucket_mut()
.unwrap()
.nodes
.borrow_mut()
.remove(&target.pgid());
target.free();
} else {
for pgid in target.0.inodes.borrow().iter().map(|i| i.pgid) {
if let Some(child) = self.bucket_mut().unwrap().nodes.borrow_mut().get_mut(&pgid) {
let mut parent = child.parent().unwrap();
parent.remove_child(&child);
*child.0.parent.borrow_mut() = WeakNode::from(&target);
parent.0.children.borrow_mut().push(child.clone());
}
}
target
.0
.inodes
.borrow_mut()
.append(&mut *self.0.inodes.borrow_mut());
{
let mut parent = self.parent().unwrap();
parent.del(&self.0.key.borrow().as_ref().unwrap());
parent.remove_child(&self);
}
self.bucket_mut()
.unwrap()
.nodes
.borrow_mut()
.remove(&self.pgid());
self.free();
}
self.parent().unwrap().rebalance();
}
fn remove_child(&mut self, target: &Node) {
let index = self
.0
.children
.borrow()
.iter()
.position(|c| Rc::ptr_eq(&target.0, &c.0));
if let Some(i) = index {
self.0.children.borrow_mut().remove(i);
}
}
pub fn free(&mut self) {
if *self.0.pgid.borrow() == 0 {
return;
};
{
let bucketmut = self.bucket_mut().unwrap();
let tx = bucketmut.tx().unwrap();
let txid = tx.id();
let page = unsafe { &*tx.page(*self.0.pgid.borrow()).unwrap() };
let db = tx.db().unwrap();
db.0.freelist.write().free(txid, &page).unwrap();
}
*self.0.pgid.borrow_mut() = 0;
}
pub(super) fn bucket<'a, 'b: 'a>(&'a self) -> Option<&'b Bucket> {
if self.0.bucket.is_null() {
return None;
}
Some(unsafe { &*self.0.bucket })
}
pub(super) fn bucket_mut<'a, 'b: 'a>(&'a self) -> Option<&'b mut Bucket> {
if self.0.bucket.is_null() {
return None;
}
Some(unsafe { &mut *(self.0.bucket as *mut Bucket) })
}
pub fn min_keys(&self) -> u8 {
if self.is_leaf() {
1
} else {
2
}
}
fn page_element_size(&self) -> usize {
if self.is_leaf() {
return LeafPageElement::SIZE;
}
BranchPageElement::SIZE
}
pub(crate) fn size(&self) -> usize {
let mut sz = Page::header_size();
let elsz = self.page_element_size();
for ind in &*self.0.inodes.borrow() {
sz += elsz + ind.key.len() as usize + ind.value.len();
}
sz
}
pub(super) fn size_less_than(&self, v: usize) -> bool {
let mut sz = Page::header_size();
let elsz = self.page_element_size();
for ind in &*self.0.inodes.borrow() {
sz += elsz + ind.key.len() + ind.value.len();
if sz >= v {
return false;
}
}
true
}
pub fn put(&mut self, old_key: &[u8], new_key: &[u8], value: Vec<u8>, pgid: PGID, flags: u32) {
let meta_pgid = self.bucket().unwrap().tx().unwrap().pgid();
if pgid >= meta_pgid {
panic!("pgid ({}) above high water mark ({})", pgid, meta_pgid);
} else if old_key.is_empty() {
panic!("put: zero-length old key")
} else if new_key.is_empty() {
panic!("put: zero-length new key")
}
let mut inodes = self.0.inodes.borrow_mut();
let (exact, index) = match inodes.binary_search_by(|i| i.key.as_slice().cmp(old_key)) {
Ok(n) => (true, n),
Err(n) => (false, n),
};
if !exact {
inodes.insert(index, INode::default());
}
let mut inode = &mut inodes[index];
inode.key = new_key.to_vec();
inode.value = value;
inode.flags = flags;
inode.pgid = pgid;
}
pub fn del(&mut self, key: &[u8]) {
let mut inodes = self.0.inodes.borrow_mut();
let (exact, index) = match inodes.binary_search_by(|i| i.key.as_slice().cmp(key)) {
Ok(n) => (true, n),
Err(n) => (false, n),
};
if !exact {
return;
}
inodes.remove(index);
self.0.unbalanced.store(true, Ordering::Release);
}
pub(crate) fn write(&self, p: &mut Page) {
if self.is_leaf() {
p.flags |= Flags::LEAVES;
} else {
p.flags |= Flags::BRANCHES;
}
let inodes = self.0.inodes.borrow_mut();
if inodes.len() >= 0xFFFF {
panic!("inode overflow: {} (pgid={})", inodes.len(), p.id);
}
p.count = inodes.len() as u16;
if inodes.is_empty() {
return;
}
let mut bptr = unsafe {
let offset = self.page_element_size() * inodes.len();
p.get_data_mut_ptr().add(offset)
};
let is_leaf = self.is_leaf();
let pgid = p.id;
for (i, item) in inodes.iter().enumerate() {
assert!(!item.key.is_empty(), "write: zero-length inode key");
if is_leaf {
let mut elem = p.leaf_page_element_mut(i);
let el_ptr = elem as *const LeafPageElement as *const u8;
elem.pos = unsafe { bptr.sub(el_ptr as usize) } as u32;
elem.flags = item.flags as u32;
elem.ksize = item.key.len() as u32;
elem.vsize = item.value.len() as u32;
} else {
let mut elem = p.branch_page_element_mut(i);
let el_ptr = elem as *const BranchPageElement as *const u8;
elem.pos = unsafe { bptr.sub(el_ptr as usize) } as u32;
elem.ksize = item.key.len() as u32;
elem.pgid = item.pgid;
assert!(elem.pgid != pgid, "write: circular dependency occurred");
}
let (klen, vlen) = (item.key.len(), item.value.len());
unsafe {
std::ptr::copy_nonoverlapping(item.key.as_ptr(), bptr, klen);
bptr = bptr.add(klen);
std::ptr::copy_nonoverlapping(item.value.as_ptr(), bptr, vlen);
bptr = bptr.add(vlen);
}
}
}
pub(super) fn child_index(&self, child: &Node) -> isize {
for (index, node) in self.0.inodes.borrow().iter().enumerate() {
if Some(&node.key) == child.0.key.borrow().as_ref() {
return index as isize;
}
}
-1
}
pub(crate) fn read(&mut self, page: &Page) {
*self.0.pgid.borrow_mut() = page.id;
self.0
.is_leaf
.store(matches!(page.flags, Flags::LEAVES), Ordering::Release);
let mut inodes = Vec::<INode>::with_capacity(page.count as usize);
let is_leaf = self.is_leaf();
for i in 0..page.count as usize {
if is_leaf {
let elem = page.leaf_page_element(i);
let inode = INode {
flags: elem.flags,
key: elem.key().to_vec(),
value: elem.value().to_vec(),
pgid: 0,
};
inodes.push(inode);
} else {
let elem = page.branch_page_element(i);
let inode = INode {
flags: 0,
key: elem.key().to_vec(),
value: Vec::new(),
pgid: elem.pgid,
};
inodes.push(inode);
}
}
*self.0.inodes.borrow_mut() = inodes;
{
let inodes = self.0.inodes.borrow();
*self.0.key.borrow_mut() = if inodes.is_empty() {
None
} else {
Some(inodes[0].key.clone())
};
}
}
pub(super) fn split_index(&self, threshold: usize) -> (usize, usize) {
let mut rindex = 0;
let mut pgsize = Page::header_size();
let inodes = self.0.inodes.borrow();
let pelsize = self.page_element_size();
let max = inodes.len() - MIN_KEYS_PER_PAGE;
for (index, inode) in inodes.iter().enumerate().take(max) {
rindex = index;
let elsize = pelsize + inode.key.len() + inode.value.len();
if index >= MIN_KEYS_PER_PAGE && (pgsize + elsize) > threshold {
break;
}
pgsize += elsize;
}
(rindex, pgsize)
}
fn cmp_by_key(a: &Node, b: &Node) -> std::cmp::Ordering {
a.0.inodes.borrow()[0].key.cmp(&b.0.inodes.borrow()[0].key)
}
}
#[derive(Clone, Default, Debug)]
pub(crate) struct WeakNode(Weak<NodeInner>);
impl WeakNode {
pub(crate) fn new() -> Self {
Self(Weak::new())
}
pub(crate) fn upgrade(&self) -> Option<Node> {
self.0.upgrade().map(Node)
}
pub(crate) fn from(tx: &Node) -> Self {
Self(Rc::downgrade(&tx.0))
}
}