// quill_sql/storage/table_heap.rs

use std::collections::Bound;
use std::collections::VecDeque;
use std::ops::RangeBounds;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use bytes::BytesMut;

use crate::buffer::{AtomicPageId, PageId, WritePageGuard, INVALID_PAGE_ID};
use crate::catalog::{SchemaRef, EMPTY_SCHEMA_REF};
use crate::config::TableScanConfig;
use crate::recovery::wal_record::WalRecordPayload;
use crate::storage::codec::TablePageCodec;
use crate::storage::codec::TupleCodec;
use crate::storage::disk_scheduler::{DiskCommandResultReceiver, DiskScheduler};
use crate::storage::heap::wal_codec::{
    HeapDeletePayload, HeapInsertPayload, HeapRecordPayload, HeapUpdatePayload, RelationIdent,
    TupleMetaRepr,
};
use crate::storage::page::{RecordId, TablePage, TupleMeta, INVALID_RID};
use crate::storage::tuple::Tuple;
use crate::transaction::{CommandId, TransactionId, INVALID_COMMAND_ID};
use crate::utils::ring_buffer::RingBuffer;
use crate::{
    buffer::BufferManager,
    error::{QuillSQLError, QuillSQLResult},
};

28#[derive(Debug)]
29pub struct TableHeap {
30    pub schema: SchemaRef,
31    pub buffer_pool: Arc<BufferManager>,
32    pub first_page_id: AtomicPageId,
33    pub last_page_id: AtomicPageId,
34}
35
36impl TableHeap {
37    /// Creates a new table heap. This involves allocating an initial page.
38    pub fn try_new(schema: SchemaRef, buffer_pool: Arc<BufferManager>) -> QuillSQLResult<Self> {
39        // new_page() returns a WritePageGuard.
40        let mut first_page_guard = buffer_pool.new_page()?;
41        let first_page_id = first_page_guard.page_id();
42
43        // Initialize the first page as an empty TablePage.
44        let table_page = TablePage::new(schema.clone(), INVALID_PAGE_ID);
45        let encoded_data = TablePageCodec::encode(&table_page);
46
47        // Use DerefMut to get a mutable reference and update the page data.
48        // This also marks the page as dirty automatically.
49        first_page_guard.data_mut().copy_from_slice(&encoded_data);
50        first_page_guard.set_lsn(table_page.lsn());
51
52        // The first_page_guard is dropped here, automatically unpinning the page.
53        Ok(Self {
54            schema,
55            buffer_pool,
56            first_page_id: AtomicU32::new(first_page_id),
57            last_page_id: AtomicU32::new(first_page_id),
58        })
59    }
60
61    fn write_back_page(
62        &self,
63        guard: &mut WritePageGuard,
64        table_page: &mut TablePage,
65    ) -> QuillSQLResult<()> {
66        let new_image = TablePageCodec::encode(table_page);
67        guard.apply_page_image(&new_image)?;
68        Ok(())
69    }
70
71    pub(crate) fn relation_ident(&self) -> RelationIdent {
72        RelationIdent {
73            root_page_id: self.first_page_id.load(Ordering::SeqCst),
74        }
75    }
76
77    fn append_heap_record(&self, payload: HeapRecordPayload) -> QuillSQLResult<()> {
78        if let Some(wal) = self.buffer_pool.wal_manager() {
79            let _wal_result =
80                wal.append_record_with(|_| WalRecordPayload::Heap(payload.clone()))?;
81        }
82        Ok(())
83    }
84
85    /// Inserts a tuple into the table.
86    ///
87    /// This function inserts the given tuple into the table. If the last page in the table
88    /// has enough space for the tuple, it is inserted there. Otherwise, a new page is allocated
89    /// and the tuple is inserted there.
90    ///
91    /// Parameters:
92    /// - `meta`: The metadata associated with the tuple.
93    /// - `tuple`: The tuple to be inserted.
94    ///
95    /// Returns:
96    /// An `Option` containing the `Rid` of the inserted tuple if successful, otherwise `None`.
97    /// Inserts a tuple into the table.
98    pub fn insert_tuple(&self, meta: &TupleMeta, tuple: &Tuple) -> QuillSQLResult<RecordId> {
99        let mut current_page_id = self.last_page_id.load(Ordering::SeqCst);
100
101        loop {
102            let mut current_page_guard = self.buffer_pool.fetch_page_write(current_page_id)?;
103            let mut table_page =
104                TablePageCodec::decode(current_page_guard.data(), self.schema.clone())?.0;
105            table_page.set_lsn(current_page_guard.lsn());
106
107            if table_page.next_tuple_offset(tuple).is_ok() {
108                let tuple_bytes = TupleCodec::encode(tuple);
109                let slot_id = table_page.insert_tuple(meta, tuple)?;
110                let relation = self.relation_ident();
111                let tuple_meta = TupleMetaRepr::from(*meta);
112                let payload = HeapRecordPayload::Insert(HeapInsertPayload {
113                    relation,
114                    page_id: current_page_id,
115                    slot_id,
116                    op_txn_id: meta.insert_txn_id,
117                    tuple_meta,
118                    tuple_data: tuple_bytes,
119                });
120                self.append_heap_record(payload.clone())?;
121                self.write_back_page(&mut current_page_guard, &mut table_page)?;
122                return Ok(RecordId::new(current_page_id, slot_id as u32));
123            }
124
125            let mut new_page_guard = self.buffer_pool.new_page()?;
126            let new_page_id = new_page_guard.page_id();
127            let mut new_table_page = TablePage::new(self.schema.clone(), INVALID_PAGE_ID);
128            self.write_back_page(&mut new_page_guard, &mut new_table_page)?;
129
130            table_page.header.next_page_id = new_page_id;
131            self.write_back_page(&mut current_page_guard, &mut table_page)?;
132            drop(current_page_guard);
133
134            self.last_page_id.store(new_page_id, Ordering::SeqCst);
135            current_page_id = new_page_id;
136        }
137    }
138
139    /// MVCC helper: insert a new tuple version with optional link to a previous version.
140    pub fn mvcc_insert_version(
141        &self,
142        tuple: &Tuple,
143        txn_id: TransactionId,
144        cid: CommandId,
145        prev_version: Option<RecordId>,
146    ) -> QuillSQLResult<(RecordId, TupleMeta)> {
147        let mut meta = TupleMeta::new(txn_id, cid);
148        meta.set_prev_version(prev_version);
149        let rid = self.insert_tuple(&meta, tuple)?;
150        Ok((rid, meta))
151    }
152
153    /// Create a new tuple version for UPDATE while marking the source version deleted.
154    /// Returns the RecordId of the new version and the original metadata (pre-change) for undo.
155    pub fn mvcc_update(
156        &self,
157        current_rid: RecordId,
158        new_tuple: Tuple,
159        txn_id: TransactionId,
160        cid: CommandId,
161    ) -> QuillSQLResult<(RecordId, TupleMeta)> {
162        let (current_meta, _existing_tuple) = self.full_tuple(current_rid)?;
163        if current_meta.is_deleted && current_meta.next_version.is_some() {
164            return Err(QuillSQLError::Execution(format!(
165                "tuple {} has already been updated",
166                current_rid
167            )));
168        }
169
170        // Capture previous metadata for undo before mutation.
171        let prev_meta = current_meta;
172
173        // Insert new physical version linked to the current record.
174        let (new_rid, mut new_meta) =
175            self.mvcc_insert_version(&new_tuple, txn_id, cid, Some(current_rid))?;
176        new_meta.set_prev_version(Some(current_rid));
177
178        // Update the source version metadata to mark it deleted and link forward.
179        let mut updated_meta = current_meta;
180        updated_meta.mark_deleted(txn_id, cid);
181        updated_meta.set_next_version(Some(new_rid));
182        self.update_tuple_meta(updated_meta, current_rid)?;
183
184        Ok((new_rid, prev_meta))
185    }
186
187    /// Mark a tuple version as deleted for MVCC-aware DELETE operations.
188    /// Returns the previous metadata prior to marking deleted (for undo purposes).
189    pub fn mvcc_mark_deleted(
190        &self,
191        rid: RecordId,
192        txn_id: TransactionId,
193        cid: CommandId,
194    ) -> QuillSQLResult<TupleMeta> {
195        let (mut current_meta, _) = self.full_tuple(rid)?;
196        if current_meta.is_deleted {
197            return Ok(current_meta);
198        }
199        let prev_meta = current_meta;
200        current_meta.mark_deleted(txn_id, cid);
201        self.update_tuple_meta(current_meta, rid)?;
202        Ok(prev_meta)
203    }
204
205    /// Remove a heap version created by an aborted operation. For safety this currently
206    /// expects the version to live at the tail of the page; otherwise it simply marks the slot
207    /// deleted and leaves full reclamation to vacuum.
208    #[allow(dead_code)]
209    pub fn mvcc_remove_version(&self, rid: RecordId) -> QuillSQLResult<()> {
210        let mut page_guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
211        let mut table_page = TablePageCodec::decode(page_guard.data(), self.schema.clone())?.0;
212        table_page.set_lsn(page_guard.lsn());
213
214        let slot = rid.slot_num as usize;
215        if slot >= table_page.header.num_tuples as usize {
216            return Ok(());
217        }
218
219        if slot + 1 != table_page.header.num_tuples as usize {
220            // Non-tail removal: mark deleted and let vacuum reclaim later.
221            let mut meta = table_page.header.tuple_infos[slot].meta;
222            if !meta.is_deleted {
223                meta.mark_deleted(0, INVALID_COMMAND_ID);
224                table_page.update_tuple_meta(meta, slot as u16)?;
225                self.write_back_page(&mut page_guard, &mut table_page)?;
226            }
227            return Ok(());
228        }
229
230        table_page.reclaim_tuple(slot as u16)?;
231        self.write_back_page(&mut page_guard, &mut table_page)
232    }
233
234    pub fn update_tuple(&self, rid: RecordId, tuple: Tuple) -> QuillSQLResult<()> {
235        let mut page_guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
236        let mut table_page = TablePageCodec::decode(page_guard.data(), self.schema.clone())?.0;
237        table_page.set_lsn(page_guard.lsn());
238
239        let slot = rid.slot_num as u16;
240        let (old_meta, old_tuple) = table_page.tuple(slot)?;
241        let old_tuple_bytes = TupleCodec::encode(&old_tuple);
242        let new_tuple_bytes = TupleCodec::encode(&tuple);
243        table_page.update_tuple(tuple, rid.slot_num as u16)?;
244        let new_meta = table_page.header.tuple_infos[slot as usize].meta;
245        let relation = self.relation_ident();
246        self.append_heap_record(HeapRecordPayload::Update(HeapUpdatePayload {
247            relation,
248            page_id: rid.page_id,
249            slot_id: slot,
250            op_txn_id: new_meta.insert_txn_id,
251            new_tuple_meta: TupleMetaRepr::from(new_meta),
252            new_tuple_data: new_tuple_bytes,
253            old_tuple_meta: Some(TupleMetaRepr::from(old_meta)),
254            old_tuple_data: Some(old_tuple_bytes),
255        }))?;
256        self.write_back_page(&mut page_guard, &mut table_page)
257    }
258
259    pub fn update_tuple_meta(&self, meta: TupleMeta, rid: RecordId) -> QuillSQLResult<()> {
260        let mut page_guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
261        let mut table_page = TablePageCodec::decode(page_guard.data(), self.schema.clone())?.0;
262        table_page.set_lsn(page_guard.lsn());
263
264        let slot = rid.slot_num as u16;
265        let (old_meta, old_tuple) = table_page.tuple(slot)?;
266        let old_tuple_bytes = TupleCodec::encode(&old_tuple);
267        table_page.update_tuple_meta(meta, slot)?;
268        let relation = self.relation_ident();
269        let payload = if meta.is_deleted && !old_meta.is_deleted {
270            HeapRecordPayload::Delete(HeapDeletePayload {
271                relation,
272                page_id: rid.page_id,
273                slot_id: slot,
274                op_txn_id: meta.delete_txn_id,
275                old_tuple_meta: TupleMetaRepr::from(old_meta),
276                old_tuple_data: Some(old_tuple_bytes),
277            })
278        } else {
279            let (_, current_tuple) = table_page.tuple(slot)?;
280            let new_tuple_bytes = TupleCodec::encode(&current_tuple);
281            HeapRecordPayload::Update(HeapUpdatePayload {
282                relation,
283                page_id: rid.page_id,
284                slot_id: slot,
285                op_txn_id: meta.insert_txn_id,
286                new_tuple_meta: TupleMetaRepr::from(meta),
287                new_tuple_data: new_tuple_bytes,
288                old_tuple_meta: Some(TupleMetaRepr::from(old_meta)),
289                old_tuple_data: Some(old_tuple_bytes),
290            })
291        };
292        self.append_heap_record(payload)?;
293        self.write_back_page(&mut page_guard, &mut table_page)
294    }
295
296    pub fn full_tuple(&self, rid: RecordId) -> QuillSQLResult<(TupleMeta, Tuple)> {
297        let (_, table_page) = self
298            .buffer_pool
299            .fetch_table_page(rid.page_id, self.schema.clone())?;
300        let result = table_page.tuple(rid.slot_num as u16)?;
301        Ok(result)
302    }
303
304    pub fn tuple(&self, rid: RecordId) -> QuillSQLResult<Tuple> {
305        let (_meta, tuple) = self.full_tuple(rid)?;
306        Ok(tuple)
307    }
308
309    pub fn tuple_meta(&self, rid: RecordId) -> QuillSQLResult<TupleMeta> {
310        let (meta, _tuple) = self.full_tuple(rid)?;
311        Ok(meta)
312    }
313
314    pub fn get_first_rid(&self) -> QuillSQLResult<Option<RecordId>> {
315        let first_page_id = self.first_page_id.load(Ordering::SeqCst);
316        let (_, table_page) = self
317            .buffer_pool
318            .fetch_table_page(first_page_id, self.schema.clone())?;
319
320        if table_page.header.num_tuples == 0 {
321            Ok(None)
322        } else {
323            Ok(Some(RecordId::new(first_page_id, 0)))
324        }
325    }
326
327    pub fn get_next_rid(&self, rid: RecordId) -> QuillSQLResult<Option<RecordId>> {
328        let (_, table_page) = self
329            .buffer_pool
330            .fetch_table_page(rid.page_id, self.schema.clone())?;
331        let next_rid = table_page.get_next_rid(&rid);
332        if next_rid.is_some() {
333            return Ok(next_rid);
334        }
335
336        if table_page.header.next_page_id == INVALID_PAGE_ID {
337            return Ok(None);
338        }
339        let (_, next_table_page) = self
340            .buffer_pool
341            .fetch_table_page(table_page.header.next_page_id, self.schema.clone())?;
342
343        if next_table_page.header.num_tuples == 0 {
344            return Ok(None);
345        }
346        Ok(Some(RecordId::new(table_page.header.next_page_id, 0)))
347    }
348
349    /// Construct a lightweight TableHeap view for recovery operations.
350    /// This instance uses an empty schema and does not rely on first/last page ids.
351    pub fn recovery_view(buffer_pool: Arc<BufferManager>) -> Self {
352        Self {
353            schema: EMPTY_SCHEMA_REF.clone(),
354            buffer_pool,
355            first_page_id: AtomicU32::new(0),
356            last_page_id: AtomicU32::new(0),
357        }
358    }
359
360    /// Recovery-only API: set tuple meta without emitting WAL.
361    /// Only used by RecoveryManager during UNDO.
362    pub fn recover_set_tuple_meta(&self, rid: RecordId, meta: TupleMeta) -> QuillSQLResult<()> {
363        let mut guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
364        let (mut header, hdr_len) =
365            crate::storage::codec::TablePageHeaderCodec::decode(guard.data())?;
366        if (rid.slot_num as usize) >= header.tuple_infos.len() {
367            return Ok(());
368        }
369        let info = &mut header.tuple_infos[rid.slot_num as usize];
370        if info.meta.is_deleted != meta.is_deleted {
371            if meta.is_deleted {
372                header.num_deleted_tuples = header.num_deleted_tuples.saturating_add(1);
373            } else {
374                header.num_deleted_tuples = header.num_deleted_tuples.saturating_sub(1);
375            }
376        }
377        info.meta = meta;
378        let new_header = crate::storage::codec::TablePageHeaderCodec::encode(&header);
379        let copy_len = std::cmp::min(hdr_len, new_header.len());
380        guard.data_mut()[0..copy_len].copy_from_slice(&new_header[..copy_len]);
381        guard.mark_dirty();
382        Ok(())
383    }
384
385    /// Recovery-only API: set tuple raw bytes without emitting WAL.
386    /// If size mismatches, repack tuple area and update offsets.
387    pub fn recover_set_tuple_bytes(&self, rid: RecordId, new_bytes: &[u8]) -> QuillSQLResult<()> {
388        let mut guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
389        let (mut header, _hdr_len) =
390            crate::storage::codec::TablePageHeaderCodec::decode(guard.data())?;
391        if (rid.slot_num as usize) >= header.tuple_infos.len() {
392            return Ok(());
393        }
394        let slot = rid.slot_num as usize;
395        let info = &mut header.tuple_infos[slot];
396        let off = info.offset as usize;
397        let sz = info.size as usize;
398        if new_bytes.len() == sz {
399            if off + sz <= crate::buffer::PAGE_SIZE {
400                guard.data_mut()[off..off + sz].copy_from_slice(new_bytes);
401            }
402            guard.mark_dirty();
403            return Ok(());
404        }
405        let n = header.tuple_infos.len();
406        let mut tuples: Vec<Vec<u8>> = Vec::with_capacity(n);
407        for i in 0..n {
408            let inf = &header.tuple_infos[i];
409            let s = &guard.data()[inf.offset as usize..(inf.offset + inf.size) as usize];
410            if i == slot {
411                tuples.push(new_bytes.to_vec());
412            } else {
413                tuples.push(s.to_vec());
414            }
415        }
416        let mut tail = crate::buffer::PAGE_SIZE;
417        for i in 0..n {
418            let sz = tuples[i].len();
419            tail = tail.saturating_sub(sz);
420            header.tuple_infos[i].offset = tail as u16;
421            header.tuple_infos[i].size = sz as u16;
422        }
423        let new_header = crate::storage::codec::TablePageHeaderCodec::encode(&header);
424        for b in guard.data_mut().iter_mut() {
425            *b = 0;
426        }
427        let hdr_copy = std::cmp::min(new_header.len(), crate::buffer::PAGE_SIZE);
428        guard.data_mut()[0..hdr_copy].copy_from_slice(&new_header[..hdr_copy]);
429        for i in 0..n {
430            let off = header.tuple_infos[i].offset as usize;
431            let sz = header.tuple_infos[i].size as usize;
432            if off + sz <= crate::buffer::PAGE_SIZE {
433                guard.data_mut()[off..off + sz].copy_from_slice(&tuples[i][..sz]);
434            }
435        }
436        guard.mark_dirty();
437        Ok(())
438    }
439
440    pub fn recover_restore_tuple(
441        &self,
442        rid: RecordId,
443        meta: TupleMeta,
444        tuple: &Tuple,
445    ) -> QuillSQLResult<()> {
446        let bytes = TupleCodec::encode(tuple);
447        self.recover_set_tuple_bytes(rid, &bytes)?;
448        self.recover_set_tuple_meta(rid, meta)
449    }
450
451    pub fn recover_delete_tuple(
452        &self,
453        rid: RecordId,
454        txn_id: TransactionId,
455        cid: CommandId,
456    ) -> QuillSQLResult<()> {
457        let mut meta = self.tuple_meta(rid)?;
458        if meta.is_deleted {
459            return Ok(());
460        }
461        meta.mark_deleted(txn_id, cid);
462        self.recover_set_tuple_meta(rid, meta)
463    }
464
465    pub fn delete_tuple(
466        &self,
467        rid: RecordId,
468        txn_id: TransactionId,
469        cid: CommandId,
470    ) -> QuillSQLResult<()> {
471        let _ = self.mvcc_mark_deleted(rid, txn_id, cid)?;
472        Ok(())
473    }
474
475    /// Attempt to reclaim the tuple at `rid` if `predicate` returns true for the current metadata.
476    /// Returns true when the tuple was removed.
477    pub fn vacuum_slot_if<F>(&self, rid: RecordId, predicate: F) -> QuillSQLResult<bool>
478    where
479        F: FnOnce(&TupleMeta) -> bool,
480    {
481        let mut page_guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
482        let mut table_page = TablePageCodec::decode(page_guard.data(), self.schema.clone())?.0;
483        table_page.set_lsn(page_guard.lsn());
484
485        let slot = rid.slot_num as u16;
486        if slot >= table_page.header.num_tuples {
487            return Ok(false);
488        }
489        let meta = table_page.header.tuple_infos[slot as usize].meta;
490        if !predicate(&meta) {
491            return Ok(false);
492        }
493
494        table_page.reclaim_tuple(slot)?;
495        self.write_back_page(&mut page_guard, &mut table_page)?;
496        Ok(true)
497    }
498}
499
500#[derive(Debug)]
501pub struct TableIterator {
502    heap: Arc<TableHeap>,
503    start_bound: Bound<RecordId>,
504    end_bound: Bound<RecordId>,
505    cursor: RecordId,
506    started: bool,
507    ended: bool,
508    strategy: ScanStrategy,
509}
510
511#[derive(Debug)]
512enum ScanStrategy {
513    /// Existing behavior: go through buffer pool (page_table/LRU-K)
514    Cached,
515    /// Streaming with generic ring buffer holding decoded tuples
516    Streaming(StreamScanState),
517}
518
519#[derive(Debug)]
520struct StreamScanState {
521    ring: RingBuffer<(RecordId, TupleMeta, Tuple)>,
522    first_page: bool,
523    prefetch: StreamPrefetchState,
524}
525
526#[derive(Debug)]
527struct StreamPrefetchState {
528    pending: VecDeque<PageId>,
529    inflight: VecDeque<StreamBatch>,
530    ready: VecDeque<(PageId, BytesMut)>,
531    readahead: usize,
532    exhausted: bool,
533}
534
535#[derive(Debug)]
536struct StreamBatch {
537    page_ids: Vec<PageId>,
538    rx: DiskCommandResultReceiver<Vec<BytesMut>>,
539}
540
541impl StreamPrefetchState {
542    fn ensure_ready(&mut self, scheduler: &Arc<DiskScheduler>) -> QuillSQLResult<()> {
543        while !self.exhausted && self.ready.is_empty() {
544            let capacity = self
545                .readahead
546                .saturating_sub(self.ready.len() + self.inflight.len());
547            if capacity > 0 && !self.pending.is_empty() {
548                self.schedule_batch(scheduler, capacity)?;
549                continue;
550            }
551            if let Some(batch) = self.inflight.pop_front() {
552                let buffers = batch.rx.recv().map_err(|e| {
553                    QuillSQLError::Internal(format!("DiskScheduler channel disconnected: {}", e))
554                })??;
555                for (pid, bytes) in batch.page_ids.into_iter().zip(buffers.into_iter()) {
556                    self.ready.push_back((pid, bytes));
557                }
558            } else {
559                self.exhausted = true;
560            }
561        }
562        Ok(())
563    }
564
565    fn maybe_schedule(&mut self, scheduler: &Arc<DiskScheduler>) -> QuillSQLResult<()> {
566        if self.exhausted {
567            return Ok(());
568        }
569        let capacity = self
570            .readahead
571            .saturating_sub(self.ready.len() + self.inflight.len());
572        if capacity == 0 || self.pending.is_empty() {
573            return Ok(());
574        }
575        self.schedule_batch(scheduler, capacity)
576    }
577
578    fn schedule_batch(
579        &mut self,
580        scheduler: &Arc<DiskScheduler>,
581        limit: usize,
582    ) -> QuillSQLResult<()> {
583        let mut ids = Vec::with_capacity(limit);
584        while ids.len() < limit {
585            if let Some(pid) = self.pending.pop_front() {
586                ids.push(pid);
587            } else {
588                break;
589            }
590        }
591        if ids.is_empty() {
592            return Ok(());
593        }
594        let rx = scheduler.schedule_read_pages(ids.clone())?;
595        self.inflight.push_back(StreamBatch { page_ids: ids, rx });
596        Ok(())
597    }
598}
599
600impl TableIterator {
601    pub fn new<R: RangeBounds<RecordId>>(heap: Arc<TableHeap>, range: R) -> Self {
602        Self::new_with_hint(heap, range, None)
603    }
604
605    pub fn new_with_hint<R: RangeBounds<RecordId>>(
606        heap: Arc<TableHeap>,
607        range: R,
608        streaming_hint: Option<bool>,
609    ) -> Self {
610        let start = range.start_bound().cloned();
611        let end = range.end_bound().cloned();
612
613        // Centralized config (remove env): use TableScanConfig defaults
614        let cfg = TableScanConfig::default();
615        let pool_quarter = (heap.buffer_pool.buffer_pool().capacity().max(1) / 4) as u32;
616        let threshold: u32 = cfg.stream_threshold_pages.unwrap_or(pool_quarter.max(1));
617        let readahead: usize = cfg.readahead_pages;
618
619        let approx_pages = heap
620            .last_page_id
621            .load(Ordering::SeqCst)
622            .saturating_sub(heap.first_page_id.load(Ordering::SeqCst))
623            + 1;
624
625        let is_full_scan = matches!(start, Bound::Unbounded) && matches!(end, Bound::Unbounded);
626
627        // Requested streaming decision
628        let requested_stream = match streaming_hint {
629            Some(true) => true,
630            Some(false) => false,
631            None => cfg.stream_scan_enable || approx_pages >= threshold,
632        };
633        // If explicitly hinted true, allow streaming even for ranged scans. Otherwise only for full scan.
634        let use_streaming = if matches!(streaming_hint, Some(true)) {
635            true
636        } else {
637            is_full_scan && requested_stream
638        };
639
640        let strategy = if use_streaming {
641            let tuple_ring_cap = readahead.max(1).saturating_mul(1024);
642            let ring = RingBuffer::with_capacity(tuple_ring_cap);
643            let default_first = heap.first_page_id.load(Ordering::SeqCst);
644            let start_pid = match &start {
645                Bound::Included(r) | Bound::Excluded(r) => r.page_id,
646                Bound::Unbounded => default_first,
647            };
648            let mut pending = VecDeque::new();
649            let exhausted = if start_pid == INVALID_PAGE_ID {
650                true
651            } else {
652                pending.push_back(start_pid);
653                false
654            };
655            let prefetch = StreamPrefetchState {
656                pending,
657                inflight: VecDeque::new(),
658                ready: VecDeque::new(),
659                readahead: readahead.max(1),
660                exhausted,
661            };
662            ScanStrategy::Streaming(StreamScanState {
663                ring,
664                first_page: true,
665                prefetch,
666            })
667        } else {
668            ScanStrategy::Cached
669        };
670
671        Self {
672            heap,
673            start_bound: start,
674            end_bound: end,
675            cursor: INVALID_RID,
676            started: false,
677            ended: false,
678            strategy,
679        }
680    }
681
682    pub fn next(&mut self) -> QuillSQLResult<Option<(RecordId, TupleMeta, Tuple)>> {
683        if self.ended {
684            return Ok(None);
685        }
686
687        // Streaming now supports bounded scans; no unconditional fallback required here.
688
689        // Clone refs needed by streaming helper before borrowing self.strategy mutably
690        let heap_arc = self.heap.clone();
691        let schema = self.heap.schema.clone();
692        let start_bound_cloned = self.start_bound.clone();
693
694        // Streaming strategy (only supports full scan). For other ranges fallback to Cached.
695        if let ScanStrategy::Streaming(state) = &mut self.strategy {
696            // Initialize on first call
697            if !self.started {
698                self.started = true;
699                if state.prefetch.exhausted {
700                    self.ended = true;
701                    return Ok(None);
702                }
703                // Ensure disk visibility for streaming reads
704                self.heap.buffer_pool.flush_all_pages()?;
705                fill_stream_ring(&heap_arc, schema.clone(), &start_bound_cloned, state)?;
706            }
707
708            loop {
709                if let Some((rid, meta, tuple)) = state.ring.pop() {
710                    // Respect end bound
711                    match &self.end_bound {
712                        Bound::Unbounded => return Ok(Some((rid, meta, tuple))),
713                        Bound::Included(end) => {
714                            if rid == *end {
715                                self.ended = true;
716                            }
717                            return Ok(Some((rid, meta, tuple)));
718                        }
719                        Bound::Excluded(end) => {
720                            if rid == *end {
721                                self.ended = true;
722                                return Ok(None);
723                            }
724                            return Ok(Some((rid, meta, tuple)));
725                        }
726                    }
727                } else {
728                    if state.prefetch.exhausted {
729                        self.ended = true;
730                        return Ok(None);
731                    }
732                    fill_stream_ring(&heap_arc, schema.clone(), &start_bound_cloned, state)?;
733                    if state.ring.is_empty() {
734                        if state.prefetch.exhausted {
735                            self.ended = true;
736                        }
737                        return Ok(None);
738                    }
739                }
740            }
741        }
742
743        // Cached strategy (original)
744        if self.started {
745            match self.end_bound {
746                Bound::Included(rid) => {
747                    if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
748                        self.cursor = next_rid;
749                        if self.cursor == rid {
750                            self.ended = true;
751                        }
752                        let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
753                        Ok(Some((self.cursor, meta, tuple)))
754                    } else {
755                        Ok(None)
756                    }
757                }
758                Bound::Excluded(rid) => {
759                    if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
760                        if next_rid == rid {
761                            self.ended = true;
762                            Ok(None)
763                        } else {
764                            self.cursor = next_rid;
765                            let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
766                            Ok(Some((self.cursor, meta, tuple)))
767                        }
768                    } else {
769                        Ok(None)
770                    }
771                }
772                Bound::Unbounded => {
773                    if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
774                        self.cursor = next_rid;
775                        let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
776                        Ok(Some((self.cursor, meta, tuple)))
777                    } else {
778                        Ok(None)
779                    }
780                }
781            }
782        } else {
783            self.started = true;
784            match self.start_bound {
785                Bound::Included(rid) => {
786                    self.cursor = rid;
787                    let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
788                    Ok(Some((self.cursor, meta, tuple)))
789                }
790                Bound::Excluded(rid) => {
791                    if let Some(next_rid) = self.heap.get_next_rid(rid)? {
792                        self.cursor = next_rid;
793                        let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
794                        Ok(Some((self.cursor, meta, tuple)))
795                    } else {
796                        self.ended = true;
797                        Ok(None)
798                    }
799                }
800                Bound::Unbounded => {
801                    if let Some(first_rid) = self.heap.get_first_rid()? {
802                        self.cursor = first_rid;
803                        let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
804                        Ok(Some((self.cursor, meta, tuple)))
805                    } else {
806                        self.ended = true;
807                        Ok(None)
808                    }
809                }
810            }
811        }
812    }
813}
814
815fn fill_stream_ring(
816    heap: &Arc<TableHeap>,
817    schema: SchemaRef,
818    start_bound: &Bound<RecordId>,
819    state: &mut StreamScanState,
820) -> QuillSQLResult<()> {
821    let scheduler = heap.buffer_pool.buffer_pool().disk_scheduler();
822    while state.ring.len() < state.ring.capacity() && !state.prefetch.exhausted {
823        state.prefetch.ensure_ready(&scheduler)?;
824        let Some((pid, bytes)) = state.prefetch.ready.pop_front() else {
825            break;
826        };
827
828        let (page, _) = TablePageCodec::decode(&bytes, schema.clone())?;
829        if page.header.next_page_id != INVALID_PAGE_ID {
830            state.prefetch.pending.push_back(page.header.next_page_id);
831        }
832        state.prefetch.maybe_schedule(&scheduler)?;
833
834        let start_slot = if state.first_page {
835            state.first_page = false;
836            match start_bound {
837                Bound::Included(r) if r.page_id == pid => r.slot_num as usize,
838                Bound::Excluded(r) if r.page_id == pid => r.slot_num as usize + 1,
839                _ => 0,
840            }
841        } else {
842            0
843        };
844
845        for slot in start_slot..page.header.num_tuples as usize {
846            let rid = RecordId::new(pid, slot as u32);
847            let (meta, tuple) = page.tuple(slot as u16)?;
848            state.ring.push((rid, meta, tuple));
849            if state.ring.len() >= state.ring.capacity() {
850                break;
851            }
852        }
853    }
854    Ok(())
855}
856
#[cfg(test)]
mod tests {

    use std::ffi::OsString;
    use std::sync::{Arc, Mutex, MutexGuard};
    use tempfile::TempDir;

    use crate::buffer::BufferManager;
    use crate::catalog::{Column, DataType, Schema};
    use crate::storage::codec::TupleCodec;
    use crate::storage::disk_manager::DiskManager;
    use crate::storage::disk_scheduler::DiskScheduler;
    use crate::storage::page::EMPTY_TUPLE_META;
    use crate::storage::table_heap::TableIterator;
    use crate::storage::{table_heap::TableHeap, tuple::Tuple};
    use crate::utils::scalar::ScalarValue;

    /// Serializes tests that depend on the process-wide `QUILL_STREAM_*`
    /// environment variables.
    ///
    /// Cargo runs tests as parallel threads of a single process. Without this
    /// lock, a test that forces streaming mode could leak that setting into a
    /// concurrently running iterator test.
    static SCAN_ENV_LOCK: Mutex<()> = Mutex::new(());

    /// RAII guard for scan-related environment overrides.
    ///
    /// Holds `SCAN_ENV_LOCK` for its whole lifetime. On drop, it restores each
    /// overridden variable to its previous value, or removes it if it was
    /// unset, before the lock is released.
    struct ScanEnvGuard {
        saved: Vec<(&'static str, Option<OsString>)>,
        _lock: MutexGuard<'static, ()>,
    }

    impl ScanEnvGuard {
        /// Acquires the env lock and sets every `(name, value)` pair in `vars`.
        ///
        /// An empty slice only serializes the caller against tests that
        /// mutate the environment.
        fn set(vars: &[(&'static str, &str)]) -> Self {
            // A panicking test poisons the mutex. The protected data is `()`,
            // so it is always valid; recover rather than cascade failures.
            let lock = SCAN_ENV_LOCK
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let mut saved = Vec::with_capacity(vars.len());
            for &(name, value) in vars {
                saved.push((name, std::env::var_os(name)));
                std::env::set_var(name, value);
            }
            Self { saved, _lock: lock }
        }
    }

    impl Drop for ScanEnvGuard {
        fn drop(&mut self) {
            // `Drop::drop` runs before fields are dropped, so the environment
            // is restored while `_lock` is still held.
            for (name, previous) in self.saved.drain(..).rev() {
                match previous {
                    Some(value) => std::env::set_var(name, value),
                    None => std::env::remove_var(name),
                }
            }
        }
    }

    /// Updating a tuple's metadata persists insert/delete txn ids and the deleted flag.
    #[test]
    pub fn test_table_heap_update_tuple_meta() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));
        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(1000, disk_scheduler));
        let table_heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();

        let _rid1 = table_heap
            .insert_tuple(
                &EMPTY_TUPLE_META,
                &Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
            )
            .unwrap();
        let rid2 = table_heap
            .insert_tuple(
                &EMPTY_TUPLE_META,
                &Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
            )
            .unwrap();
        let _rid3 = table_heap
            .insert_tuple(
                &EMPTY_TUPLE_META,
                &Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
            )
            .unwrap();

        let mut meta = table_heap.tuple_meta(rid2).unwrap();
        meta.insert_txn_id = 1;
        meta.mark_deleted(2, 0);
        meta.is_deleted = true;
        table_heap.update_tuple_meta(meta, rid2).unwrap();

        let meta = table_heap.tuple_meta(rid2).unwrap();
        assert_eq!(meta.insert_txn_id, 1);
        assert_eq!(meta.delete_txn_id, 2);
        assert_eq!(meta.delete_cid, 0);
        assert!(meta.is_deleted);
    }

    /// Inserted tuples can be read back with the exact meta and data supplied.
    #[test]
    pub fn test_table_heap_insert_tuple() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));
        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(1000, disk_scheduler));
        let table_heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();

        let meta1 = super::TupleMeta::new(1, 0);
        let rid1 = table_heap
            .insert_tuple(
                &meta1,
                &Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
            )
            .unwrap();
        let meta2 = super::TupleMeta::new(2, 0);
        let rid2 = table_heap
            .insert_tuple(
                &meta2,
                &Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
            )
            .unwrap();
        let meta3 = super::TupleMeta::new(3, 0);
        let rid3 = table_heap
            .insert_tuple(
                &meta3,
                &Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
            )
            .unwrap();

        let (meta, tuple) = table_heap.full_tuple(rid1).unwrap();
        assert_eq!(meta, meta1);
        assert_eq!(tuple.data, vec![1i8.into(), 1i16.into()]);

        let (meta, tuple) = table_heap.full_tuple(rid2).unwrap();
        assert_eq!(meta, meta2);
        assert_eq!(tuple.data, vec![2i8.into(), 2i16.into()]);

        let (meta, tuple) = table_heap.full_tuple(rid3).unwrap();
        assert_eq!(meta, meta3);
        assert_eq!(tuple.data, vec![3i8.into(), 3i16.into()]);
    }

    /// An MVCC update marks the old version deleted and links old and new versions both ways.
    #[test]
    fn mvcc_update_creates_version_chain() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("mvcc_test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("id", DataType::Int32, false),
            Column::new("val", DataType::Int32, false),
        ]));
        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(256, disk_scheduler));
        let table_heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();

        let base_tuple = Tuple::new(
            schema.clone(),
            vec![ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(10))],
        );
        let rid = table_heap
            .insert_tuple(&super::TupleMeta::new(1, 0), &base_tuple)
            .expect("insert base");

        let updated_tuple = Tuple::new(
            schema.clone(),
            vec![ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(20))],
        );
        let (new_rid, _) = table_heap
            .mvcc_update(rid, updated_tuple, 2, 0)
            .expect("mvcc update");

        let old_meta = table_heap.tuple_meta(rid).expect("old meta");
        assert!(old_meta.is_deleted);
        assert_eq!(old_meta.next_version, Some(new_rid));

        let new_meta = table_heap.tuple_meta(new_rid).expect("new meta");
        assert_eq!(new_meta.prev_version, Some(rid));
        assert!(!new_meta.is_deleted);
    }

    /// A full-range iterator yields every inserted tuple in insertion order, then `None`.
    #[test]
    pub fn test_table_heap_iterator() {
        // This test iterates without flushing. Hold the env lock so the
        // streaming tests cannot force direct-I/O streaming underneath it.
        let _env = ScanEnvGuard::set(&[]);

        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));

        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(1000, disk_scheduler));
        let table_heap = Arc::new(TableHeap::try_new(schema.clone(), buffer_pool).unwrap());

        let meta1 = super::TupleMeta::new(1, 0);
        let rid1 = table_heap
            .insert_tuple(
                &meta1,
                &Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
            )
            .unwrap();
        let meta2 = super::TupleMeta::new(2, 0);
        let rid2 = table_heap
            .insert_tuple(
                &meta2,
                &Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
            )
            .unwrap();
        let meta3 = super::TupleMeta::new(3, 0);
        let rid3 = table_heap
            .insert_tuple(
                &meta3,
                &Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
            )
            .unwrap();

        let mut iterator = TableIterator::new(table_heap.clone(), ..);

        let (rid, meta, tuple) = iterator.next().unwrap().unwrap();
        assert_eq!(rid, rid1);
        assert_eq!(meta, meta1);
        assert_eq!(tuple.data, vec![1i8.into(), 1i16.into()]);

        let (rid, meta, tuple) = iterator.next().unwrap().unwrap();
        assert_eq!(rid, rid2);
        assert_eq!(meta, meta2);
        assert_eq!(tuple.data, vec![2i8.into(), 2i16.into()]);

        let (rid, meta, tuple) = iterator.next().unwrap().unwrap();
        assert_eq!(rid, rid3);
        assert_eq!(meta, meta3);
        assert_eq!(tuple.data, vec![3i8.into(), 3i16.into()]);

        assert!(iterator.next().unwrap().is_none());
    }

    /// A forced streaming scan over a multi-page table returns every row exactly once.
    #[test]
    pub fn test_streaming_seq_scan_ring() {
        // Force streaming mode regardless of table size. The guard restores
        // the previous values when the test ends, even on panic.
        let _env = ScanEnvGuard::set(&[
            ("QUILL_STREAM_SCAN", "1"),
            ("QUILL_STREAM_READAHEAD", "2"),
        ]);

        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));

        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(128, disk_scheduler));
        let table_heap = Arc::new(TableHeap::try_new(schema.clone(), buffer_pool).unwrap());

        // Insert many rows to span multiple pages
        let rows = 1000;
        for i in 0..rows {
            let _rid = table_heap
                .insert_tuple(
                    &super::TupleMeta::new(1, 0),
                    &Tuple::new(schema.clone(), vec![(i as i8).into(), (i as i16).into()]),
                )
                .unwrap();
        }

        // Ensure data is persisted before direct I/O streaming
        table_heap.buffer_pool.flush_all_pages().unwrap();

        // Iterate full range; should go through streaming ring buffer
        let mut it = TableIterator::new(table_heap.clone(), ..);
        let mut cnt = 0usize;
        while let Some((_rid, _meta, _t)) = it.next().unwrap() {
            cnt += 1;
        }
        assert_eq!(cnt, rows);
    }

    /// A ranged scan with a streaming hint still honours its bounds.
    #[test]
    pub fn test_streaming_respects_bounds_and_fallbacks() {
        // Force-enable streaming globally; iterator should still fallback for ranged scans
        let _env = ScanEnvGuard::set(&[
            ("QUILL_STREAM_SCAN", "1"),
            ("QUILL_STREAM_READAHEAD", "2"),
        ]);

        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));

        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(128, disk_scheduler));
        let table_heap = Arc::new(TableHeap::try_new(schema.clone(), buffer_pool).unwrap());

        let rid1 = table_heap
            .insert_tuple(
                &super::TupleMeta::new(1, 0),
                &Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
            )
            .unwrap();
        let rid2 = table_heap
            .insert_tuple(
                &super::TupleMeta::new(2, 0),
                &Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
            )
            .unwrap();
        let rid3 = table_heap
            .insert_tuple(
                &super::TupleMeta::new(3, 0),
                &Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
            )
            .unwrap();

        // Create ranged iterator with streaming hint=true; must fallback and only return rid1..=rid2
        let mut it = TableIterator::new_with_hint(table_heap.clone(), rid1..=rid2, Some(true));

        let got1 = it.next().unwrap().unwrap();
        let got2 = it.next().unwrap().unwrap();
        let got3 = it.next().unwrap();

        assert_eq!(got1.0, rid1);
        assert_eq!(got2.0, rid2);
        assert!(got3.is_none());

        // Sanity: ensure rid3 exists but not returned in range
        let (_m, t3) = table_heap.full_tuple(rid3).unwrap();
        assert_eq!(t3.data, vec![3i8.into(), 3i16.into()]);
    }

    /// The recovery APIs overwrite tuple bytes and tuple meta in place.
    #[test]
    pub fn test_recover_set_tuple_meta_and_bytes() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));
        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(128, disk_scheduler));
        let table_heap = TableHeap::try_new(schema.clone(), buffer_pool.clone()).unwrap();

        // Insert a row
        let rid = table_heap
            .insert_tuple(
                &EMPTY_TUPLE_META,
                &Tuple::new(schema.clone(), vec![1i8.into(), 10i16.into()]),
            )
            .unwrap();

        // Change bytes via recovery API
        let new_tuple = Tuple::new(schema.clone(), vec![2i8.into(), 20i16.into()]);
        let new_bytes = TupleCodec::encode(&new_tuple);
        table_heap
            .recover_set_tuple_bytes(rid, &new_bytes)
            .expect("recover bytes");

        // Verify tuple data changed
        let (_m, t) = table_heap.full_tuple(rid).unwrap();
        assert_eq!(t.data, vec![2i8.into(), 20i16.into()]);

        // Mark deleted via recovery API and verify
        let mut meta = table_heap.tuple_meta(rid).unwrap();
        meta.is_deleted = true;
        table_heap
            .recover_set_tuple_meta(rid, meta)
            .expect("recover meta");
        let m2 = table_heap.tuple_meta(rid).unwrap();
        assert!(m2.is_deleted);
    }

    /// Recovery can write bytes for a tuple whose encoded form differs from the stored one.
    #[test]
    pub fn test_recover_repack_on_size_mismatch() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));
        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(128, disk_scheduler));
        let table_heap = TableHeap::try_new(schema.clone(), buffer_pool.clone()).unwrap();

        let rid = table_heap
            .insert_tuple(
                &EMPTY_TUPLE_META,
                &Tuple::new(schema.clone(), vec![1i8.into(), 10i16.into()]),
            )
            .unwrap();

        // Create a tuple with different encoded length and recover-set it
        let larger_tuple = Tuple::new(schema.clone(), vec![99i8.into(), 300i16.into()]);
        let larger_bytes = TupleCodec::encode(&larger_tuple);
        table_heap
            .recover_set_tuple_bytes(rid, &larger_bytes)
            .expect("recover larger bytes");

        let (_m, t2) = table_heap.full_tuple(rid).unwrap();
        assert_eq!(t2.data, vec![99i8.into(), 300i16.into()]);
    }

    /// Vacuuming a slot whose predicate passes removes the tuple, leaving the heap empty.
    #[test]
    fn vacuum_slot_if_reclaims_tuple() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("vacuum.db");
        let schema = Arc::new(Schema::new(vec![Column::new("v", DataType::Int32, false)]));
        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(32, disk_scheduler));
        let heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();

        let meta = super::TupleMeta::new(1, 0);
        let tuple = Tuple::new(schema.clone(), vec![ScalarValue::Int32(Some(5))]);
        let rid = heap.insert_tuple(&meta, &tuple).unwrap();

        assert!(heap.full_tuple(rid).is_ok());
        assert!(heap.vacuum_slot_if(rid, |_| true).unwrap());
        assert!(heap.full_tuple(rid).is_err());
        assert!(heap.get_first_rid().unwrap().is_none());
    }

    /// Vacuuming leaves the tuple untouched when the predicate rejects it.
    #[test]
    fn vacuum_slot_if_respects_predicate() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("vacuum_predicate.db");
        let schema = Arc::new(Schema::new(vec![Column::new("v", DataType::Int32, false)]));
        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(32, disk_scheduler));
        let heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();

        let meta = super::TupleMeta::new(42, 0);
        let tuple = Tuple::new(schema.clone(), vec![ScalarValue::Int32(Some(9))]);
        let rid = heap.insert_tuple(&meta, &tuple).unwrap();

        assert!(!heap
            .vacuum_slot_if(rid, |current| current.insert_txn_id == 0)
            .unwrap());
        let (meta_after, tuple_after) = heap.full_tuple(rid).unwrap();
        assert_eq!(meta_after.insert_txn_id, 42);
        assert_eq!(tuple_after, tuple);
    }
}