use std::sync::{Arc, RwLock};
use backtrace::Backtrace;
use bit_vec::BitVec;
use super::{
BTreeBasePage, BTreePage, BTreePageID, PageCategory,
EMPTY_PAGE_ID,
};
use crate::{
btree::{
consts::INDEX_SIZE,
page_cache::PageCache,
tuple::{Schema, WrappedTuple},
},
field::IntField,
io::{SmallReader, SmallWriter, Vaporizable},
utils::{ceil_div, HandyRwLock},
Tuple,
};
/// A leaf page of the B+ tree: holds tuples sorted on the key field,
/// plus links to the sibling leaves on either side.
pub struct BTreeLeafPage {
// Common page state shared by all page kinds (pid, parent pid).
base: BTreeBasePage,
// Total number of tuple slots on this page, used or not.
pub slot_count: usize,
// One bit per slot; a set bit marks the slot as holding a live tuple.
header: BitVec<u32>,
// Slot storage; entries whose header bit is clear are stale leftovers.
tuples: Vec<Tuple>,
// Schema shared by every tuple on the page.
pub tuple_scheme: Schema,
// Page index of the right sibling leaf, or EMPTY_PAGE_ID when none.
right_sibling_id: u32,
// Page index of the left sibling leaf, or EMPTY_PAGE_ID when none.
left_sibling_id: u32,
// Index of the tuple field the page is sorted on.
key_field: usize,
// Serialized snapshot used as the recovery "before image".
old_data: Vec<u8>,
}
impl BTreeLeafPage {
fn new(
pid: &BTreePageID,
bytes: &[u8],
tuple_scheme: &Schema,
key_field: usize,
) -> Self {
let mut instance: Self;
if BTreeBasePage::is_empty_page(&bytes) {
instance = Self::new_empty_page(
pid,
bytes,
tuple_scheme,
key_field,
);
} else {
let slot_count =
Self::calculate_slots_count(&tuple_scheme);
let mut reader = SmallReader::new(&bytes);
let category = reader.read::<PageCategory>();
if category != PageCategory::Leaf {
panic!(
"BTreeLeafPage::new: page category is not leaf, category: {:?}",
category,
);
}
let parent_pid = BTreePageID::new(
PageCategory::Internal,
pid.get_table_id(),
u32::read_from(&mut reader),
);
let left_sibling_id = u32::read_from(&mut reader);
let right_sibling_id = u32::read_from(&mut reader);
let header = BitVec::read_from(&mut reader);
let mut tuples = Vec::new();
for _ in 0..slot_count {
let t = Tuple::read_from(&mut reader, tuple_scheme);
tuples.push(t);
}
let mut base = BTreeBasePage::new(pid);
base.set_parent_pid(&parent_pid);
instance = Self {
base,
slot_count,
header,
tuples,
tuple_scheme: tuple_scheme.clone(),
right_sibling_id,
left_sibling_id,
key_field,
old_data: Vec::new(),
};
}
instance.set_before_image();
return instance;
}
fn new_empty_page(
pid: &BTreePageID,
bytes: &[u8],
tuple_scheme: &Schema,
key_field: usize,
) -> Self {
let slot_count = Self::calculate_slots_count(&tuple_scheme);
let mut reader = SmallReader::new(&bytes);
let parent_pid = BTreePageID::new(
PageCategory::Internal,
pid.get_table_id(),
EMPTY_PAGE_ID,
);
let mut header = BitVec::new();
header.grow(slot_count, false);
let mut tuples = Vec::new();
for _ in 0..slot_count {
let t = Tuple::read_from(&mut reader, tuple_scheme);
tuples.push(t);
}
let mut base = BTreeBasePage::new(pid);
base.set_parent_pid(&parent_pid);
Self {
base,
slot_count,
header,
tuples,
tuple_scheme: tuple_scheme.clone(),
right_sibling_id: EMPTY_PAGE_ID,
left_sibling_id: EMPTY_PAGE_ID,
key_field,
old_data: Vec::new(),
}
}
pub fn set_right_pid(&mut self, pid: Option<BTreePageID>) {
match pid {
Some(pid) => {
self.right_sibling_id = pid.page_index;
}
None => {
self.right_sibling_id = EMPTY_PAGE_ID;
}
}
}
pub fn get_right_pid(&self) -> Option<BTreePageID> {
if self.right_sibling_id == EMPTY_PAGE_ID {
return None;
} else {
return Some(BTreePageID::new(
PageCategory::Leaf,
self.get_pid().table_id,
self.right_sibling_id,
));
}
}
pub fn set_left_pid(&mut self, pid: Option<BTreePageID>) {
match pid {
Some(pid) => {
self.left_sibling_id = pid.page_index;
}
None => {
self.left_sibling_id = EMPTY_PAGE_ID;
}
}
}
pub fn get_left_pid(&self) -> Option<BTreePageID> {
if self.left_sibling_id == EMPTY_PAGE_ID {
return None;
} else {
return Some(BTreePageID::new(
PageCategory::Leaf,
self.get_pid().table_id,
self.left_sibling_id,
));
}
}
pub fn get_slots_count(&self) -> usize {
self.slot_count
}
pub fn stable(&self) -> bool {
if self.get_parent_pid().category == PageCategory::RootPointer
{
return true;
}
let stable_threshold = ceil_div(self.slot_count, 2);
return self.tuples_count() >= stable_threshold;
}
pub fn empty_slots_count(&self) -> usize {
let mut count = 0;
for i in 0..self.slot_count {
if !self.is_slot_used(i) {
count += 1;
}
}
count
}
pub fn tuples_count(&self) -> usize {
self.slot_count - self.empty_slots_count()
}
pub fn calculate_header_size(slot_count: usize) -> usize {
slot_count / 8 + 1
}
pub fn insert_tuple(&mut self, tuple: &Tuple) {
let mut first_empty_slot: i32 = 0;
for i in 0..self.slot_count {
if !self.is_slot_used(i) {
first_empty_slot = i as i32;
break;
}
}
let mut last_less_slot: i32 = -1;
for i in 0..self.slot_count {
if self.is_slot_used(i) {
if self.tuples[i].get_field(self.key_field)
< tuple.get_field(self.key_field)
{
last_less_slot = i as i32;
} else {
break;
}
}
}
let good_slot: usize;
if first_empty_slot < last_less_slot {
for i in first_empty_slot..last_less_slot {
self.move_tuple((i + 1) as usize, i as usize);
}
good_slot = last_less_slot as usize;
} else {
for i in (last_less_slot + 1..first_empty_slot).rev() {
self.move_tuple(i as usize, (i + 1) as usize);
}
good_slot = (last_less_slot + 1) as usize;
}
self.tuples[good_slot] = tuple.clone();
self.mark_slot_status(good_slot, true);
}
fn move_tuple(&mut self, from: usize, to: usize) {
if !self.is_slot_used(from) {
return;
}
self.tuples[to] = self.tuples[from].clone();
self.mark_slot_status(to, true);
self.mark_slot_status(from, false);
}
pub fn get_tuple(&self, slot_index: usize) -> Option<Tuple> {
if self.is_slot_used(slot_index) {
return Some(self.tuples[slot_index].clone());
}
None
}
pub fn delete_tuple(&mut self, slot_index: usize) {
self.mark_slot_status(slot_index, false);
}
pub fn is_slot_used(&self, slot_index: usize) -> bool {
self.header[slot_index]
}
pub fn mark_slot_status(
&mut self,
slot_index: usize,
used: bool,
) {
self.header.set(slot_index, used);
}
pub fn check_integrity(
&self,
parent_pid: &BTreePageID,
lower_bound: Option<IntField>,
upper_bound: Option<IntField>,
check_occupancy: bool,
depth: usize,
) {
let bt = Backtrace::new();
assert_eq!(self.get_pid().category, PageCategory::Leaf);
assert_eq!(
&self.get_parent_pid(),
parent_pid,
"parent pid incorrect, current page: {:?}, actual parent pid: {:?}, expect parent pid: {:?}, backtrace: {:?}",
self.get_pid(),
self.get_parent_pid(),
parent_pid,
bt,
);
let mut previous = lower_bound;
let it = BTreeLeafPageIterator::new(self);
for tuple in it {
if let Some(previous) = previous {
assert!(
previous <= tuple.get_field(self.key_field),
"previous: {:?}, current: {:?}, page_id: {:?}",
previous,
tuple.get_field(self.key_field),
self.get_pid(),
);
}
previous = Some(tuple.get_field(self.key_field));
}
if let Some(upper_bound) = upper_bound {
if let Some(previous) = previous {
assert!(
previous <= upper_bound,
"the last tuple exceeds upper_bound, last tuple: {}, upper bound: {}",
previous,
upper_bound,
);
}
}
if check_occupancy && depth > 0 {
assert!(
self.tuples_count() >= self.get_slots_count() / 2
);
}
}
pub fn iter(&self) -> BTreeLeafPageIterator {
BTreeLeafPageIterator::new(self)
}
}
impl BTreeLeafPage {
    /// Compute how many tuple slots fit on one page for `scheme`.
    ///
    /// Each slot costs the tuple payload plus one header bit; the rest
    /// of the page budget is fixed per-page overhead.
    pub fn calculate_slots_count(scheme: &Schema) -> usize {
        // Per-slot cost in bits: tuple bytes plus its header bit.
        let slot_bits = scheme.get_size() * 8 + 1;
        // Fixed overhead in bits: four index fields plus two extra
        // bytes, converted from bytes to bits.
        let overhead_bits = (4 * INDEX_SIZE + 2) * 8;
        let usable_bits = PageCache::get_page_size() * 8 - overhead_bits;
        usable_bits / slot_bits
    }
}
impl BTreePage for BTreeLeafPage {
    fn new(
        pid: &BTreePageID,
        bytes: &[u8],
        tuple_scheme: &Schema,
        key_field: usize,
    ) -> Self {
        // Delegate to the inherent constructor.
        Self::new(pid, bytes, tuple_scheme, key_field)
    }

    fn get_pid(&self) -> BTreePageID {
        self.base.get_pid()
    }

    fn get_parent_pid(&self) -> BTreePageID {
        self.base.get_parent_pid()
    }

    fn set_parent_pid(&mut self, pid: &BTreePageID) {
        self.base.set_parent_pid(pid)
    }

    /// Serialize the page in its on-disk order: category, parent
    /// index, left/right sibling indexes, header bitmap, then every
    /// tuple slot — padded out to the configured page size.
    fn get_page_data(&self) -> Vec<u8> {
        let mut buf = SmallWriter::new();

        buf.write(&self.get_pid().category);
        buf.write(&self.get_parent_pid().page_index);
        buf.write(&self.left_sibling_id);
        buf.write(&self.right_sibling_id);
        buf.write(&self.header);
        for tuple in self.tuples.iter() {
            buf.write(tuple);
        }

        buf.to_padded_bytes(PageCache::get_page_size())
    }

    /// Record the current serialized state as the recovery snapshot.
    fn set_before_image(&mut self) {
        self.old_data = self.get_page_data();
    }

    fn get_before_image(&self) -> Vec<u8> {
        if self.old_data.is_empty() {
            panic!("before image is not set");
        }
        self.old_data.clone()
    }

    /// Dump the page state to stdout for debugging.
    fn peek(&self) {
        println!("page id: {:?}", self.get_pid());
        println!("parent id: {:?}", self.get_parent_pid());
        println!("left sibling id: {:?}", self.left_sibling_id);
        println!("right sibling id: {:?}", self.right_sibling_id);
        println!("header: {:?}", self.header);
        println!("tuples: {:?}", self.tuples);
    }
}
/// Double-ended iterator over the used slots of a leaf page held
/// behind `Arc<RwLock<..>>`; yields owned `WrappedTuple`s.
pub struct BTreeLeafPageIteratorRc {
// Shared handle to the page being scanned.
page: Arc<RwLock<BTreeLeafPage>>,
// Forward position: last slot visited by `next` (-1 before start).
cursor: i32,
// Backward position: one past the last slot visited by `next_back`
// (slot_count before the first call).
reverse_cursor: i32,
}
impl BTreeLeafPageIteratorRc {
    /// Create an iterator over a shared leaf page, positioned before
    /// the first slot (forward) and past the last slot (backward).
    pub fn new(page: Arc<RwLock<BTreeLeafPage>>) -> Self {
        let reverse_start = page.rl().get_slots_count() as i32;
        Self {
            page,
            cursor: -1,
            reverse_cursor: reverse_start,
        }
    }
}
impl Iterator for BTreeLeafPageIteratorRc {
    type Item = WrappedTuple;

    /// Return the tuple in the next used slot, skipping empty slots;
    /// `None` once the end of the page is reached.
    fn next(&mut self) -> Option<Self::Item> {
        let page = self.page.rl();
        loop {
            self.cursor += 1;
            let slot = self.cursor as usize;
            if slot >= page.slot_count {
                return None;
            }
            if !page.is_slot_used(slot) {
                continue;
            }
            return Some(WrappedTuple::new(
                page.tuples[slot].clone(),
                slot,
                page.get_pid(),
            ));
        }
    }
}
impl DoubleEndedIterator for BTreeLeafPageIteratorRc {
    /// Return the tuple in the previous used slot, skipping empty
    /// slots; `None` once the start of the page is passed.
    fn next_back(&mut self) -> Option<Self::Item> {
        let page = self.page.rl();
        loop {
            self.reverse_cursor -= 1;
            if self.reverse_cursor < 0 {
                return None;
            }
            let slot = self.reverse_cursor as usize;
            if !page.is_slot_used(slot) {
                continue;
            }
            return Some(WrappedTuple::new(
                page.tuples[slot].clone(),
                slot,
                page.get_pid(),
            ));
        }
    }
}
/// Double-ended iterator over the used slots of a borrowed leaf page;
/// yields owned `WrappedTuple`s.
pub struct BTreeLeafPageIterator<'page> {
// Borrowed page being scanned.
page: &'page BTreeLeafPage,
// Forward position: last slot visited by `next` (-1 before start).
cursor: i32,
// Backward position: one past the last slot visited by `next_back`
// (slot_count before the first call).
reverse_cursor: i32,
}
impl<'page> BTreeLeafPageIterator<'page> {
    /// Create an iterator over a borrowed leaf page, positioned before
    /// the first slot (forward) and past the last slot (backward).
    pub fn new(page: &'page BTreeLeafPage) -> Self {
        let reverse_start = page.slot_count as i32;
        Self {
            page,
            cursor: -1,
            reverse_cursor: reverse_start,
        }
    }
}
// Fix: the impl declared a lifetime parameter `'page` that was never
// used (the self type uses `'_`); an unconstrained lifetime parameter
// on an impl is rejected by rustc (E0207). The parameter is dropped.
impl Iterator for BTreeLeafPageIterator<'_> {
    type Item = WrappedTuple;

    /// Advance to the next used slot, skipping empty slots, and return
    /// its tuple; `None` once every slot has been visited.
    fn next(&mut self) -> Option<Self::Item> {
        let page = self.page;
        loop {
            self.cursor += 1;
            let cursor = self.cursor as usize;
            if cursor >= page.slot_count {
                return None;
            }
            if page.is_slot_used(cursor) {
                return Some(WrappedTuple::new(
                    page.tuples[cursor].clone(),
                    cursor,
                    page.get_pid(),
                ));
            }
        }
    }
}
// Fix: same as the forward impl — the declared lifetime parameter
// `'page` was unconstrained (E0207); dropped in favor of `'_`.
impl DoubleEndedIterator for BTreeLeafPageIterator<'_> {
    /// Step back to the previous used slot, skipping empty slots, and
    /// return its tuple; `None` once the start of the page is passed.
    fn next_back(&mut self) -> Option<Self::Item> {
        let page = self.page;
        loop {
            self.reverse_cursor -= 1;
            if self.reverse_cursor < 0 {
                return None;
            }
            let cursor = self.reverse_cursor as usize;
            if page.is_slot_used(cursor) {
                return Some(WrappedTuple::new(
                    page.tuples[cursor].clone(),
                    cursor,
                    page.get_pid(),
                ));
            }
        }
    }
}