use super::{
buffer_pool::BufferPool,
page::{
BTreeLeafPage, BTreeLeafPageReverseIterator, BTreePageID,
BTreeRootPointerPage, Entry,
},
};
use crate::btree::page::PageCategory;
use super::consts::PAGE_SIZE;
use core::fmt;
use log::{debug, info};
use std::{borrow::Borrow, cell::Cell};
use std::{
cell::RefCell,
collections::hash_map::DefaultHasher,
fs::{File, OpenOptions},
hash::{Hash, Hasher},
io::{Seek, SeekFrom, Write},
rc::Rc,
usize,
};
use std::cell::RefMut;
use super::{
page::BTreeInternalPage,
tuple::{Tuple, TupleScheme},
};
pub struct BTreeTable {
file_path: String,
pub key_field: usize,
pub tuple_scheme: TupleScheme,
file: RefCell<File>,
table_id: i32,
page_index: Cell<usize>,
}
impl fmt::Display for BTreeTable {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"<BTreeFile, file: {}, id: {}>",
self.file_path, self.table_id
)
}
}
impl BTreeTable {
pub fn new(file_path: &str, key_field: usize, row_scheme: TupleScheme) -> BTreeTable {
File::create(file_path).expect("io error");
let f = RefCell::new(
OpenOptions::new()
.write(true)
.read(true)
.open(file_path)
.unwrap(),
);
let mut hasher = DefaultHasher::new();
file_path.hash(&mut hasher);
let table_id = hasher.finish() as i32;
Self::file_init(f.borrow_mut(), table_id);
BTreeTable {
file_path: file_path.to_string(),
key_field,
tuple_scheme: row_scheme,
file: f,
table_id,
page_index: Cell::new(1),
}
}
pub fn get_id(&self) -> i32 {
self.table_id
}
pub fn insert_tuple(&self, tuple: Tuple) {
let root_pid = self.get_root_pid();
let container = self.find_leaf_page(root_pid, tuple.get_field(self.key_field).value);
let mut leaf_page = (*container).borrow_mut();
if leaf_page.empty_slots_count() == 0 {
info!(
"page full: {}, empty slots: {}",
leaf_page.page_id.borrow(),
leaf_page.empty_slots_count()
);
info!("page split");
let new_container = self.split_leaf_page(leaf_page, self.key_field);
let mut leaf_page = (*new_container).borrow_mut();
leaf_page.insert_tuple(&tuple);
} else {
leaf_page.insert_tuple(&tuple);
}
}
pub fn split_leaf_page(
&self,
mut page: RefMut<BTreeLeafPage>,
key_field: usize,
) -> Rc<RefCell<BTreeLeafPage>> {
let new_page_id = BTreePageID::new(
PageCategory::Leaf,
self.table_id,
self.get_empty_page_index(),
);
let new_page = BTreeLeafPage::new(
&new_page_id,
BTreeLeafPage::empty_page_data().to_vec(),
page.tuple_scheme.clone(),
);
self.write_page(&new_page_id.borrow());
BufferPool::global().put_leaf_page(
&mut self.get_file(),
new_page_id,
Rc::new(RefCell::new(new_page)),
);
let new_page_ref = BufferPool::global().get_leaf_page(&new_page_id).unwrap();
let mut new_page = (*new_page_ref).borrow_mut();
let tuple_count = page.tuples_count();
let move_tuple_count = tuple_count / 2;
let move_start = tuple_count - move_tuple_count;
let mut it = BTreeLeafPageReverseIterator::new(&page);
let mut delete_indexes: Vec<usize> = Vec::new();
let mut key = 0;
for i in move_start..tuple_count {
let tuple = it.next().unwrap();
delete_indexes.push(i);
new_page.insert_tuple(&tuple);
if i == tuple_count - 1 {
key = tuple.get_field(key_field).value;
}
}
for i in &delete_indexes {
page.delete_tuple(i);
}
info!(
"move {} tuples to new page, expect: {}, new page has {} empty slots now",
delete_indexes.len(),
move_tuple_count,
new_page.empty_slots_count()
);
if page.empty_slots_count() != delete_indexes.len() {
panic!("{}", page.empty_slots_count());
}
let parent_ref = self.get_parent_with_empty_slots(page.get_parent_id());
let mut parent = (*parent_ref).borrow_mut();
let entry = Entry::new(key, &page.page_id.borrow(), &new_page_id);
parent.insert_entry(&entry);
page.set_parent_id(&parent.get_id());
new_page.set_parent_id(&parent.get_id());
let v = BufferPool::global().get_leaf_page(&new_page_id.borrow());
v.unwrap()
}
fn get_empty_page_index(&self) -> usize {
let index = self.page_index.get() + 1;
self.page_index.set(index);
index
}
fn get_parent_with_empty_slots(
&self,
parent_id: BTreePageID,
) -> Rc<RefCell<BTreeInternalPage>> {
match parent_id.category {
PageCategory::RootPointer => {
let empty_page_index = self.get_empty_page_index();
let new_parent_id =
BTreePageID::new(PageCategory::Internal, self.table_id, empty_page_index);
self.write_page(&new_parent_id);
let page_id = BTreePageID::new(PageCategory::RootPointer, self.table_id, 0);
let root_pointer_page = BufferPool::global()
.get_root_pointer_page(&page_id)
.unwrap();
(*root_pointer_page)
.borrow_mut()
.set_root_pid(&new_parent_id);
let v = BufferPool::global().get_internal_page(&new_parent_id);
return v.unwrap();
}
PageCategory::Internal => {
let page_ref = BufferPool::global().get_internal_page(&parent_id).unwrap();
let page = (*page_ref).borrow();
if page.empty_slots_count() > 0 {
return Rc::clone(&page_ref);
} else {
todo!()
}
}
_ => {
todo!()
}
}
}
pub fn find_leaf_page(&self, page_id: BTreePageID, field: i32) -> Rc<RefCell<BTreeLeafPage>> {
match page_id.category {
PageCategory::Leaf => {
return BufferPool::global().get_leaf_page(&page_id).unwrap();
}
PageCategory::Internal => {
let page_ref = BufferPool::global().get_internal_page(&page_id).unwrap();
let page = (*page_ref).borrow();
for entry in page.get_entries() {
if entry.key >= field {
let left = entry.get_left_child();
return BufferPool::global().get_leaf_page(&left).unwrap();
}
}
let last_entry = page.get_last_entry();
let right = last_entry.get_right_child();
return BufferPool::global().get_leaf_page(&right).unwrap();
}
_ => {
todo!()
}
}
}
pub fn get_file(&self) -> RefMut<File> {
self.file.borrow_mut()
}
fn file_init(mut file: RefMut<File>, table_id: i32) {
if file.metadata().unwrap().len() == 0 {
debug!("db file empty, start init");
let empty_root_pointer_data = BTreeRootPointerPage::empty_page_data();
let empty_leaf_data = BTreeLeafPage::empty_page_data();
let mut n = file.write(&empty_root_pointer_data).unwrap();
debug!(
"write page to disk, pid: {}, len: {}",
BTreePageID::new(PageCategory::RootPointer, table_id, 0),
n
);
n = file.write(&empty_leaf_data).unwrap();
debug!(
"write page to disk, pid: {}, len: {}",
BTreePageID::new(PageCategory::Leaf, table_id, 1),
n
);
let file_length = file.metadata().unwrap().len();
debug!("write complete, file length: {}", file_length);
}
}
fn write_page(&self, page_id: &BTreePageID) {
info!("write page to disk, pid: {}", page_id);
let start_pos = BTreeRootPointerPage::page_size() + (page_id.page_index - 1) * PAGE_SIZE;
self.get_file()
.seek(SeekFrom::Start(start_pos as u64))
.expect("io error");
self.get_file()
.write(&BTreeInternalPage::empty_page_data())
.expect("io error");
self.get_file().flush().expect("io error");
let file_length = self.get_file().metadata().unwrap().len();
debug!("write complete, file length: {}", file_length);
}
pub fn get_root_pid(&self) -> BTreePageID {
let root_pointer_pid = BTreePageID {
category: PageCategory::RootPointer,
page_index: 0,
table_id: self.table_id,
};
let page_ref = BufferPool::global()
.get_root_pointer_page(&root_pointer_pid)
.expect("io error");
let page = (*page_ref).borrow();
let mut root_pid = page.get_root_pid();
root_pid.table_id = self.get_id();
root_pid
}
pub fn pages_count(&self) -> usize {
let file_len = self.get_file().metadata().unwrap().len() as usize;
(file_len - BTreeRootPointerPage::page_size()) / PAGE_SIZE
}
}