use std::fmt;
use bit_vec::BitVec;
use log::{debug, error};
use super::{
BTreeBasePage, BTreePage, BTreePageID, PageCategory,
EMPTY_PAGE_ID,
};
use crate::{
btree::{
consts::INDEX_SIZE, page_cache::PageCache, tuple::Schema,
},
concurrent_status::Permission,
error::SmallError,
field::{get_type_length, IntField},
io::{SmallReader, SmallWriter, Vaporizable},
transaction::Transaction,
types::SmallResult,
utils::{floor_div, HandyRwLock},
Unique,
};
pub struct BTreeInternalPage {
base: BTreeBasePage,
keys: Vec<IntField>,
children: Vec<BTreePageID>,
slot_count: usize,
header: BitVec<u32>,
children_category: PageCategory,
old_data: Vec<u8>,
}
impl BTreeInternalPage {
fn new(
pid: &BTreePageID,
bytes: &[u8],
tuple_scheme: &Schema,
key_field: usize,
) -> Self {
let mut instance: Self;
if BTreeBasePage::is_empty_page(&bytes) {
instance = Self::new_empty_page(
pid,
bytes,
tuple_scheme,
key_field,
);
} else {
let key_size = get_type_length(
tuple_scheme.fields[key_field].field_type,
);
let slot_count = Self::get_children_cap(key_size) + 1;
let mut reader = SmallReader::new(&bytes);
let category = PageCategory::read_from(&mut reader);
if category != PageCategory::Internal {
panic!(
"The page category of the internal page is not
correct, expect: {:?}, actual: {:?}",
PageCategory::Internal,
category,
);
}
let parent_pid = BTreePageID::new(
PageCategory::Internal,
pid.get_table_id(),
u32::read_from(&mut reader),
);
let children_category =
PageCategory::read_from(&mut reader);
let header = BitVec::read_from(&mut reader);
let mut keys: Vec<IntField> = Vec::new();
keys.push(IntField::new(0));
for _ in 1..slot_count {
let key = IntField::read_from(&mut reader);
keys.push(key);
}
let mut children: Vec<BTreePageID> = Vec::new();
for _ in 0..slot_count {
let child = BTreePageID::new(
children_category,
pid.get_table_id(),
u32::read_from(&mut reader),
);
children.push(child);
}
let mut base = BTreeBasePage::new(pid);
base.set_parent_pid(&parent_pid);
instance = Self {
base,
keys,
children,
slot_count,
header,
children_category,
old_data: Vec::new(),
};
}
instance.set_before_image();
return instance;
}
fn new_empty_page(
pid: &BTreePageID,
bytes: &[u8],
tuple_scheme: &Schema,
key_field: usize,
) -> Self {
let key_size = get_type_length(
tuple_scheme.fields[key_field].field_type,
);
let slot_count = Self::get_children_cap(key_size) + 1;
let mut reader = SmallReader::new(&bytes);
let parent_pid = BTreePageID::new(
PageCategory::Internal,
pid.get_table_id(),
EMPTY_PAGE_ID,
);
let children_category = PageCategory::Leaf;
let mut header = BitVec::new();
header.grow(slot_count, false);
let mut keys: Vec<IntField> = Vec::new();
keys.push(IntField::new(0));
for _ in 1..slot_count {
let key = IntField::read_from(&mut reader);
keys.push(key);
}
let mut children: Vec<BTreePageID> = Vec::new();
for _ in 0..slot_count {
let child = BTreePageID::new(
children_category,
pid.get_table_id(),
u32::read_from(&mut reader),
);
children.push(child);
}
let mut base = BTreeBasePage::new(pid);
base.set_parent_pid(&parent_pid);
Self {
base,
keys,
children,
slot_count,
header,
children_category,
old_data: Vec::new(),
}
}
pub fn get_coresponding_entry(
&self,
left_pid: Option<&BTreePageID>,
right_pid: Option<&BTreePageID>,
) -> Option<Entry> {
let mut it = BTreeInternalPageIterator::new(self);
let mut entry = None;
for e in it.by_ref() {
if let Some(left) = left_pid {
if e.get_left_child() != *left {
continue;
}
}
if let Some(right) = right_pid {
if e.get_right_child() != *right {
continue;
}
}
entry = Some(e);
break;
}
entry
}
pub fn stable(&self) -> bool {
if self.get_parent_pid().category == PageCategory::RootPointer
{
return true;
}
let max_empty_slots =
floor_div(self.get_children_capacity(), 2);
return self.empty_slots_count() <= max_empty_slots;
}
pub fn get_entry(&self, index: usize) -> Option<Entry> {
if self.is_slot_used(index) {
Some(Entry::new(
self.keys[index],
&self.children[index - 1],
&self.children[index],
))
} else {
None
}
}
pub fn delete_key_and_right_child(&mut self, record_id: usize) {
self.mark_slot_status(record_id, false);
}
pub fn delete_key_and_left_child(&mut self, record_id: usize) {
for i in (0..record_id).rev() {
if self.is_slot_used(i) {
self.children[i] = self.children[record_id];
self.mark_slot_status(record_id, false);
return;
}
}
}
pub fn update_entry(&mut self, entry: &Entry) {
let record_id = entry.get_record_id();
for i in (0..record_id).rev() {
if self.is_slot_used(i) {
self.children[i] = entry.get_left_child();
break;
}
}
self.children[record_id] = entry.get_right_child();
self.keys[record_id] = entry.get_key();
}
pub fn is_slot_used(&self, slot_index: usize) -> bool {
self.header[slot_index]
}
fn move_entry(&mut self, from: usize, to: usize) {
if self.is_slot_used(from) && !self.is_slot_used(to) {
self.keys[to] = self.keys[from];
self.children[to] = self.children[from];
self.mark_slot_status(from, false);
self.mark_slot_status(to, true);
} else {
}
}
fn mark_slot_status(&mut self, slot_index: usize, used: bool) {
self.header.set(slot_index, used);
}
pub fn get_child_pid(
&self,
_index: usize,
) -> Option<BTreePageID> {
unimplemented!()
}
pub fn get_first_child_pid(&self) -> BTreePageID {
let mut it = BTreeInternalPageIterator::new(self);
return it.next().unwrap().get_left_child();
}
pub fn get_last_child_pid(&self) -> BTreePageID {
let mut it = BTreeInternalPageIterator::new(self);
return it.next_back().unwrap().get_right_child();
}
pub fn get_left_sibling_pid(
&self,
tx: &Transaction,
) -> Option<BTreePageID> {
let parent_pid = self.get_parent_pid();
let parent_rc = Unique::mut_page_cache()
.get_internal_page(tx, Permission::ReadOnly, &parent_pid)
.unwrap();
let parent = parent_rc.rl();
let it = BTreeInternalPageIterator::new(&parent);
for e in it {
if e.get_right_child() == self.get_pid() {
return Some(e.get_left_child());
}
}
return None;
}
pub fn get_right_sibling_pid(
&self,
tx: &Transaction,
) -> Option<BTreePageID> {
let parent_pid = self.get_parent_pid();
let parent_rc = Unique::mut_page_cache()
.get_internal_page(tx, Permission::ReadOnly, &parent_pid)
.unwrap();
let parent = parent_rc.rl();
let it = BTreeInternalPageIterator::new(&parent);
for e in it {
if e.get_left_child() == self.get_pid() {
return Some(e.get_right_child());
}
}
return None;
}
pub fn get_entry_by_children(
&self,
left_pid: &BTreePageID,
right_pid: &BTreePageID,
) -> Option<Entry> {
let it = BTreeInternalPageIterator::new(self);
for entry in it {
if entry.get_left_child() == *left_pid
&& entry.get_right_child() == *right_pid
{
return Some(entry);
}
}
None
}
pub fn check_integrity(
&self,
parent_pid: &BTreePageID,
lower_bound: Option<IntField>,
upper_bound: Option<IntField>,
check_occupancy: bool,
depth: usize,
) {
assert_eq!(self.get_pid().category, PageCategory::Internal);
assert_eq!(&self.get_parent_pid(), parent_pid);
let mut previous = lower_bound;
let it = BTreeInternalPageIterator::new(self);
for e in it {
if let Some(previous) = previous {
assert!(
previous <= e.get_key(),
"entries are not in order, previous (lower_bound): {}, current entry: {}, current pid: {}, parent pid: {}",
previous,
e,
self.get_pid(),
self.get_parent_pid(),
);
}
previous = Some(e.get_key());
}
if let Some(upper_bound) = upper_bound {
if let Some(previous) = previous {
assert!(previous <= upper_bound);
}
}
if check_occupancy && depth > 0 {
assert!(
self.children_count()
>= Self::get_stable_threshold(4),
"children count: {}, max children: {}, pid: {:?}",
self.children_count(),
Self::get_children_cap(4),
self.get_pid(),
);
}
}
}
impl BTreeInternalPage {
pub fn insert_entry(&mut self, e: &Entry) -> SmallResult {
if self.empty_slots_count() == 0 {
return Err(SmallError::new(
"No empty slots on this page.",
));
}
if self.entries_count() == 0 {
self.children_category = e.get_left_child().category;
self.children[0] = e.get_left_child();
self.children[1] = e.get_right_child();
self.keys[1] = e.get_key();
self.mark_slot_status(0, true);
self.mark_slot_status(1, true);
return Ok(());
}
let mut empty_slot = 0;
for i in 0..self.slot_count {
if !self.is_slot_used(i) {
empty_slot = i;
break;
}
}
let mut slot_just_ahead: usize = usize::MAX;
for i in 0..self.slot_count {
if !self.is_slot_used(i) {
continue;
}
if self.children[i] == e.get_left_child() {
slot_just_ahead = i;
break;
}
if self.children[i] == e.get_right_child() {
slot_just_ahead = i;
self.children[i] = e.get_left_child();
break;
}
}
if slot_just_ahead == usize::MAX {
let e = SmallError::new(&format!(
"No slot found for entry {}, pid: {}, entries count: {}",
e,
self.get_pid(),
self.entries_count()
));
error!("{}", e);
return Err(e);
}
let good_slot: usize;
if empty_slot < slot_just_ahead {
for i in empty_slot..slot_just_ahead {
self.move_entry(i + 1, i);
}
good_slot = slot_just_ahead
} else {
for i in (slot_just_ahead + 1..empty_slot).rev() {
self.move_entry(i, i + 1);
}
good_slot = slot_just_ahead + 1
}
self.keys[good_slot] = e.get_key();
self.children[good_slot] = e.get_right_child();
self.mark_slot_status(good_slot, true);
Ok(())
}
}
impl BTreeInternalPage {
pub fn empty_slots_count(&self) -> usize {
let mut count = 0;
for i in 1..self.slot_count {
if !self.is_slot_used(i) {
count += 1
}
}
count
}
pub fn children_count(&self) -> usize {
self.slot_count - self.empty_slots_count()
}
pub fn entries_count(&self) -> usize {
self.slot_count - self.empty_slots_count() - 1
}
}
impl BTreeInternalPage {
pub fn get_children_capacity(&self) -> usize {
self.slot_count
}
pub fn get_stable_threshold(key_size: usize) -> usize {
floor_div(Self::get_children_cap(key_size), 2)
}
pub fn get_children_cap(key_size: usize) -> usize {
let bits_per_entry_including_header =
key_size * 8 + INDEX_SIZE * 8 + 1;
let extra_bits = (4 * INDEX_SIZE + 2) * 8 + 1;
let entries_per_page = (PageCache::get_page_size() * 8
- extra_bits)
/ bits_per_entry_including_header; return entries_per_page;
}
}
impl BTreePage for BTreeInternalPage {
fn new(
pid: &BTreePageID,
bytes: &[u8],
tuple_scheme: &Schema,
key_field: usize,
) -> Self {
Self::new(pid, bytes, tuple_scheme, key_field)
}
fn get_pid(&self) -> BTreePageID {
self.base.get_pid()
}
fn get_parent_pid(&self) -> BTreePageID {
self.base.get_parent_pid()
}
fn set_parent_pid(&mut self, pid: &BTreePageID) {
self.base.set_parent_pid(pid)
}
fn get_page_data(&self) -> Vec<u8> {
let mut writer = SmallWriter::new();
writer.write(&self.get_pid().category);
writer.write(&self.get_parent_pid().page_index);
writer.write(&self.children_category);
writer.write(&self.header);
for i in 1..self.slot_count {
writer.write(&self.keys[i]);
}
for i in 0..self.slot_count {
writer.write(&self.children[i].page_index);
}
return writer.to_padded_bytes(PageCache::get_page_size());
}
fn set_before_image(&mut self) {
self.old_data = self.get_page_data();
}
fn get_before_image(&self) -> Vec<u8> {
if self.old_data.is_empty() {
panic!("before image is not set");
}
return self.old_data.clone();
}
fn peek(&self) {
debug!("======start=======");
println!("Internal page: {}", self.get_pid());
println!("Parent: {}", self.get_parent_pid());
println!("slots count: {}", self.slot_count);
println!("entries count: {}", self.entries_count());
println!("children category: {:?}", self.children_category);
}
}
#[derive(Clone, Copy, Debug)]
pub struct Entry {
key: IntField,
left: BTreePageID,
right: BTreePageID,
record_id: usize,
}
impl Entry {
pub fn new(
key: IntField,
left: &BTreePageID,
right: &BTreePageID,
) -> Self {
Self {
key,
left: *left,
right: *right,
record_id: 0,
}
}
pub fn set_record_id(&mut self, record_id: usize) {
self.record_id = record_id;
}
pub fn get_record_id(&self) -> usize {
self.record_id
}
pub fn get_key(&self) -> IntField {
self.key
}
pub fn set_key(&mut self, key: IntField) {
self.key = key;
}
pub fn get_left_child(&self) -> BTreePageID {
self.left
}
pub fn get_right_child(&self) -> BTreePageID {
self.right
}
}
impl fmt::Display for Entry {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "({}, {}, {})", self.key, self.left, self.right)
}
}
pub struct BTreeInternalPageIterator<'page> {
page: &'page BTreeInternalPage,
cursor: usize,
left_child_position: usize,
reverse_cursor: usize,
right_child_position: usize,
}
impl<'page> BTreeInternalPageIterator<'page> {
pub fn new(page: &'page BTreeInternalPage) -> Self {
let mut right_child_position = page.slot_count;
loop {
right_child_position -= 1;
if page.is_slot_used(right_child_position) {
break;
}
}
Self {
page,
cursor: 0,
left_child_position: 0,
reverse_cursor: right_child_position,
right_child_position,
}
}
}
impl Iterator for BTreeInternalPageIterator<'_> {
type Item = Entry;
fn next(&mut self) -> Option<Self::Item> {
loop {
self.cursor += 1;
let cursor = self.cursor;
if cursor >= self.page.slot_count {
return None;
}
if !self.page.is_slot_used(cursor) {
continue;
}
let mut e = Entry::new(
self.page.keys[cursor],
&self.page.children[self.left_child_position],
&self.page.children[cursor],
);
e.set_record_id(cursor);
self.left_child_position = cursor;
return Some(e);
}
}
}
impl<'page> DoubleEndedIterator for BTreeInternalPageIterator<'_> {
fn next_back(&mut self) -> Option<Self::Item> {
loop {
if let Some(left_index) =
self.reverse_cursor.checked_sub(1)
{
self.reverse_cursor = left_index;
if !self.page.is_slot_used(left_index) {
continue;
}
let mut e = Entry::new(
self.page.keys[self.right_child_position],
&self.page.children[left_index],
&self.page.children[self.right_child_position],
);
e.set_record_id(self.right_child_position);
self.right_child_position = left_index;
return Some(e);
} else {
return None;
}
}
}
}