1use crate::buffer::{AtomicPageId, WritePageGuard, INVALID_PAGE_ID};
2use crate::catalog::SchemaRef;
3use crate::recovery::Lsn;
4use crate::storage::codec::TablePageCodec;
5use crate::storage::page::{RecordId, TablePage, TupleMeta, INVALID_RID};
6use crate::{buffer::BufferManager, error::QuillSQLResult};
7use std::collections::Bound;
8use std::ops::RangeBounds;
9use std::sync::atomic::{AtomicU32, Ordering};
10use std::sync::Arc;
11
12use crate::storage::heap::wal_codec::RelationIdent;
13use crate::storage::tuple::Tuple;
14
15#[derive(Debug)]
16pub struct TableHeap {
17 pub schema: SchemaRef,
18 pub buffer_pool: Arc<BufferManager>,
19 pub first_page_id: AtomicPageId,
20 pub last_page_id: AtomicPageId,
21}
22
23impl TableHeap {
24 pub fn try_new(schema: SchemaRef, buffer_pool: Arc<BufferManager>) -> QuillSQLResult<Self> {
26 let mut first_page_guard = buffer_pool.new_page()?;
28 let first_page_id = first_page_guard.page_id();
29
30 let table_page = TablePage::new(schema.clone(), INVALID_PAGE_ID);
32 let encoded_data = TablePageCodec::encode(&table_page);
33
34 first_page_guard.data_mut().copy_from_slice(&encoded_data);
37 first_page_guard.set_lsn(table_page.lsn());
38
39 Ok(Self {
41 schema,
42 buffer_pool,
43 first_page_id: AtomicU32::new(first_page_id),
44 last_page_id: AtomicU32::new(first_page_id),
45 })
46 }
47
48 fn write_back_page(
49 &self,
50 guard: &mut WritePageGuard,
51 table_page: &mut TablePage,
52 new_lsn: Option<Lsn>,
53 ) -> QuillSQLResult<()> {
54 let new_image = TablePageCodec::encode(table_page);
55 guard.overwrite(&new_image, new_lsn.or(Some(guard.lsn())));
56 Ok(())
57 }
58
59 pub(crate) fn relation_ident(&self) -> RelationIdent {
60 RelationIdent {
61 root_page_id: self.first_page_id.load(Ordering::SeqCst),
62 }
63 }
64
65 pub fn insert_tuple(&self, meta: &TupleMeta, tuple: &Tuple) -> QuillSQLResult<RecordId> {
68 self.insert_tuple_with(meta, tuple, |_rid, _meta, _tuple| Ok(None))
69 }
70
71 pub fn insert_tuple_with<F>(
72 &self,
73 meta: &TupleMeta,
74 tuple: &Tuple,
75 mut wal_cb: F,
76 ) -> QuillSQLResult<RecordId>
77 where
78 F: FnMut(RecordId, &TupleMeta, &Tuple) -> QuillSQLResult<Option<Lsn>>,
79 {
80 let tuple_bytes = crate::storage::codec::TupleCodec::encode(tuple);
81 let mut current_page_id = self.last_page_id.load(Ordering::SeqCst);
82
83 loop {
84 let mut current_page_guard = self.buffer_pool.fetch_page_write(current_page_id)?;
85 let mut table_page =
86 TablePageCodec::decode(current_page_guard.data(), self.schema.clone())?.0;
87 table_page.set_lsn(current_page_guard.lsn());
88
89 if table_page
90 .next_tuple_offset_with_len(tuple_bytes.len())
91 .is_ok()
92 {
93 let slot_id = table_page.insert_tuple_bytes(meta, &tuple_bytes)?;
94 let rid = RecordId::new(current_page_id, slot_id as u32);
95 let wal_lsn = wal_cb(rid, meta, tuple)?;
96 self.write_back_page(&mut current_page_guard, &mut table_page, wal_lsn)?;
97 return Ok(rid);
98 }
99
100 let mut new_page_guard = self.buffer_pool.new_page()?;
101 let new_page_id = new_page_guard.page_id();
102 let mut new_table_page = TablePage::new(self.schema.clone(), INVALID_PAGE_ID);
103 self.write_back_page(&mut new_page_guard, &mut new_table_page, None)?;
104
105 table_page.header.next_page_id = new_page_id;
106 self.write_back_page(&mut current_page_guard, &mut table_page, None)?;
107 drop(current_page_guard);
108
109 self.last_page_id.store(new_page_id, Ordering::SeqCst);
110 current_page_id = new_page_id;
111 }
112 }
113
114 pub fn full_tuple(&self, rid: RecordId) -> QuillSQLResult<(TupleMeta, Tuple)> {
115 let (_, table_page) = self
116 .buffer_pool
117 .fetch_table_page(rid.page_id, self.schema.clone())?;
118 let result = table_page.tuple(rid.slot_num as u16)?;
119 Ok(result)
120 }
121
122 pub fn write_tuple_meta(&self, rid: RecordId, meta: TupleMeta) -> QuillSQLResult<()> {
124 self.write_tuple_meta_with_lsn(rid, meta, None)
125 }
126
127 pub fn write_tuple_meta_with_lsn(
128 &self,
129 rid: RecordId,
130 meta: TupleMeta,
131 new_lsn: Option<Lsn>,
132 ) -> QuillSQLResult<()> {
133 let mut page_guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
134 let mut table_page = TablePageCodec::decode(page_guard.data(), self.schema.clone())?.0;
135 table_page.set_lsn(page_guard.lsn());
136
137 let slot = rid.slot_num as u16;
138 table_page.update_tuple_meta(meta, slot)?;
139 self.write_back_page(&mut page_guard, &mut table_page, new_lsn)
140 }
141
142 pub fn tuple(&self, rid: RecordId) -> QuillSQLResult<Tuple> {
143 let (_meta, tuple) = self.full_tuple(rid)?;
144 Ok(tuple)
145 }
146
147 pub fn tuple_meta(&self, rid: RecordId) -> QuillSQLResult<TupleMeta> {
148 let (meta, _tuple) = self.full_tuple(rid)?;
149 Ok(meta)
150 }
151
152 pub fn get_first_rid(&self) -> QuillSQLResult<Option<RecordId>> {
153 let first_page_id = self.first_page_id.load(Ordering::SeqCst);
154 let (_, table_page) = self
155 .buffer_pool
156 .fetch_table_page(first_page_id, self.schema.clone())?;
157
158 if table_page.header.num_tuples == 0 {
159 Ok(None)
160 } else {
161 Ok(Some(RecordId::new(first_page_id, 0)))
162 }
163 }
164
165 pub fn get_next_rid(&self, rid: RecordId) -> QuillSQLResult<Option<RecordId>> {
166 let (_, table_page) = self
167 .buffer_pool
168 .fetch_table_page(rid.page_id, self.schema.clone())?;
169 let next_rid = table_page.get_next_rid(&rid);
170 if next_rid.is_some() {
171 return Ok(next_rid);
172 }
173
174 if table_page.header.next_page_id == INVALID_PAGE_ID {
175 return Ok(None);
176 }
177 let (_, next_table_page) = self
178 .buffer_pool
179 .fetch_table_page(table_page.header.next_page_id, self.schema.clone())?;
180
181 if next_table_page.header.num_tuples == 0 {
182 return Ok(None);
183 }
184 Ok(Some(RecordId::new(table_page.header.next_page_id, 0)))
185 }
186
187 pub fn vacuum_slot_if<F>(&self, rid: RecordId, predicate: F) -> QuillSQLResult<bool>
190 where
191 F: FnOnce(&TupleMeta) -> bool,
192 {
193 let mut page_guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
194 let mut table_page = TablePageCodec::decode(page_guard.data(), self.schema.clone())?.0;
195 table_page.set_lsn(page_guard.lsn());
196
197 let slot = rid.slot_num as u16;
198 if slot >= table_page.header.num_tuples {
199 return Ok(false);
200 }
201 let meta = table_page.header.tuple_infos[slot as usize].meta;
202 if !predicate(&meta) {
203 return Ok(false);
204 }
205
206 table_page.reclaim_tuple(slot)?;
207 self.write_back_page(&mut page_guard, &mut table_page, None)?;
208 Ok(true)
209 }
210}
211
212#[derive(Debug)]
213pub struct TableIterator {
214 heap: Arc<TableHeap>,
215 start_bound: Bound<RecordId>,
216 end_bound: Bound<RecordId>,
217 cursor: RecordId,
218 started: bool,
219 ended: bool,
220}
221
222impl TableIterator {
223 pub fn new<R: RangeBounds<RecordId>>(heap: Arc<TableHeap>, range: R) -> Self {
224 let start = range.start_bound().cloned();
225 let end = range.end_bound().cloned();
226 Self {
227 heap,
228 start_bound: start,
229 end_bound: end,
230 cursor: INVALID_RID,
231 started: false,
232 ended: false,
233 }
234 }
235
236 pub fn next(&mut self) -> QuillSQLResult<Option<(RecordId, TupleMeta, Tuple)>> {
237 if self.ended {
238 return Ok(None);
239 }
240
241 if self.started {
242 match self.end_bound {
243 Bound::Included(rid) => {
244 if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
245 self.cursor = next_rid;
246 if self.cursor == rid {
247 self.ended = true;
248 }
249 let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
250 Ok(Some((self.cursor, meta, tuple)))
251 } else {
252 Ok(None)
253 }
254 }
255 Bound::Excluded(rid) => {
256 if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
257 if next_rid == rid {
258 self.ended = true;
259 Ok(None)
260 } else {
261 self.cursor = next_rid;
262 let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
263 Ok(Some((self.cursor, meta, tuple)))
264 }
265 } else {
266 Ok(None)
267 }
268 }
269 Bound::Unbounded => {
270 if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
271 self.cursor = next_rid;
272 let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
273 Ok(Some((self.cursor, meta, tuple)))
274 } else {
275 Ok(None)
276 }
277 }
278 }
279 } else {
280 self.started = true;
281 match self.start_bound {
282 Bound::Included(rid) => {
283 self.cursor = rid;
284 let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
285 Ok(Some((self.cursor, meta, tuple)))
286 }
287 Bound::Excluded(rid) => {
288 if let Some(next_rid) = self.heap.get_next_rid(rid)? {
289 self.cursor = next_rid;
290 let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
291 Ok(Some((self.cursor, meta, tuple)))
292 } else {
293 self.ended = true;
294 Ok(None)
295 }
296 }
297 Bound::Unbounded => {
298 if let Some(first_rid) = self.heap.get_first_rid()? {
299 self.cursor = first_rid;
300 let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
301 Ok(Some((self.cursor, meta, tuple)))
302 } else {
303 self.ended = true;
304 Ok(None)
305 }
306 }
307 }
308 }
309 }
310}
311
312#[cfg(test)]
313mod tests {
314
315 use std::sync::Arc;
316 use tempfile::TempDir;
317
318 use crate::buffer::BufferManager;
319 use crate::catalog::{Column, DataType, Schema};
320 use crate::storage::codec::TupleCodec;
321 use crate::storage::disk_manager::DiskManager;
322 use crate::storage::disk_scheduler::DiskScheduler;
323 use crate::storage::page::EMPTY_TUPLE_META;
324 use crate::storage::table_heap::TableIterator;
325 use crate::storage::{table_heap::TableHeap, tuple::Tuple};
326 use crate::utils::scalar::ScalarValue;
327
/// Overwriting the metadata of one tuple is visible when it is read back.
#[test]
pub fn test_table_heap_write_tuple_meta() {
    let dir = TempDir::new().unwrap();
    let db_file = dir.path().join("test.db");

    let schema = Arc::new(Schema::new(vec![
        Column::new("a", DataType::Int8, false),
        Column::new("b", DataType::Int16, false),
    ]));
    let manager = DiskManager::try_new(db_file).unwrap();
    let scheduler = Arc::new(DiskScheduler::new(Arc::new(manager)));
    let pool = Arc::new(BufferManager::new(1000, scheduler));
    let heap = TableHeap::try_new(schema.clone(), pool).unwrap();

    // Three rows; only the middle one is modified below.
    let first_row = Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]);
    let _first = heap.insert_tuple(&EMPTY_TUPLE_META, &first_row).unwrap();
    let second_row = Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]);
    let target = heap.insert_tuple(&EMPTY_TUPLE_META, &second_row).unwrap();
    let third_row = Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]);
    let _third = heap.insert_tuple(&EMPTY_TUPLE_META, &third_row).unwrap();

    let mut updated = heap.tuple_meta(target).unwrap();
    updated.insert_txn_id = 1;
    heap.write_tuple_meta(target, updated).unwrap();

    let reread = heap.tuple_meta(target).unwrap();
    assert_eq!(reread.insert_txn_id, 1);
}
368
/// Every inserted tuple can be read back with the metadata it was stored with.
#[test]
pub fn test_table_heap_insert_tuple() {
    let dir = TempDir::new().unwrap();
    let db_file = dir.path().join("test.db");

    let schema = Arc::new(Schema::new(vec![
        Column::new("a", DataType::Int8, false),
        Column::new("b", DataType::Int16, false),
    ]));
    let manager = DiskManager::try_new(db_file).unwrap();
    let scheduler = Arc::new(DiskScheduler::new(Arc::new(manager)));
    let pool = Arc::new(BufferManager::new(1000, scheduler));
    let heap = TableHeap::try_new(schema.clone(), pool).unwrap();

    // (metadata, tuple) pairs in insertion order.
    let rows = [
        (
            super::TupleMeta::new(1, 0),
            Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
        ),
        (
            super::TupleMeta::new(2, 0),
            Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
        ),
        (
            super::TupleMeta::new(3, 0),
            Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
        ),
    ];

    // Insert everything first, then verify, as separate passes.
    let rids: Vec<_> = rows
        .iter()
        .map(|(meta, tuple)| heap.insert_tuple(meta, tuple).unwrap())
        .collect();

    for (rid, (want_meta, want_tuple)) in rids.iter().zip(rows.iter()) {
        let (meta, tuple) = heap.full_tuple(*rid).unwrap();
        assert_eq!(&meta, want_meta);
        assert_eq!(tuple.data, want_tuple.data);
    }
}
417
/// An unbounded scan yields every inserted tuple in insertion order and then
/// reports exhaustion.
#[test]
pub fn test_table_heap_iterator() {
    let dir = TempDir::new().unwrap();
    let db_file = dir.path().join("test.db");

    let schema = Arc::new(Schema::new(vec![
        Column::new("a", DataType::Int8, false),
        Column::new("b", DataType::Int16, false),
    ]));

    let manager = DiskManager::try_new(db_file).unwrap();
    let scheduler = Arc::new(DiskScheduler::new(Arc::new(manager)));
    let pool = Arc::new(BufferManager::new(1000, scheduler));
    let heap = Arc::new(TableHeap::try_new(schema.clone(), pool).unwrap());

    // (metadata, tuple) pairs in insertion order.
    let rows = [
        (
            super::TupleMeta::new(1, 0),
            Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
        ),
        (
            super::TupleMeta::new(2, 0),
            Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
        ),
        (
            super::TupleMeta::new(3, 0),
            Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
        ),
    ];
    let rids: Vec<_> = rows
        .iter()
        .map(|(meta, tuple)| heap.insert_tuple(meta, tuple).unwrap())
        .collect();

    let mut scan = TableIterator::new(heap.clone(), ..);

    for (want_rid, (want_meta, want_tuple)) in rids.iter().zip(rows.iter()) {
        let (rid, meta, tuple) = scan.next().unwrap().unwrap();
        assert_eq!(&rid, want_rid);
        assert_eq!(&meta, want_meta);
        assert_eq!(tuple.data, want_tuple.data);
    }

    assert!(scan.next().unwrap().is_none());
}
474
/// The recovery setters replace a tuple's bytes and its metadata in place.
#[test]
pub fn test_recover_set_tuple_meta_and_bytes() {
    let dir = TempDir::new().unwrap();
    let db_file = dir.path().join("test.db");

    let schema = Arc::new(Schema::new(vec![
        Column::new("a", DataType::Int8, false),
        Column::new("b", DataType::Int16, false),
    ]));
    let manager = DiskManager::try_new(db_file).unwrap();
    let scheduler = Arc::new(DiskScheduler::new(Arc::new(manager)));
    let pool = Arc::new(BufferManager::new(128, scheduler));
    let heap = TableHeap::try_new(schema.clone(), pool.clone()).unwrap();

    let original = Tuple::new(schema.clone(), vec![1i8.into(), 10i16.into()]);
    let rid = heap.insert_tuple(&EMPTY_TUPLE_META, &original).unwrap();

    // Replace the stored bytes with those of a different tuple.
    let replacement = Tuple::new(schema.clone(), vec![2i8.into(), 20i16.into()]);
    let replacement_bytes = TupleCodec::encode(&replacement);
    heap.recover_set_tuple_bytes(rid, &replacement_bytes)
        .expect("recover bytes");

    let (_meta, stored) = heap.full_tuple(rid).unwrap();
    assert_eq!(stored.data, vec![2i8.into(), 20i16.into()]);

    // Flip the deleted flag through the recovery path and read it back.
    let mut deleted_meta = heap.tuple_meta(rid).unwrap();
    deleted_meta.is_deleted = true;
    heap.recover_set_tuple_meta(rid, deleted_meta)
        .expect("recover meta");
    let reread = heap.tuple_meta(rid).unwrap();
    assert!(reread.is_deleted);
}
517
/// Replacing a tuple's bytes through the recovery setter leaves the new
/// values readable at the same record id.
#[test]
pub fn test_recover_repack_on_size_mismatch() {
    let dir = TempDir::new().unwrap();
    let db_file = dir.path().join("test.db");

    let schema = Arc::new(Schema::new(vec![
        Column::new("a", DataType::Int8, false),
        Column::new("b", DataType::Int16, false),
    ]));
    let manager = DiskManager::try_new(db_file).unwrap();
    let scheduler = Arc::new(DiskScheduler::new(Arc::new(manager)));
    let pool = Arc::new(BufferManager::new(128, scheduler));
    let heap = TableHeap::try_new(schema.clone(), pool.clone()).unwrap();

    let original = Tuple::new(schema.clone(), vec![1i8.into(), 10i16.into()]);
    let rid = heap.insert_tuple(&EMPTY_TUPLE_META, &original).unwrap();

    let replacement = Tuple::new(schema.clone(), vec![99i8.into(), 300i16.into()]);
    let replacement_bytes = TupleCodec::encode(&replacement);
    heap.recover_set_tuple_bytes(rid, &replacement_bytes)
        .expect("recover larger bytes");

    let (_meta, stored) = heap.full_tuple(rid).unwrap();
    assert_eq!(stored.data, vec![99i8.into(), 300i16.into()]);
}
549
/// Vacuuming the only tuple with an always-true predicate removes it and
/// leaves the heap with no first record id.
#[test]
fn vacuum_slot_if_reclaims_tuple() {
    let dir = TempDir::new().unwrap();
    let db_file = dir.path().join("vacuum.db");
    let schema = Arc::new(Schema::new(vec![Column::new("v", DataType::Int32, false)]));
    let manager = DiskManager::try_new(db_file).unwrap();
    let scheduler = Arc::new(DiskScheduler::new(Arc::new(manager)));
    let pool = Arc::new(BufferManager::new(32, scheduler));
    let table = TableHeap::try_new(schema.clone(), pool).unwrap();

    let row_meta = super::TupleMeta::new(1, 0);
    let row = Tuple::new(schema.clone(), vec![ScalarValue::Int32(Some(5))]);
    let rid = table.insert_tuple(&row_meta, &row).unwrap();

    // Readable before the vacuum ...
    assert!(table.full_tuple(rid).is_ok());
    let reclaimed = table.vacuum_slot_if(rid, |_| true).unwrap();
    assert!(reclaimed);
    // ... and gone afterwards.
    assert!(table.full_tuple(rid).is_err());
    assert!(table.get_first_rid().unwrap().is_none());
}
569
/// A predicate that rejects the tuple's metadata leaves the tuple untouched.
#[test]
fn vacuum_slot_if_respects_predicate() {
    let dir = TempDir::new().unwrap();
    let db_file = dir.path().join("vacuum_predicate.db");
    let schema = Arc::new(Schema::new(vec![Column::new("v", DataType::Int32, false)]));
    let manager = DiskManager::try_new(db_file).unwrap();
    let scheduler = Arc::new(DiskScheduler::new(Arc::new(manager)));
    let pool = Arc::new(BufferManager::new(32, scheduler));
    let table = TableHeap::try_new(schema.clone(), pool).unwrap();

    let row_meta = super::TupleMeta::new(42, 0);
    let row = Tuple::new(schema.clone(), vec![ScalarValue::Int32(Some(9))]);
    let rid = table.insert_tuple(&row_meta, &row).unwrap();

    // The tuple was inserted by transaction 42, so this predicate is false.
    let reclaimed = table
        .vacuum_slot_if(rid, |current| current.insert_txn_id == 0)
        .unwrap();
    assert!(!reclaimed);

    let (meta_after, tuple_after) = table.full_tuple(rid).unwrap();
    assert_eq!(meta_after.insert_txn_id, 42);
    assert_eq!(tuple_after, row);
}
591}