quill_sql/storage/table_heap.rs

use crate::buffer::{AtomicPageId, PageId, WritePageGuard, INVALID_PAGE_ID};
use crate::catalog::{SchemaRef, EMPTY_SCHEMA_REF};
use crate::config::TableScanConfig;
use crate::storage::codec::TablePageCodec;
use crate::storage::disk_scheduler::{DiskCommandResultReceiver, DiskScheduler};
use crate::storage::page::{RecordId, TablePage, TupleMeta, INVALID_RID};
use crate::transaction::{CommandId, TransactionId};
use crate::{
    buffer::BufferManager,
    error::{QuillSQLError, QuillSQLResult},
};
use bytes::BytesMut;
use std::collections::Bound;
use std::collections::VecDeque;
use std::ops::RangeBounds;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use crate::recovery::wal::codec::{PageDeltaPayload, PageWritePayload};
use crate::recovery::wal_record::WalRecordPayload;
use crate::storage::codec::TupleCodec;
use crate::storage::heap::wal_codec::{
    HeapDeletePayload, HeapInsertPayload, HeapRecordPayload, HeapUpdatePayload, RelationIdent,
    TupleMetaRepr,
};
use crate::storage::tuple::Tuple;
use crate::utils::ring_buffer::RingBuffer;

#[derive(Debug)]
pub struct TableHeap {
    pub schema: SchemaRef,
    pub buffer_pool: Arc<BufferManager>,
    pub first_page_id: AtomicPageId,
    pub last_page_id: AtomicPageId,
}

impl TableHeap {
    /// Creates a new table heap. This involves allocating an initial page.
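    ///
    /// # Example
    ///
    /// A minimal sketch mirroring this file's tests; the `quill_sql` crate-root paths are
    /// assumed from the directory layout and may differ, so the example is marked `ignore`.
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    /// use quill_sql::buffer::BufferManager;
    /// use quill_sql::catalog::{Column, DataType, Schema};
    /// use quill_sql::storage::disk_manager::DiskManager;
    /// use quill_sql::storage::disk_scheduler::DiskScheduler;
    /// use quill_sql::storage::table_heap::TableHeap;
    ///
    /// let schema = Arc::new(Schema::new(vec![
    ///     Column::new("a", DataType::Int8, false),
    ///     Column::new("b", DataType::Int16, false),
    /// ]));
    /// let disk_manager = DiskManager::try_new(std::path::PathBuf::from("example.db")).unwrap();
    /// let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
    /// let buffer_pool = Arc::new(BufferManager::new(1000, disk_scheduler));
    /// let heap = TableHeap::try_new(schema, buffer_pool).unwrap();
    /// ```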
    pub fn try_new(schema: SchemaRef, buffer_pool: Arc<BufferManager>) -> QuillSQLResult<Self> {
        // new_page() returns a WritePageGuard.
        let mut first_page_guard = buffer_pool.new_page()?;
        let first_page_id = first_page_guard.page_id();

        // Initialize the first page as an empty TablePage.
        let table_page = TablePage::new(schema.clone(), INVALID_PAGE_ID);
        let encoded_data = TablePageCodec::encode(&table_page);

        // Use DerefMut to get a mutable reference and update the page data.
        // This also marks the page as dirty automatically.
        first_page_guard.data_mut().copy_from_slice(&encoded_data);
        first_page_guard.set_lsn(table_page.lsn());

        // The first_page_guard is dropped here, automatically unpinning the page.
        Ok(Self {
            schema,
            buffer_pool,
            first_page_id: AtomicU32::new(first_page_id),
            last_page_id: AtomicU32::new(first_page_id),
        })
    }

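    /// Write a mutated `TablePage` back into the buffer pool, logging to WAL first.
    ///
    /// With a WAL manager attached, this emits either a full page image (forced on the
    /// first touch after a checkpoint, or when the change is large) or a compact
    /// `PageDelta` covering only the contiguous byte range that changed, then stamps the
    /// page with the record's end LSN. Without WAL, the page bytes are simply overwritten
    /// in place.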
    fn write_back_page(
        &self,
        page_id: PageId,
        guard: &mut WritePageGuard,
        table_page: &mut TablePage,
    ) -> QuillSQLResult<()> {
        let prev_lsn = guard.lsn();
        if let Some(wal) = self.buffer_pool.wal_manager() {
            // FPW: if first touch since checkpoint, force full-page image
            if wal.fpw_first_touch(page_id) {
                let new_image = TablePageCodec::encode(table_page);
                let result = wal.append_record_with(|ctx| {
                    table_page.set_lsn(ctx.end_lsn);
                    WalRecordPayload::PageWrite(PageWritePayload {
                        page_id,
                        prev_page_lsn: prev_lsn,
                        page_image: new_image.clone(),
                    })
                })?;
                guard.overwrite(&new_image, Some(result.end_lsn));
                return Ok(());
            }
            // Encode new page image
            let new_image = TablePageCodec::encode(table_page);
            // Compute minimal contiguous diff with current buffer page bytes
            let old = guard.data();
            let (start, end) =
                if let Some((s, e)) = crate::utils::util::find_contiguous_diff(old, &new_image) {
                    (s, e)
                } else {
                    return Ok(());
                };
            let diff_len = end - start;
            // Threshold: centralized default PAGE_SIZE/16 (env removed)
            let delta_threshold = crate::buffer::PAGE_SIZE / 16;
            let result = if diff_len <= delta_threshold {
                wal.append_record_with(|ctx| {
                    table_page.set_lsn(ctx.end_lsn);
                    WalRecordPayload::PageDelta(PageDeltaPayload {
                        page_id,
                        prev_page_lsn: prev_lsn,
                        offset: start as u16,
                        data: new_image[start..end].to_vec(),
                    })
                })?
            } else {
                wal.append_record_with(|ctx| {
                    table_page.set_lsn(ctx.end_lsn);
                    WalRecordPayload::PageWrite(PageWritePayload {
                        page_id,
                        prev_page_lsn: prev_lsn,
                        page_image: new_image.clone(),
                    })
                })?
            };
            guard.overwrite(&new_image, Some(result.end_lsn));
            // WritePageGuard will capture recLSN via dirty-page table
        } else {
            table_page.set_lsn(prev_lsn);
            let encoded = TablePageCodec::encode(table_page);
            guard.overwrite(&encoded, Some(table_page.lsn()));
        }
        Ok(())
    }

    pub(crate) fn relation_ident(&self) -> RelationIdent {
        RelationIdent {
            root_page_id: self.first_page_id.load(Ordering::SeqCst),
        }
    }

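    /// Append a logical heap WAL record (insert/update/delete) if WAL is enabled;
    /// a no-op when the buffer pool has no WAL manager attached.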
    fn append_heap_record(&self, payload: HeapRecordPayload) -> QuillSQLResult<()> {
        if let Some(wal) = self.buffer_pool.wal_manager() {
            let _wal_result =
                wal.append_record_with(|_| WalRecordPayload::Heap(payload.clone()))?;
        }
        Ok(())
    }

    /// Inserts a tuple into the table.
    ///
    /// If the last page in the table has enough space for the tuple, it is inserted there.
    /// Otherwise, a new page is allocated, linked after the current last page, and the
    /// tuple is inserted into it.
    ///
    /// Parameters:
    /// - `meta`: The metadata associated with the tuple.
    /// - `tuple`: The tuple to be inserted.
    ///
    /// Returns:
    /// The `RecordId` of the inserted tuple on success.
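    ///
    /// # Example
    ///
    /// A minimal sketch based on this file's tests (`heap` and `schema` as built in the
    /// `TableHeap::try_new` example above); marked `ignore` since crate paths are assumed.
    ///
    /// ```rust,ignore
    /// use quill_sql::storage::page::EMPTY_TUPLE_META;
    /// use quill_sql::storage::tuple::Tuple;
    ///
    /// let rid = heap
    ///     .insert_tuple(
    ///         &EMPTY_TUPLE_META,
    ///         &Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
    ///     )
    ///     .unwrap();
    /// let (meta, tuple) = heap.full_tuple(rid).unwrap();
    /// assert!(!meta.is_deleted);
    /// ```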
    pub fn insert_tuple(&self, meta: &TupleMeta, tuple: &Tuple) -> QuillSQLResult<RecordId> {
        let mut current_page_id = self.last_page_id.load(Ordering::SeqCst);

        loop {
            let mut current_page_guard = self.buffer_pool.fetch_page_write(current_page_id)?;
            let mut table_page =
                TablePageCodec::decode(current_page_guard.data(), self.schema.clone())?.0;
            table_page.set_lsn(current_page_guard.lsn());

            if table_page.next_tuple_offset(tuple).is_ok() {
                let tuple_bytes = TupleCodec::encode(tuple);
                let slot_id = table_page.insert_tuple(meta, tuple)?;
                let relation = self.relation_ident();
                let tuple_meta = TupleMetaRepr::from(*meta);
                let payload = HeapRecordPayload::Insert(HeapInsertPayload {
                    relation,
                    page_id: current_page_id,
                    slot_id,
                    op_txn_id: meta.insert_txn_id,
                    tuple_meta,
                    tuple_data: tuple_bytes,
                });
                self.append_heap_record(payload.clone())?;
                self.write_back_page(current_page_id, &mut current_page_guard, &mut table_page)?;
                return Ok(RecordId::new(current_page_id, slot_id as u32));
            }

            let mut new_page_guard = self.buffer_pool.new_page()?;
            let new_page_id = new_page_guard.page_id();
            let mut new_table_page = TablePage::new(self.schema.clone(), INVALID_PAGE_ID);
            self.write_back_page(new_page_id, &mut new_page_guard, &mut new_table_page)?;

            table_page.header.next_page_id = new_page_id;
            self.write_back_page(current_page_id, &mut current_page_guard, &mut table_page)?;
            drop(current_page_guard);

            self.last_page_id.store(new_page_id, Ordering::SeqCst);
            current_page_id = new_page_id;
        }
    }

    pub fn update_tuple(&self, rid: RecordId, tuple: Tuple) -> QuillSQLResult<()> {
        let mut page_guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
        let mut table_page = TablePageCodec::decode(page_guard.data(), self.schema.clone())?.0;
        table_page.set_lsn(page_guard.lsn());

        let slot = rid.slot_num as u16;
        let (old_meta, old_tuple) = table_page.tuple(slot)?;
        let old_tuple_bytes = TupleCodec::encode(&old_tuple);
        let new_tuple_bytes = TupleCodec::encode(&tuple);
        table_page.update_tuple(tuple, rid.slot_num as u16)?;
        let new_meta = table_page.header.tuple_infos[slot as usize].meta;
        let relation = self.relation_ident();
        self.append_heap_record(HeapRecordPayload::Update(HeapUpdatePayload {
            relation,
            page_id: rid.page_id,
            slot_id: slot,
            op_txn_id: new_meta.insert_txn_id,
            new_tuple_meta: TupleMetaRepr::from(new_meta),
            new_tuple_data: new_tuple_bytes,
            old_tuple_meta: Some(TupleMetaRepr::from(old_meta)),
            old_tuple_data: Some(old_tuple_bytes),
        }))?;
        self.write_back_page(rid.page_id, &mut page_guard, &mut table_page)
    }

    pub fn update_tuple_meta(&self, meta: TupleMeta, rid: RecordId) -> QuillSQLResult<()> {
        let mut page_guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
        let mut table_page = TablePageCodec::decode(page_guard.data(), self.schema.clone())?.0;
        table_page.set_lsn(page_guard.lsn());

        let slot = rid.slot_num as u16;
        let (old_meta, old_tuple) = table_page.tuple(slot)?;
        let old_tuple_bytes = TupleCodec::encode(&old_tuple);
        table_page.update_tuple_meta(meta, slot)?;
        let relation = self.relation_ident();
        let payload = if meta.is_deleted && !old_meta.is_deleted {
            HeapRecordPayload::Delete(HeapDeletePayload {
                relation,
                page_id: rid.page_id,
                slot_id: slot,
                op_txn_id: meta.delete_txn_id,
                old_tuple_meta: TupleMetaRepr::from(old_meta),
                old_tuple_data: Some(old_tuple_bytes),
            })
        } else {
            let (_, current_tuple) = table_page.tuple(slot)?;
            let new_tuple_bytes = TupleCodec::encode(&current_tuple);
            HeapRecordPayload::Update(HeapUpdatePayload {
                relation,
                page_id: rid.page_id,
                slot_id: slot,
                op_txn_id: meta.insert_txn_id,
                new_tuple_meta: TupleMetaRepr::from(meta),
                new_tuple_data: new_tuple_bytes,
                old_tuple_meta: Some(TupleMetaRepr::from(old_meta)),
                old_tuple_data: Some(old_tuple_bytes),
            })
        };
        self.append_heap_record(payload)?;
        self.write_back_page(rid.page_id, &mut page_guard, &mut table_page)
    }

    pub fn full_tuple(&self, rid: RecordId) -> QuillSQLResult<(TupleMeta, Tuple)> {
        let (_, table_page) = self
            .buffer_pool
            .fetch_table_page(rid.page_id, self.schema.clone())?;
        let result = table_page.tuple(rid.slot_num as u16)?;
        Ok(result)
    }

    pub fn tuple(&self, rid: RecordId) -> QuillSQLResult<Tuple> {
        let (_meta, tuple) = self.full_tuple(rid)?;
        Ok(tuple)
    }

    pub fn tuple_meta(&self, rid: RecordId) -> QuillSQLResult<TupleMeta> {
        let (meta, _tuple) = self.full_tuple(rid)?;
        Ok(meta)
    }

    pub fn get_first_rid(&self) -> QuillSQLResult<Option<RecordId>> {
        let first_page_id = self.first_page_id.load(Ordering::SeqCst);
        let (_, table_page) = self
            .buffer_pool
            .fetch_table_page(first_page_id, self.schema.clone())?;

        if table_page.header.num_tuples == 0 {
            Ok(None)
        } else {
            Ok(Some(RecordId::new(first_page_id, 0)))
        }
    }

    pub fn get_next_rid(&self, rid: RecordId) -> QuillSQLResult<Option<RecordId>> {
        let (_, table_page) = self
            .buffer_pool
            .fetch_table_page(rid.page_id, self.schema.clone())?;
        let next_rid = table_page.get_next_rid(&rid);
        if next_rid.is_some() {
            return Ok(next_rid);
        }

        if table_page.header.next_page_id == INVALID_PAGE_ID {
            return Ok(None);
        }
        let (_, next_table_page) = self
            .buffer_pool
            .fetch_table_page(table_page.header.next_page_id, self.schema.clone())?;

        if next_table_page.header.num_tuples == 0 {
            return Ok(None);
        }
        Ok(Some(RecordId::new(table_page.header.next_page_id, 0)))
    }

    /// Construct a lightweight TableHeap view for recovery operations.
    /// This instance uses an empty schema and does not rely on first/last page ids.
    pub fn recovery_view(buffer_pool: Arc<BufferManager>) -> Self {
        Self {
            schema: EMPTY_SCHEMA_REF.clone(),
            buffer_pool,
            first_page_id: AtomicU32::new(0),
            last_page_id: AtomicU32::new(0),
        }
    }

    /// Recovery-only API: set tuple meta without emitting WAL.
    /// Only used by RecoveryManager during UNDO.
    pub fn recover_set_tuple_meta(&self, rid: RecordId, meta: TupleMeta) -> QuillSQLResult<()> {
        let mut guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
        let (mut header, hdr_len) =
            crate::storage::codec::TablePageHeaderCodec::decode(guard.data())?;
        if (rid.slot_num as usize) >= header.tuple_infos.len() {
            return Ok(());
        }
        let info = &mut header.tuple_infos[rid.slot_num as usize];
        if info.meta.is_deleted != meta.is_deleted {
            if meta.is_deleted {
                header.num_deleted_tuples = header.num_deleted_tuples.saturating_add(1);
            } else {
                header.num_deleted_tuples = header.num_deleted_tuples.saturating_sub(1);
            }
        }
        info.meta = meta;
        let new_header = crate::storage::codec::TablePageHeaderCodec::encode(&header);
        let copy_len = std::cmp::min(hdr_len, new_header.len());
        guard.data_mut()[0..copy_len].copy_from_slice(&new_header[..copy_len]);
        guard.mark_dirty();
        Ok(())
    }

    /// Recovery-only API: set tuple raw bytes without emitting WAL.
    /// If size mismatches, repack tuple area and update offsets.
    pub fn recover_set_tuple_bytes(&self, rid: RecordId, new_bytes: &[u8]) -> QuillSQLResult<()> {
        let mut guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
        let (mut header, _hdr_len) =
            crate::storage::codec::TablePageHeaderCodec::decode(guard.data())?;
        if (rid.slot_num as usize) >= header.tuple_infos.len() {
            return Ok(());
        }
        let slot = rid.slot_num as usize;
        let info = &mut header.tuple_infos[slot];
        let off = info.offset as usize;
        let sz = info.size as usize;
        if new_bytes.len() == sz {
            if off + sz <= crate::buffer::PAGE_SIZE {
                guard.data_mut()[off..off + sz].copy_from_slice(new_bytes);
            }
            guard.mark_dirty();
            return Ok(());
        }
        let n = header.tuple_infos.len();
        let mut tuples: Vec<Vec<u8>> = Vec::with_capacity(n);
        for i in 0..n {
            let inf = &header.tuple_infos[i];
            let s = &guard.data()[inf.offset as usize..(inf.offset + inf.size) as usize];
            if i == slot {
                tuples.push(new_bytes.to_vec());
            } else {
                tuples.push(s.to_vec());
            }
        }
        let mut tail = crate::buffer::PAGE_SIZE;
        for i in 0..n {
            let sz = tuples[i].len();
            tail = tail.saturating_sub(sz);
            header.tuple_infos[i].offset = tail as u16;
            header.tuple_infos[i].size = sz as u16;
        }
        let new_header = crate::storage::codec::TablePageHeaderCodec::encode(&header);
        for b in guard.data_mut().iter_mut() {
            *b = 0;
        }
        let hdr_copy = std::cmp::min(new_header.len(), crate::buffer::PAGE_SIZE);
        guard.data_mut()[0..hdr_copy].copy_from_slice(&new_header[..hdr_copy]);
        for i in 0..n {
            let off = header.tuple_infos[i].offset as usize;
            let sz = header.tuple_infos[i].size as usize;
            if off + sz <= crate::buffer::PAGE_SIZE {
                guard.data_mut()[off..off + sz].copy_from_slice(&tuples[i][..sz]);
            }
        }
        guard.mark_dirty();
        Ok(())
    }

    pub fn recover_restore_tuple(
        &self,
        rid: RecordId,
        meta: TupleMeta,
        tuple: &Tuple,
    ) -> QuillSQLResult<()> {
        let bytes = TupleCodec::encode(tuple);
        self.recover_set_tuple_bytes(rid, &bytes)?;
        self.recover_set_tuple_meta(rid, meta)
    }

    pub fn recover_delete_tuple(
        &self,
        rid: RecordId,
        txn_id: TransactionId,
        cid: CommandId,
    ) -> QuillSQLResult<()> {
        let mut meta = self.tuple_meta(rid)?;
        if meta.is_deleted {
            return Ok(());
        }
        meta.mark_deleted(txn_id, cid);
        self.recover_set_tuple_meta(rid, meta)
    }

    pub fn delete_tuple(
        &self,
        rid: RecordId,
        txn_id: TransactionId,
        cid: CommandId,
    ) -> QuillSQLResult<()> {
        let mut page_guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
        let mut table_page = TablePageCodec::decode(page_guard.data(), self.schema.clone())?.0;
        table_page.set_lsn(page_guard.lsn());

        let slot = rid.slot_num as u16;
        let (mut meta, tuple) = table_page.tuple(slot)?;
        if meta.is_deleted {
            return Ok(());
        }
        // capture old meta before mutation for WAL logging
        let old_meta = meta;
        meta.mark_deleted(txn_id, cid);
        table_page.update_tuple_meta(meta, slot)?;
        let old_tuple_bytes = TupleCodec::encode(&tuple);

        let relation = self.relation_ident();
        let payload = HeapRecordPayload::Delete(HeapDeletePayload {
            relation,
            page_id: rid.page_id,
            slot_id: slot,
            op_txn_id: txn_id,
            old_tuple_meta: TupleMetaRepr::from(old_meta),
            old_tuple_data: Some(old_tuple_bytes),
        });
        self.append_heap_record(payload)?;
        self.write_back_page(rid.page_id, &mut page_guard, &mut table_page)
    }
}

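/// Iterator over the tuples of a [`TableHeap`] within an optional `RecordId` range.
///
/// Depending on configuration and hints it either walks pages through the buffer pool
/// (`Cached`) or streams page images straight from disk through a ring buffer
/// (`Streaming`).
///
/// # Example
///
/// A minimal sketch mirroring this file's tests, where `heap` is an `Arc<TableHeap>` and
/// `rid1`/`rid2` are previously obtained `RecordId`s; marked `ignore` since crate paths
/// are assumed.
///
/// ```rust,ignore
/// use quill_sql::storage::table_heap::TableIterator;
///
/// // Full scan over the whole heap.
/// let mut it = TableIterator::new(heap.clone(), ..);
/// while let Some((rid, meta, tuple)) = it.next().unwrap() {
///     // process (rid, meta, tuple)
/// }
///
/// // Bounded scan with an explicit streaming hint.
/// let mut ranged = TableIterator::new_with_hint(heap.clone(), rid1..=rid2, Some(true));
/// ```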
#[derive(Debug)]
pub struct TableIterator {
    heap: Arc<TableHeap>,
    start_bound: Bound<RecordId>,
    end_bound: Bound<RecordId>,
    cursor: RecordId,
    started: bool,
    ended: bool,
    strategy: ScanStrategy,
}

#[derive(Debug)]
enum ScanStrategy {
    /// Existing behavior: go through buffer pool (page_table/LRU-K)
    Cached,
    /// Streaming with generic ring buffer holding decoded tuples
    Streaming(StreamScanState),
}

#[derive(Debug)]
struct StreamScanState {
    ring: RingBuffer<(RecordId, TupleMeta, Tuple)>,
    first_page: bool,
    prefetch: StreamPrefetchState,
}

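/// Read-ahead pipeline for the streaming scan: page ids move from `pending` (discovered
/// but not yet requested) into `inflight` batches handed to the `DiskScheduler`, and
/// completed reads land in `ready` until the scan consumes them. `readahead` bounds how
/// many pages may be queued or in flight at once, and `exhausted` is set once no further
/// pages can be produced.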
#[derive(Debug)]
struct StreamPrefetchState {
    pending: VecDeque<PageId>,
    inflight: VecDeque<StreamBatch>,
    ready: VecDeque<(PageId, BytesMut)>,
    readahead: usize,
    exhausted: bool,
}

#[derive(Debug)]
struct StreamBatch {
    page_ids: Vec<PageId>,
    rx: DiskCommandResultReceiver<Vec<BytesMut>>,
}

impl StreamPrefetchState {
    fn ensure_ready(&mut self, scheduler: &Arc<DiskScheduler>) -> QuillSQLResult<()> {
        while !self.exhausted && self.ready.is_empty() {
            let capacity = self
                .readahead
                .saturating_sub(self.ready.len() + self.inflight.len());
            if capacity > 0 && !self.pending.is_empty() {
                self.schedule_batch(scheduler, capacity)?;
                continue;
            }
            if let Some(batch) = self.inflight.pop_front() {
                let buffers = batch.rx.recv().map_err(|e| {
                    QuillSQLError::Internal(format!("DiskScheduler channel disconnected: {}", e))
                })??;
                for (pid, bytes) in batch.page_ids.into_iter().zip(buffers.into_iter()) {
                    self.ready.push_back((pid, bytes));
                }
            } else {
                self.exhausted = true;
            }
        }
        Ok(())
    }

    fn maybe_schedule(&mut self, scheduler: &Arc<DiskScheduler>) -> QuillSQLResult<()> {
        if self.exhausted {
            return Ok(());
        }
        let capacity = self
            .readahead
            .saturating_sub(self.ready.len() + self.inflight.len());
        if capacity == 0 || self.pending.is_empty() {
            return Ok(());
        }
        self.schedule_batch(scheduler, capacity)
    }

    fn schedule_batch(
        &mut self,
        scheduler: &Arc<DiskScheduler>,
        limit: usize,
    ) -> QuillSQLResult<()> {
        let mut ids = Vec::with_capacity(limit);
        while ids.len() < limit {
            if let Some(pid) = self.pending.pop_front() {
                ids.push(pid);
            } else {
                break;
            }
        }
        if ids.is_empty() {
            return Ok(());
        }
        let rx = scheduler.schedule_read_pages(ids.clone())?;
        self.inflight.push_back(StreamBatch { page_ids: ids, rx });
        Ok(())
    }
}

impl TableIterator {
    pub fn new<R: RangeBounds<RecordId>>(heap: Arc<TableHeap>, range: R) -> Self {
        Self::new_with_hint(heap, range, None)
    }

    pub fn new_with_hint<R: RangeBounds<RecordId>>(
        heap: Arc<TableHeap>,
        range: R,
        streaming_hint: Option<bool>,
    ) -> Self {
        let start = range.start_bound().cloned();
        let end = range.end_bound().cloned();

        // Centralized config: use TableScanConfig defaults (env toggles removed).
        let cfg = TableScanConfig::default();
        let pool_quarter = (heap.buffer_pool.buffer_pool().capacity().max(1) / 4) as u32;
        let threshold: u32 = cfg.stream_threshold_pages.unwrap_or(pool_quarter.max(1));
        let readahead: usize = cfg.readahead_pages;

        let approx_pages = heap
            .last_page_id
            .load(Ordering::SeqCst)
            .saturating_sub(heap.first_page_id.load(Ordering::SeqCst))
            + 1;

        let is_full_scan = matches!(start, Bound::Unbounded) && matches!(end, Bound::Unbounded);

        // Requested streaming decision
        let requested_stream = match streaming_hint {
            Some(true) => true,
            Some(false) => false,
            None => cfg.stream_scan_enable || approx_pages >= threshold,
        };
        // If explicitly hinted true, allow streaming even for ranged scans; otherwise
        // stream only for full scans.
        let use_streaming = if matches!(streaming_hint, Some(true)) {
            true
        } else {
            is_full_scan && requested_stream
        };

        let strategy = if use_streaming {
            let tuple_ring_cap = readahead.max(1).saturating_mul(1024);
            let ring = RingBuffer::with_capacity(tuple_ring_cap);
            let default_first = heap.first_page_id.load(Ordering::SeqCst);
            let start_pid = match &start {
                Bound::Included(r) | Bound::Excluded(r) => r.page_id,
                Bound::Unbounded => default_first,
            };
            let mut pending = VecDeque::new();
            let exhausted = if start_pid == INVALID_PAGE_ID {
                true
            } else {
                pending.push_back(start_pid);
                false
            };
            let prefetch = StreamPrefetchState {
                pending,
                inflight: VecDeque::new(),
                ready: VecDeque::new(),
                readahead: readahead.max(1),
                exhausted,
            };
            ScanStrategy::Streaming(StreamScanState {
                ring,
                first_page: true,
                prefetch,
            })
        } else {
            ScanStrategy::Cached
        };

        Self {
            heap,
            start_bound: start,
            end_bound: end,
            cursor: INVALID_RID,
            started: false,
            ended: false,
            strategy,
        }
    }

    pub fn next(&mut self) -> QuillSQLResult<Option<(RecordId, TupleMeta, Tuple)>> {
        if self.ended {
            return Ok(None);
        }

        // Streaming supports bounded scans; the end bound is enforced while draining the
        // ring below.

        // Clone refs needed by the streaming helper before borrowing self.strategy mutably.
        let heap_arc = self.heap.clone();
        let schema = self.heap.schema.clone();
        let start_bound_cloned = self.start_bound.clone();

        // Streaming strategy.
        if let ScanStrategy::Streaming(state) = &mut self.strategy {
            // Initialize on first call
            if !self.started {
                self.started = true;
                if state.prefetch.exhausted {
                    self.ended = true;
                    return Ok(None);
                }
                // Ensure disk visibility for streaming reads
                self.heap.buffer_pool.flush_all_pages()?;
                fill_stream_ring(&heap_arc, schema.clone(), &start_bound_cloned, state)?;
            }

            loop {
                if let Some((rid, meta, tuple)) = state.ring.pop() {
                    // Respect end bound
                    match &self.end_bound {
                        Bound::Unbounded => return Ok(Some((rid, meta, tuple))),
                        Bound::Included(end) => {
                            if rid == *end {
                                self.ended = true;
                            }
                            return Ok(Some((rid, meta, tuple)));
                        }
                        Bound::Excluded(end) => {
                            if rid == *end {
                                self.ended = true;
                                return Ok(None);
                            }
                            return Ok(Some((rid, meta, tuple)));
                        }
                    }
                } else {
                    if state.prefetch.exhausted {
                        self.ended = true;
                        return Ok(None);
                    }
                    fill_stream_ring(&heap_arc, schema.clone(), &start_bound_cloned, state)?;
                    if state.ring.is_empty() {
                        if state.prefetch.exhausted {
                            self.ended = true;
                        }
                        return Ok(None);
                    }
                }
            }
        }

        // Cached strategy (original)
        if self.started {
            match self.end_bound {
                Bound::Included(rid) => {
                    if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
                        self.cursor = next_rid;
                        if self.cursor == rid {
                            self.ended = true;
                        }
                        let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
                        Ok(Some((self.cursor, meta, tuple)))
                    } else {
                        Ok(None)
                    }
                }
                Bound::Excluded(rid) => {
                    if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
                        if next_rid == rid {
                            self.ended = true;
                            Ok(None)
                        } else {
                            self.cursor = next_rid;
                            let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
                            Ok(Some((self.cursor, meta, tuple)))
                        }
                    } else {
                        Ok(None)
                    }
                }
                Bound::Unbounded => {
                    if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
                        self.cursor = next_rid;
                        let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
                        Ok(Some((self.cursor, meta, tuple)))
                    } else {
                        Ok(None)
                    }
                }
            }
        } else {
            self.started = true;
            match self.start_bound {
                Bound::Included(rid) => {
                    self.cursor = rid;
                    let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
                    Ok(Some((self.cursor, meta, tuple)))
                }
                Bound::Excluded(rid) => {
                    if let Some(next_rid) = self.heap.get_next_rid(rid)? {
                        self.cursor = next_rid;
                        let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
                        Ok(Some((self.cursor, meta, tuple)))
                    } else {
                        self.ended = true;
                        Ok(None)
                    }
                }
                Bound::Unbounded => {
                    if let Some(first_rid) = self.heap.get_first_rid()? {
                        self.cursor = first_rid;
                        let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
                        Ok(Some((self.cursor, meta, tuple)))
                    } else {
                        self.ended = true;
                        Ok(None)
                    }
                }
            }
        }
    }
}

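/// Refill the streaming ring buffer: pull ready page images from the prefetch pipeline,
/// decode them as `TablePage`s, queue their `next_page_id` for further read-ahead, and
/// push decoded tuples (honoring the start bound on the first page) until the ring is
/// full or the prefetch pipeline is exhausted.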
fn fill_stream_ring(
    heap: &Arc<TableHeap>,
    schema: SchemaRef,
    start_bound: &Bound<RecordId>,
    state: &mut StreamScanState,
) -> QuillSQLResult<()> {
    let scheduler = heap.buffer_pool.buffer_pool().disk_scheduler();
    while state.ring.len() < state.ring.capacity() && !state.prefetch.exhausted {
        state.prefetch.ensure_ready(&scheduler)?;
        let Some((pid, bytes)) = state.prefetch.ready.pop_front() else {
            break;
        };

        let (page, _) = TablePageCodec::decode(&bytes, schema.clone())?;
        if page.header.next_page_id != INVALID_PAGE_ID {
            state.prefetch.pending.push_back(page.header.next_page_id);
        }
        state.prefetch.maybe_schedule(&scheduler)?;

        let start_slot = if state.first_page {
            state.first_page = false;
            match start_bound {
                Bound::Included(r) if r.page_id == pid => r.slot_num as usize,
                Bound::Excluded(r) if r.page_id == pid => r.slot_num as usize + 1,
                _ => 0,
            }
        } else {
            0
        };

        for slot in start_slot..page.header.num_tuples as usize {
            let rid = RecordId::new(pid, slot as u32);
            let (meta, tuple) = page.tuple(slot as u16)?;
            state.ring.push((rid, meta, tuple));
            if state.ring.len() >= state.ring.capacity() {
                break;
            }
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {

    use std::sync::Arc;
    use tempfile::TempDir;

    use crate::buffer::BufferManager;
    use crate::catalog::{Column, DataType, Schema};
    use crate::storage::codec::TupleCodec;
    use crate::storage::disk_manager::DiskManager;
    use crate::storage::disk_scheduler::DiskScheduler;
    use crate::storage::page::EMPTY_TUPLE_META;
    use crate::storage::table_heap::TableIterator;
    use crate::storage::{table_heap::TableHeap, tuple::Tuple};

    #[test]
    pub fn test_table_heap_update_tuple_meta() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));
        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(1000, disk_scheduler));
        let table_heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();

        let _rid1 = table_heap
            .insert_tuple(
                &EMPTY_TUPLE_META,
                &Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
            )
            .unwrap();
        let rid2 = table_heap
            .insert_tuple(
                &EMPTY_TUPLE_META,
                &Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
            )
            .unwrap();
        let _rid3 = table_heap
            .insert_tuple(
                &EMPTY_TUPLE_META,
                &Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
            )
            .unwrap();

        let mut meta = table_heap.tuple_meta(rid2).unwrap();
        meta.insert_txn_id = 1;
        meta.mark_deleted(2, 0);
        meta.is_deleted = true;
        table_heap.update_tuple_meta(meta, rid2).unwrap();

        let meta = table_heap.tuple_meta(rid2).unwrap();
        assert_eq!(meta.insert_txn_id, 1);
        assert_eq!(meta.delete_txn_id, 2);
        assert_eq!(meta.delete_cid, 0);
        assert!(meta.is_deleted);
    }

    #[test]
    pub fn test_table_heap_insert_tuple() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));
        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(1000, disk_scheduler));
        let table_heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();

        let meta1 = super::TupleMeta::new(1, 0);
        let rid1 = table_heap
            .insert_tuple(
                &meta1,
                &Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
            )
            .unwrap();
        let meta2 = super::TupleMeta::new(2, 0);
        let rid2 = table_heap
            .insert_tuple(
                &meta2,
                &Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
            )
            .unwrap();
        let meta3 = super::TupleMeta::new(3, 0);
        let rid3 = table_heap
            .insert_tuple(
                &meta3,
                &Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
            )
            .unwrap();

        let (meta, tuple) = table_heap.full_tuple(rid1).unwrap();
        assert_eq!(meta, meta1);
        assert_eq!(tuple.data, vec![1i8.into(), 1i16.into()]);

        let (meta, tuple) = table_heap.full_tuple(rid2).unwrap();
        assert_eq!(meta, meta2);
        assert_eq!(tuple.data, vec![2i8.into(), 2i16.into()]);

        let (meta, tuple) = table_heap.full_tuple(rid3).unwrap();
        assert_eq!(meta, meta3);
        assert_eq!(tuple.data, vec![3i8.into(), 3i16.into()]);
    }

    #[test]
    pub fn test_table_heap_iterator() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));

        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(1000, disk_scheduler));
        let table_heap = Arc::new(TableHeap::try_new(schema.clone(), buffer_pool).unwrap());

        let meta1 = super::TupleMeta::new(1, 0);
        let rid1 = table_heap
            .insert_tuple(
                &meta1,
                &Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
            )
            .unwrap();
        let meta2 = super::TupleMeta::new(2, 0);
        let rid2 = table_heap
            .insert_tuple(
                &meta2,
                &Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
            )
            .unwrap();
        let meta3 = super::TupleMeta::new(3, 0);
        let rid3 = table_heap
            .insert_tuple(
                &meta3,
                &Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
            )
            .unwrap();

        let mut iterator = TableIterator::new(table_heap.clone(), ..);

        let (rid, meta, tuple) = iterator.next().unwrap().unwrap();
        assert_eq!(rid, rid1);
        assert_eq!(meta, meta1);
        assert_eq!(tuple.data, vec![1i8.into(), 1i16.into()]);

        let (rid, meta, tuple) = iterator.next().unwrap().unwrap();
        assert_eq!(rid, rid2);
        assert_eq!(meta, meta2);
        assert_eq!(tuple.data, vec![2i8.into(), 2i16.into()]);

        let (rid, meta, tuple) = iterator.next().unwrap().unwrap();
        assert_eq!(rid, rid3);
        assert_eq!(meta, meta3);
        assert_eq!(tuple.data, vec![3i8.into(), 3i16.into()]);

        assert!(iterator.next().unwrap().is_none());
    }

    #[test]
    pub fn test_streaming_seq_scan_ring() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));

        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(128, disk_scheduler));
        let table_heap = Arc::new(TableHeap::try_new(schema.clone(), buffer_pool).unwrap());

        // Insert many rows to span multiple pages
        let rows = 1000;
        for i in 0..rows {
            let _rid = table_heap
                .insert_tuple(
                    &super::TupleMeta::new(1, 0),
                    &Tuple::new(schema.clone(), vec![(i as i8).into(), (i as i16).into()]),
                )
                .unwrap();
        }

        // Ensure data is persisted before direct I/O streaming
        table_heap.buffer_pool.flush_all_pages().unwrap();

        // Iterate the full range with an explicit streaming hint (the env toggles were
        // removed in favor of TableScanConfig); all rows should flow through the ring buffer.
        let mut it = TableIterator::new_with_hint(table_heap.clone(), .., Some(true));
        let mut cnt = 0usize;
        while let Some((_rid, _meta, _t)) = it.next().unwrap() {
            cnt += 1;
        }
        assert_eq!(cnt, rows);
    }

    #[test]
    pub fn test_streaming_respects_bounds_and_fallbacks() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));

        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(128, disk_scheduler));
        let table_heap = Arc::new(TableHeap::try_new(schema.clone(), buffer_pool).unwrap());

        let rid1 = table_heap
            .insert_tuple(
                &super::TupleMeta::new(1, 0),
                &Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
            )
            .unwrap();
        let rid2 = table_heap
            .insert_tuple(
                &super::TupleMeta::new(2, 0),
                &Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
            )
            .unwrap();
        let rid3 = table_heap
            .insert_tuple(
                &super::TupleMeta::new(3, 0),
                &Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
            )
            .unwrap();

        // Ranged iterator with streaming hint=true; it must honor the bounds and return
        // only rid1..=rid2.
        let mut it = TableIterator::new_with_hint(table_heap.clone(), rid1..=rid2, Some(true));

        let got1 = it.next().unwrap().unwrap();
        let got2 = it.next().unwrap().unwrap();
        let got3 = it.next().unwrap();

        assert_eq!(got1.0, rid1);
        assert_eq!(got2.0, rid2);
        assert!(got3.is_none());

        // Sanity: ensure rid3 exists but is not returned in the range
        let (_m, t3) = table_heap.full_tuple(rid3).unwrap();
        assert_eq!(t3.data, vec![3i8.into(), 3i16.into()]);
    }

    #[test]
    pub fn test_recover_set_tuple_meta_and_bytes() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));
        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(128, disk_scheduler));
        let table_heap = TableHeap::try_new(schema.clone(), buffer_pool.clone()).unwrap();

        // Insert a row
        let rid = table_heap
            .insert_tuple(
                &EMPTY_TUPLE_META,
                &Tuple::new(schema.clone(), vec![1i8.into(), 10i16.into()]),
            )
            .unwrap();

        // Change bytes via recovery API
        let new_tuple = Tuple::new(schema.clone(), vec![2i8.into(), 20i16.into()]);
        let new_bytes = TupleCodec::encode(&new_tuple);
        table_heap
            .recover_set_tuple_bytes(rid, &new_bytes)
            .expect("recover bytes");

        // Verify tuple data changed
        let (_m, t) = table_heap.full_tuple(rid).unwrap();
        assert_eq!(t.data, vec![2i8.into(), 20i16.into()]);

        // Mark deleted via recovery API and verify
        let mut meta = table_heap.tuple_meta(rid).unwrap();
        meta.is_deleted = true;
        table_heap
            .recover_set_tuple_meta(rid, meta)
            .expect("recover meta");
        let m2 = table_heap.tuple_meta(rid).unwrap();
        assert!(m2.is_deleted);
    }

    #[test]
    pub fn test_recover_repack_on_size_mismatch() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));
        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(128, disk_scheduler));
        let table_heap = TableHeap::try_new(schema.clone(), buffer_pool.clone()).unwrap();

        let rid = table_heap
            .insert_tuple(
                &EMPTY_TUPLE_META,
                &Tuple::new(schema.clone(), vec![1i8.into(), 10i16.into()]),
            )
            .unwrap();

        // Create a tuple with different encoded length and recover-set it
        let larger_tuple = Tuple::new(schema.clone(), vec![99i8.into(), 300i16.into()]);
        let larger_bytes = TupleCodec::encode(&larger_tuple);
        table_heap
            .recover_set_tuple_bytes(rid, &larger_bytes)
            .expect("recover larger bytes");

        let (_m, t2) = table_heap.full_tuple(rid).unwrap();
        assert_eq!(t2.data, vec![99i8.into(), 300i16.into()]);
    }
}