// quill_sql/storage/heap/table_heap.rs

1use crate::buffer::{AtomicPageId, WritePageGuard, INVALID_PAGE_ID};
2use crate::catalog::SchemaRef;
3use crate::recovery::Lsn;
4use crate::storage::codec::TablePageCodec;
5use crate::storage::page::{RecordId, TablePage, TupleMeta, INVALID_RID};
6use crate::{buffer::BufferManager, error::QuillSQLResult};
7use std::collections::Bound;
8use std::ops::RangeBounds;
9use std::sync::atomic::{AtomicU32, Ordering};
10use std::sync::Arc;
11
12use crate::storage::heap::wal_codec::RelationIdent;
13use crate::storage::tuple::Tuple;
14
15#[derive(Debug)]
16pub struct TableHeap {
17    pub schema: SchemaRef,
18    pub buffer_pool: Arc<BufferManager>,
19    pub first_page_id: AtomicPageId,
20    pub last_page_id: AtomicPageId,
21}
22
23impl TableHeap {
24    /// Creates a new table heap. This involves allocating an initial page.
25    pub fn try_new(schema: SchemaRef, buffer_pool: Arc<BufferManager>) -> QuillSQLResult<Self> {
26        // new_page() returns a WritePageGuard.
27        let mut first_page_guard = buffer_pool.new_page()?;
28        let first_page_id = first_page_guard.page_id();
29
30        // Initialize the first page as an empty TablePage.
31        let table_page = TablePage::new(schema.clone(), INVALID_PAGE_ID);
32        let encoded_data = TablePageCodec::encode(&table_page);
33
34        // Use DerefMut to get a mutable reference and update the page data.
35        // This also marks the page as dirty automatically.
36        first_page_guard.data_mut().copy_from_slice(&encoded_data);
37        first_page_guard.set_lsn(table_page.lsn());
38
39        // The first_page_guard is dropped here, automatically unpinning the page.
40        Ok(Self {
41            schema,
42            buffer_pool,
43            first_page_id: AtomicU32::new(first_page_id),
44            last_page_id: AtomicU32::new(first_page_id),
45        })
46    }
47
48    fn write_back_page(
49        &self,
50        guard: &mut WritePageGuard,
51        table_page: &mut TablePage,
52        new_lsn: Option<Lsn>,
53    ) -> QuillSQLResult<()> {
54        let new_image = TablePageCodec::encode(table_page);
55        guard.overwrite(&new_image, new_lsn.or(Some(guard.lsn())));
56        Ok(())
57    }
58
59    pub(crate) fn relation_ident(&self) -> RelationIdent {
60        RelationIdent {
61            root_page_id: self.first_page_id.load(Ordering::SeqCst),
62        }
63    }
64
65    /// Inserts `tuple` with MVCC metadata `meta`, allocating a new page if the
66    /// current tail page runs out of capacity.
67    pub fn insert_tuple(&self, meta: &TupleMeta, tuple: &Tuple) -> QuillSQLResult<RecordId> {
68        self.insert_tuple_with(meta, tuple, |_rid, _meta, _tuple| Ok(None))
69    }
70
71    pub fn insert_tuple_with<F>(
72        &self,
73        meta: &TupleMeta,
74        tuple: &Tuple,
75        mut wal_cb: F,
76    ) -> QuillSQLResult<RecordId>
77    where
78        F: FnMut(RecordId, &TupleMeta, &Tuple) -> QuillSQLResult<Option<Lsn>>,
79    {
80        let tuple_bytes = crate::storage::codec::TupleCodec::encode(tuple);
81        let mut current_page_id = self.last_page_id.load(Ordering::SeqCst);
82
83        loop {
84            let mut current_page_guard = self.buffer_pool.fetch_page_write(current_page_id)?;
85            let mut table_page =
86                TablePageCodec::decode(current_page_guard.data(), self.schema.clone())?.0;
87            table_page.set_lsn(current_page_guard.lsn());
88
89            if table_page
90                .next_tuple_offset_with_len(tuple_bytes.len())
91                .is_ok()
92            {
93                let slot_id = table_page.insert_tuple_bytes(meta, &tuple_bytes)?;
94                let rid = RecordId::new(current_page_id, slot_id as u32);
95                let wal_lsn = wal_cb(rid, meta, tuple)?;
96                self.write_back_page(&mut current_page_guard, &mut table_page, wal_lsn)?;
97                return Ok(rid);
98            }
99
100            let mut new_page_guard = self.buffer_pool.new_page()?;
101            let new_page_id = new_page_guard.page_id();
102            let mut new_table_page = TablePage::new(self.schema.clone(), INVALID_PAGE_ID);
103            self.write_back_page(&mut new_page_guard, &mut new_table_page, None)?;
104
105            table_page.header.next_page_id = new_page_id;
106            self.write_back_page(&mut current_page_guard, &mut table_page, None)?;
107            drop(current_page_guard);
108
109            self.last_page_id.store(new_page_id, Ordering::SeqCst);
110            current_page_id = new_page_id;
111        }
112    }
113
114    pub fn full_tuple(&self, rid: RecordId) -> QuillSQLResult<(TupleMeta, Tuple)> {
115        let (_, table_page) = self
116            .buffer_pool
117            .fetch_table_page(rid.page_id, self.schema.clone())?;
118        let result = table_page.tuple(rid.slot_num as u16)?;
119        Ok(result)
120    }
121
122    /// Overwrite the tuple metadata at `rid` without emitting WAL.
123    pub fn write_tuple_meta(&self, rid: RecordId, meta: TupleMeta) -> QuillSQLResult<()> {
124        self.write_tuple_meta_with_lsn(rid, meta, None)
125    }
126
127    pub fn write_tuple_meta_with_lsn(
128        &self,
129        rid: RecordId,
130        meta: TupleMeta,
131        new_lsn: Option<Lsn>,
132    ) -> QuillSQLResult<()> {
133        let mut page_guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
134        let mut table_page = TablePageCodec::decode(page_guard.data(), self.schema.clone())?.0;
135        table_page.set_lsn(page_guard.lsn());
136
137        let slot = rid.slot_num as u16;
138        table_page.update_tuple_meta(meta, slot)?;
139        self.write_back_page(&mut page_guard, &mut table_page, new_lsn)
140    }
141
142    pub fn tuple(&self, rid: RecordId) -> QuillSQLResult<Tuple> {
143        let (_meta, tuple) = self.full_tuple(rid)?;
144        Ok(tuple)
145    }
146
147    pub fn tuple_meta(&self, rid: RecordId) -> QuillSQLResult<TupleMeta> {
148        let (meta, _tuple) = self.full_tuple(rid)?;
149        Ok(meta)
150    }
151
152    pub fn get_first_rid(&self) -> QuillSQLResult<Option<RecordId>> {
153        let first_page_id = self.first_page_id.load(Ordering::SeqCst);
154        let (_, table_page) = self
155            .buffer_pool
156            .fetch_table_page(first_page_id, self.schema.clone())?;
157
158        if table_page.header.num_tuples == 0 {
159            Ok(None)
160        } else {
161            Ok(Some(RecordId::new(first_page_id, 0)))
162        }
163    }
164
165    pub fn get_next_rid(&self, rid: RecordId) -> QuillSQLResult<Option<RecordId>> {
166        let (_, table_page) = self
167            .buffer_pool
168            .fetch_table_page(rid.page_id, self.schema.clone())?;
169        let next_rid = table_page.get_next_rid(&rid);
170        if next_rid.is_some() {
171            return Ok(next_rid);
172        }
173
174        if table_page.header.next_page_id == INVALID_PAGE_ID {
175            return Ok(None);
176        }
177        let (_, next_table_page) = self
178            .buffer_pool
179            .fetch_table_page(table_page.header.next_page_id, self.schema.clone())?;
180
181        if next_table_page.header.num_tuples == 0 {
182            return Ok(None);
183        }
184        Ok(Some(RecordId::new(table_page.header.next_page_id, 0)))
185    }
186
187    /// Attempt to reclaim the tuple at `rid` if `predicate` returns true for the current metadata.
188    /// Returns true when the tuple was removed.
189    pub fn vacuum_slot_if<F>(&self, rid: RecordId, predicate: F) -> QuillSQLResult<bool>
190    where
191        F: FnOnce(&TupleMeta) -> bool,
192    {
193        let mut page_guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
194        let mut table_page = TablePageCodec::decode(page_guard.data(), self.schema.clone())?.0;
195        table_page.set_lsn(page_guard.lsn());
196
197        let slot = rid.slot_num as u16;
198        if slot >= table_page.header.num_tuples {
199            return Ok(false);
200        }
201        let meta = table_page.header.tuple_infos[slot as usize].meta;
202        if !predicate(&meta) {
203            return Ok(false);
204        }
205
206        table_page.reclaim_tuple(slot)?;
207        self.write_back_page(&mut page_guard, &mut table_page, None)?;
208        Ok(true)
209    }
210}
211
212#[derive(Debug)]
213pub struct TableIterator {
214    heap: Arc<TableHeap>,
215    start_bound: Bound<RecordId>,
216    end_bound: Bound<RecordId>,
217    cursor: RecordId,
218    started: bool,
219    ended: bool,
220}
221
222impl TableIterator {
223    pub fn new<R: RangeBounds<RecordId>>(heap: Arc<TableHeap>, range: R) -> Self {
224        let start = range.start_bound().cloned();
225        let end = range.end_bound().cloned();
226        Self {
227            heap,
228            start_bound: start,
229            end_bound: end,
230            cursor: INVALID_RID,
231            started: false,
232            ended: false,
233        }
234    }
235
236    pub fn next(&mut self) -> QuillSQLResult<Option<(RecordId, TupleMeta, Tuple)>> {
237        if self.ended {
238            return Ok(None);
239        }
240
241        if self.started {
242            match self.end_bound {
243                Bound::Included(rid) => {
244                    if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
245                        self.cursor = next_rid;
246                        if self.cursor == rid {
247                            self.ended = true;
248                        }
249                        let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
250                        Ok(Some((self.cursor, meta, tuple)))
251                    } else {
252                        Ok(None)
253                    }
254                }
255                Bound::Excluded(rid) => {
256                    if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
257                        if next_rid == rid {
258                            self.ended = true;
259                            Ok(None)
260                        } else {
261                            self.cursor = next_rid;
262                            let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
263                            Ok(Some((self.cursor, meta, tuple)))
264                        }
265                    } else {
266                        Ok(None)
267                    }
268                }
269                Bound::Unbounded => {
270                    if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
271                        self.cursor = next_rid;
272                        let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
273                        Ok(Some((self.cursor, meta, tuple)))
274                    } else {
275                        Ok(None)
276                    }
277                }
278            }
279        } else {
280            self.started = true;
281            match self.start_bound {
282                Bound::Included(rid) => {
283                    self.cursor = rid;
284                    let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
285                    Ok(Some((self.cursor, meta, tuple)))
286                }
287                Bound::Excluded(rid) => {
288                    if let Some(next_rid) = self.heap.get_next_rid(rid)? {
289                        self.cursor = next_rid;
290                        let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
291                        Ok(Some((self.cursor, meta, tuple)))
292                    } else {
293                        self.ended = true;
294                        Ok(None)
295                    }
296                }
297                Bound::Unbounded => {
298                    if let Some(first_rid) = self.heap.get_first_rid()? {
299                        self.cursor = first_rid;
300                        let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
301                        Ok(Some((self.cursor, meta, tuple)))
302                    } else {
303                        self.ended = true;
304                        Ok(None)
305                    }
306                }
307            }
308        }
309    }
310}
311
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use tempfile::TempDir;

    use super::{TableHeap, TableIterator, TupleMeta};
    use crate::buffer::BufferManager;
    use crate::catalog::{Column, DataType, Schema};
    use crate::storage::codec::TupleCodec;
    use crate::storage::disk_manager::DiskManager;
    use crate::storage::disk_scheduler::DiskScheduler;
    use crate::storage::page::EMPTY_TUPLE_META;
    use crate::storage::tuple::Tuple;
    use crate::utils::scalar::ScalarValue;

    /// Schema `(a: Int8, b: Int16)` shared by most tests below.
    fn pair_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]))
    }

    /// Schema `(v: Int32)` used by the vacuum tests.
    fn single_int_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![Column::new("v", DataType::Int32, false)]))
    }

    /// Opens a disk scheduler over `file_name` inside `dir`. Keep `dir` alive while
    /// the scheduler is in use: the directory is removed when the `TempDir` drops.
    fn scheduler_in(dir: &TempDir, file_name: &str) -> Arc<DiskScheduler> {
        let disk_manager = DiskManager::try_new(dir.path().join(file_name)).unwrap();
        Arc::new(DiskScheduler::new(Arc::new(disk_manager)))
    }

    /// Builds an `(a, b)` row for `pair_schema`.
    fn pair_row(schema: &Arc<Schema>, a: i8, b: i16) -> Tuple {
        Tuple::new(schema.clone(), vec![a.into(), b.into()])
    }

    #[test]
    pub fn test_table_heap_write_tuple_meta() {
        let dir = TempDir::new().unwrap();
        let schema = pair_schema();
        let buffer_pool = Arc::new(BufferManager::new(1000, scheduler_in(&dir, "test.db")));
        let table_heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();

        let mut rids = Vec::new();
        for v in 1..=3i8 {
            let rid = table_heap
                .insert_tuple(&EMPTY_TUPLE_META, &pair_row(&schema, v, i16::from(v)))
                .unwrap();
            rids.push(rid);
        }
        let target = rids[1];

        let mut meta = table_heap.tuple_meta(target).unwrap();
        meta.insert_txn_id = 1;
        table_heap.write_tuple_meta(target, meta).unwrap();

        assert_eq!(table_heap.tuple_meta(target).unwrap().insert_txn_id, 1);
    }

    #[test]
    pub fn test_table_heap_insert_tuple() {
        let dir = TempDir::new().unwrap();
        let schema = pair_schema();
        let buffer_pool = Arc::new(BufferManager::new(1000, scheduler_in(&dir, "test.db")));
        let table_heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();

        let cases = [
            (TupleMeta::new(1, 0), 1i8, 1i16),
            (TupleMeta::new(2, 0), 2, 2),
            (TupleMeta::new(3, 0), 3, 3),
        ];
        let rids: Vec<_> = cases
            .iter()
            .map(|(meta, a, b)| {
                table_heap
                    .insert_tuple(meta, &pair_row(&schema, *a, *b))
                    .unwrap()
            })
            .collect();

        for (rid, (want_meta, a, b)) in rids.into_iter().zip(cases) {
            let (meta, tuple) = table_heap.full_tuple(rid).unwrap();
            assert_eq!(meta, want_meta);
            assert_eq!(tuple.data, vec![a.into(), b.into()]);
        }
    }

    #[test]
    pub fn test_table_heap_iterator() {
        let dir = TempDir::new().unwrap();
        let schema = pair_schema();
        let buffer_pool = Arc::new(BufferManager::new(1000, scheduler_in(&dir, "test.db")));
        let table_heap = Arc::new(TableHeap::try_new(schema.clone(), buffer_pool).unwrap());

        let cases = [
            (TupleMeta::new(1, 0), 1i8, 1i16),
            (TupleMeta::new(2, 0), 2, 2),
            (TupleMeta::new(3, 0), 3, 3),
        ];
        let rids: Vec<_> = cases
            .iter()
            .map(|(meta, a, b)| {
                table_heap
                    .insert_tuple(meta, &pair_row(&schema, *a, *b))
                    .unwrap()
            })
            .collect();

        let mut iterator = TableIterator::new(table_heap.clone(), ..);
        for (want_rid, (want_meta, a, b)) in rids.into_iter().zip(cases) {
            let (rid, meta, tuple) = iterator.next().unwrap().unwrap();
            assert_eq!(rid, want_rid);
            assert_eq!(meta, want_meta);
            assert_eq!(tuple.data, vec![a.into(), b.into()]);
        }

        assert!(iterator.next().unwrap().is_none());
    }

    #[test]
    pub fn test_recover_set_tuple_meta_and_bytes() {
        let dir = TempDir::new().unwrap();
        let schema = pair_schema();
        let buffer_pool = Arc::new(BufferManager::new(128, scheduler_in(&dir, "test.db")));
        let table_heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();

        // Insert a row.
        let rid = table_heap
            .insert_tuple(&EMPTY_TUPLE_META, &pair_row(&schema, 1, 10))
            .unwrap();

        // Change the stored bytes via the recovery API and check the data moved.
        let replacement = TupleCodec::encode(&pair_row(&schema, 2, 20));
        table_heap
            .recover_set_tuple_bytes(rid, &replacement)
            .expect("recover bytes");
        let (_meta, current) = table_heap.full_tuple(rid).unwrap();
        assert_eq!(current.data, vec![2i8.into(), 20i16.into()]);

        // Mark the tuple deleted via the recovery API and check the flag sticks.
        let mut meta = table_heap.tuple_meta(rid).unwrap();
        meta.is_deleted = true;
        table_heap
            .recover_set_tuple_meta(rid, meta)
            .expect("recover meta");
        assert!(table_heap.tuple_meta(rid).unwrap().is_deleted);
    }

    #[test]
    pub fn test_recover_repack_on_size_mismatch() {
        let dir = TempDir::new().unwrap();
        let schema = pair_schema();
        let buffer_pool = Arc::new(BufferManager::new(128, scheduler_in(&dir, "test.db")));
        let table_heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();

        let rid = table_heap
            .insert_tuple(&EMPTY_TUPLE_META, &pair_row(&schema, 1, 10))
            .unwrap();

        // Recover-set a tuple whose encoded length differs from the original.
        let larger_bytes = TupleCodec::encode(&pair_row(&schema, 99, 300));
        table_heap
            .recover_set_tuple_bytes(rid, &larger_bytes)
            .expect("recover larger bytes");

        let (_meta, after) = table_heap.full_tuple(rid).unwrap();
        assert_eq!(after.data, vec![99i8.into(), 300i16.into()]);
    }

    #[test]
    fn vacuum_slot_if_reclaims_tuple() {
        let dir = TempDir::new().unwrap();
        let schema = single_int_schema();
        let buffer_pool = Arc::new(BufferManager::new(32, scheduler_in(&dir, "vacuum.db")));
        let heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();

        let row = Tuple::new(schema.clone(), vec![ScalarValue::Int32(Some(5))]);
        let rid = heap.insert_tuple(&TupleMeta::new(1, 0), &row).unwrap();

        assert!(heap.full_tuple(rid).is_ok());
        assert!(heap.vacuum_slot_if(rid, |_| true).unwrap());
        assert!(heap.full_tuple(rid).is_err());
        assert!(heap.get_first_rid().unwrap().is_none());
    }

    #[test]
    fn vacuum_slot_if_respects_predicate() {
        let dir = TempDir::new().unwrap();
        let schema = single_int_schema();
        let buffer_pool = Arc::new(BufferManager::new(
            32,
            scheduler_in(&dir, "vacuum_predicate.db"),
        ));
        let heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();

        let row = Tuple::new(schema.clone(), vec![ScalarValue::Int32(Some(9))]);
        let rid = heap.insert_tuple(&TupleMeta::new(42, 0), &row).unwrap();

        let reclaimed = heap
            .vacuum_slot_if(rid, |current| current.insert_txn_id == 0)
            .unwrap();
        assert!(!reclaimed);

        let (meta_after, tuple_after) = heap.full_tuple(rid).unwrap();
        assert_eq!(meta_after.insert_txn_id, 42);
        assert_eq!(tuple_after, row);
    }
}