use crate::buffer::{AtomicPageId, INVALID_PAGE_ID};
use crate::catalog::SchemaRef;
use crate::common::util::page_bytes_to_array;
use crate::storage::codec::TablePageCodec;
use crate::storage::{RecordId, TablePage, TupleMeta, INVALID_RID};
use crate::{buffer::BufferPoolManager, BustubxResult};
use std::collections::Bound;
use std::ops::RangeBounds;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use super::tuple::Tuple;
#[derive(Debug)]
pub struct TableHeap {
pub schema: SchemaRef,
pub buffer_pool: Arc<BufferPoolManager>,
pub first_page_id: AtomicPageId,
pub last_page_id: AtomicPageId,
}
impl TableHeap {
pub fn try_new(schema: SchemaRef, buffer_pool: Arc<BufferPoolManager>) -> BustubxResult<Self> {
let first_page = buffer_pool.new_page()?;
let first_page_id = first_page.read().unwrap().page_id;
let table_page = TablePage::new(schema.clone(), INVALID_PAGE_ID);
first_page
.write()
.unwrap()
.set_data(page_bytes_to_array(&TablePageCodec::encode(&table_page)));
Ok(Self {
schema,
buffer_pool,
first_page_id: AtomicPageId::new(first_page_id),
last_page_id: AtomicPageId::new(first_page_id),
})
}
pub fn insert_tuple(&self, meta: &TupleMeta, tuple: &Tuple) -> BustubxResult<RecordId> {
let mut last_page_id = self.last_page_id.load(Ordering::SeqCst);
let (last_page, mut last_table_page) = self
.buffer_pool
.fetch_table_page(last_page_id, self.schema.clone())?;
loop {
if last_table_page.next_tuple_offset(tuple).is_ok() {
break;
}
assert!(
last_table_page.header.num_tuples > 0,
"tuple is too large, cannot insert"
);
let next_page = self.buffer_pool.new_page()?;
let next_page_id = next_page.read().unwrap().page_id;
let next_table_page = TablePage::new(self.schema.clone(), INVALID_PAGE_ID);
next_page
.write()
.unwrap()
.set_data(page_bytes_to_array(&TablePageCodec::encode(
&next_table_page,
)));
last_table_page.header.next_page_id = next_page_id;
last_page
.write()
.unwrap()
.set_data(page_bytes_to_array(&TablePageCodec::encode(
&last_table_page,
)));
last_page_id = next_page_id;
last_table_page = next_table_page;
self.last_page_id.store(last_page_id, Ordering::SeqCst);
}
let slot_id = last_table_page.insert_tuple(meta, tuple)?;
last_page
.write()
.unwrap()
.set_data(page_bytes_to_array(&TablePageCodec::encode(
&last_table_page,
)));
Ok(RecordId::new(last_page_id, slot_id as u32))
}
pub fn update_tuple(&self, rid: RecordId, tuple: Tuple) -> BustubxResult<()> {
let (page, mut table_page) = self
.buffer_pool
.fetch_table_page(rid.page_id, self.schema.clone())?;
table_page.update_tuple(tuple, rid.slot_num as u16)?;
page.write()
.unwrap()
.set_data(page_bytes_to_array(&TablePageCodec::encode(&table_page)));
Ok(())
}
pub fn update_tuple_meta(&self, meta: TupleMeta, rid: RecordId) -> BustubxResult<()> {
let (page, mut table_page) = self
.buffer_pool
.fetch_table_page(rid.page_id, self.schema.clone())?;
table_page.update_tuple_meta(meta, rid.slot_num as u16)?;
page.write()
.unwrap()
.set_data(page_bytes_to_array(&TablePageCodec::encode(&table_page)));
Ok(())
}
pub fn full_tuple(&self, rid: RecordId) -> BustubxResult<(TupleMeta, Tuple)> {
let (_, table_page) = self
.buffer_pool
.fetch_table_page(rid.page_id, self.schema.clone())?;
let result = table_page.tuple(rid.slot_num as u16)?;
Ok(result)
}
pub fn tuple(&self, rid: RecordId) -> BustubxResult<Tuple> {
let (_meta, tuple) = self.full_tuple(rid)?;
Ok(tuple)
}
pub fn tuple_meta(&self, rid: RecordId) -> BustubxResult<TupleMeta> {
let (meta, _tuple) = self.full_tuple(rid)?;
Ok(meta)
}
pub fn get_first_rid(&self) -> BustubxResult<Option<RecordId>> {
let first_page_id = self.first_page_id.load(Ordering::SeqCst);
let (_, table_page) = self
.buffer_pool
.fetch_table_page(first_page_id, self.schema.clone())?;
if table_page.header.num_tuples == 0 {
Ok(None)
} else {
Ok(Some(RecordId::new(first_page_id, 0)))
}
}
pub fn get_next_rid(&self, rid: RecordId) -> BustubxResult<Option<RecordId>> {
let (_, table_page) = self
.buffer_pool
.fetch_table_page(rid.page_id, self.schema.clone())?;
let next_rid = table_page.get_next_rid(&rid);
if next_rid.is_some() {
return Ok(next_rid);
}
if table_page.header.next_page_id == INVALID_PAGE_ID {
return Ok(None);
}
let (_, next_table_page) = self
.buffer_pool
.fetch_table_page(table_page.header.next_page_id, self.schema.clone())?;
if next_table_page.header.num_tuples == 0 {
Ok(None)
} else {
Ok(Some(RecordId::new(table_page.header.next_page_id, 0)))
}
}
}
#[derive(Debug)]
pub struct TableIterator {
heap: Arc<TableHeap>,
start_bound: Bound<RecordId>,
end_bound: Bound<RecordId>,
cursor: RecordId,
started: bool,
ended: bool,
}
impl TableIterator {
pub fn new<R: RangeBounds<RecordId>>(heap: Arc<TableHeap>, range: R) -> Self {
Self {
heap,
start_bound: range.start_bound().cloned(),
end_bound: range.end_bound().cloned(),
cursor: INVALID_RID,
started: false,
ended: false,
}
}
pub fn next(&mut self) -> BustubxResult<Option<(RecordId, Tuple)>> {
if self.ended {
return Ok(None);
}
if self.started {
match self.end_bound {
Bound::Included(rid) => {
if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
if next_rid == rid {
self.ended = true;
}
self.cursor = next_rid;
Ok(self
.heap
.tuple(self.cursor)
.ok()
.map(|tuple| (self.cursor, tuple)))
} else {
Ok(None)
}
}
Bound::Excluded(rid) => {
if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
if next_rid == rid {
Ok(None)
} else {
self.cursor = next_rid;
Ok(self
.heap
.tuple(self.cursor)
.ok()
.map(|tuple| (self.cursor, tuple)))
}
} else {
Ok(None)
}
}
Bound::Unbounded => {
if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
self.cursor = next_rid;
Ok(self
.heap
.tuple(self.cursor)
.ok()
.map(|tuple| (self.cursor, tuple)))
} else {
Ok(None)
}
}
}
} else {
self.started = true;
match self.start_bound {
Bound::Included(rid) => {
self.cursor = rid;
Ok(self
.heap
.tuple(self.cursor)
.ok()
.map(|tuple| (self.cursor, tuple)))
}
Bound::Excluded(rid) => {
if let Some(next_rid) = self.heap.get_next_rid(rid)? {
self.cursor = next_rid;
Ok(self
.heap
.tuple(self.cursor)
.ok()
.map(|tuple| (self.cursor, tuple)))
} else {
self.ended = true;
Ok(None)
}
}
Bound::Unbounded => {
if let Some(first_rid) = self.heap.get_first_rid()? {
self.cursor = first_rid;
Ok(self
.heap
.tuple(self.cursor)
.ok()
.map(|tuple| (self.cursor, tuple)))
} else {
self.ended = true;
Ok(None)
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use tempfile::TempDir;
use crate::catalog::{Column, DataType, Schema};
use crate::storage::{TableIterator, EMPTY_TUPLE_META};
use crate::{
buffer::BufferPoolManager,
storage::{table_heap::TableHeap, DiskManager, Tuple},
};
#[test]
pub fn test_table_heap_update_tuple_meta() {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path().join("test.db");
let schema = Arc::new(Schema::new(vec![
Column::new("a", DataType::Int8, false),
Column::new("b", DataType::Int16, false),
]));
let disk_manager = DiskManager::try_new(temp_path).unwrap();
let buffer_pool = Arc::new(BufferPoolManager::new(1000, Arc::new(disk_manager)));
let table_heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();
let _rid1 = table_heap
.insert_tuple(
&EMPTY_TUPLE_META,
&Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
)
.unwrap();
let rid2 = table_heap
.insert_tuple(
&EMPTY_TUPLE_META,
&Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
)
.unwrap();
let _rid3 = table_heap
.insert_tuple(
&EMPTY_TUPLE_META,
&Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
)
.unwrap();
let mut meta = table_heap.tuple_meta(rid2).unwrap();
meta.insert_txn_id = 1;
meta.delete_txn_id = 2;
meta.is_deleted = true;
table_heap.update_tuple_meta(meta, rid2).unwrap();
let meta = table_heap.tuple_meta(rid2).unwrap();
assert_eq!(meta.insert_txn_id, 1);
assert_eq!(meta.delete_txn_id, 2);
assert!(meta.is_deleted);
}
#[test]
pub fn test_table_heap_insert_tuple() {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path().join("test.db");
let schema = Arc::new(Schema::new(vec![
Column::new("a", DataType::Int8, false),
Column::new("b", DataType::Int16, false),
]));
let disk_manager = DiskManager::try_new(temp_path).unwrap();
let buffer_pool = Arc::new(BufferPoolManager::new(1000, Arc::new(disk_manager)));
let table_heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();
let meta1 = super::TupleMeta {
insert_txn_id: 1,
delete_txn_id: 1,
is_deleted: false,
};
let rid1 = table_heap
.insert_tuple(
&meta1,
&Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
)
.unwrap();
let meta2 = super::TupleMeta {
insert_txn_id: 2,
delete_txn_id: 2,
is_deleted: false,
};
let rid2 = table_heap
.insert_tuple(
&meta2,
&Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
)
.unwrap();
let meta3 = super::TupleMeta {
insert_txn_id: 3,
delete_txn_id: 3,
is_deleted: false,
};
let rid3 = table_heap
.insert_tuple(
&meta3,
&Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
)
.unwrap();
let (meta, tuple) = table_heap.full_tuple(rid1).unwrap();
assert_eq!(meta, meta1);
assert_eq!(tuple.data, vec![1i8.into(), 1i16.into()]);
let (meta, tuple) = table_heap.full_tuple(rid2).unwrap();
assert_eq!(meta, meta2);
assert_eq!(tuple.data, vec![2i8.into(), 2i16.into()]);
let (meta, tuple) = table_heap.full_tuple(rid3).unwrap();
assert_eq!(meta, meta3);
assert_eq!(tuple.data, vec![3i8.into(), 3i16.into()]);
}
#[test]
pub fn test_table_heap_iterator() {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path().join("test.db");
let schema = Arc::new(Schema::new(vec![
Column::new("a", DataType::Int8, false),
Column::new("b", DataType::Int16, false),
]));
let disk_manager = DiskManager::try_new(temp_path).unwrap();
let buffer_pool = Arc::new(BufferPoolManager::new(1000, Arc::new(disk_manager)));
let table_heap = Arc::new(TableHeap::try_new(schema.clone(), buffer_pool).unwrap());
let meta1 = super::TupleMeta {
insert_txn_id: 1,
delete_txn_id: 1,
is_deleted: false,
};
let rid1 = table_heap
.insert_tuple(
&meta1,
&Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
)
.unwrap();
let meta2 = super::TupleMeta {
insert_txn_id: 2,
delete_txn_id: 2,
is_deleted: false,
};
let rid2 = table_heap
.insert_tuple(
&meta2,
&Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
)
.unwrap();
let meta3 = super::TupleMeta {
insert_txn_id: 3,
delete_txn_id: 3,
is_deleted: false,
};
let rid3 = table_heap
.insert_tuple(
&meta3,
&Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
)
.unwrap();
let mut iterator = TableIterator::new(table_heap.clone(), ..);
let (rid, tuple) = iterator.next().unwrap().unwrap();
assert_eq!(rid, rid1);
assert_eq!(tuple.data, vec![1i8.into(), 1i16.into()]);
let (rid, tuple) = iterator.next().unwrap().unwrap();
assert_eq!(rid, rid2);
assert_eq!(tuple.data, vec![2i8.into(), 2i16.into()]);
let (rid, tuple) = iterator.next().unwrap().unwrap();
assert_eq!(rid, rid3);
assert_eq!(tuple.data, vec![3i8.into(), 3i16.into()]);
assert!(iterator.next().unwrap().is_none());
}
}