use crate::buffer::{AtomicPageId, PageId, WritePageGuard, INVALID_PAGE_ID};
use crate::catalog::{SchemaRef, EMPTY_SCHEMA_REF};
use crate::config::TableScanConfig;
use crate::storage::codec::TablePageCodec;
use crate::storage::disk_scheduler::{DiskCommandResultReceiver, DiskScheduler};
use crate::storage::page::{RecordId, TablePage, TupleMeta, INVALID_RID};
use crate::transaction::{CommandId, TransactionId};
use crate::{
    buffer::BufferManager,
    error::{QuillSQLError, QuillSQLResult},
};
use bytes::BytesMut;
use std::collections::Bound;
use std::collections::VecDeque;
use std::ops::RangeBounds;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use crate::recovery::wal::codec::{PageDeltaPayload, PageWritePayload};
use crate::recovery::wal_record::WalRecordPayload;
use crate::storage::codec::TupleCodec;
use crate::storage::heap::wal_codec::{
    HeapDeletePayload, HeapInsertPayload, HeapRecordPayload, HeapUpdatePayload, RelationIdent,
    TupleMetaRepr,
};
use crate::storage::tuple::Tuple;
use crate::utils::ring_buffer::RingBuffer;

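/// Heap storage for a single table, backed by the shared [`BufferManager`].
///
/// Tuples are stored in a singly linked chain of `TablePage`s;
/// `first_page_id` / `last_page_id` track the head and tail of that chain.
/// Mutations append a heap WAL record (when a WAL manager is attached)
/// before the page image is written back.
///
/// Usage sketch (not compiled here; assumes an initialized `BufferManager`
/// and a matching `Schema`):
///
/// ```ignore
/// let heap = TableHeap::try_new(schema.clone(), buffer_pool.clone())?;
/// let rid = heap.insert_tuple(&TupleMeta::new(txn_id, cmd_id), &tuple)?;
/// let fetched = heap.tuple(rid)?;
/// ```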
#[derive(Debug)]
pub struct TableHeap {
    pub schema: SchemaRef,
    pub buffer_pool: Arc<BufferManager>,
    pub first_page_id: AtomicPageId,
    pub last_page_id: AtomicPageId,
}

impl TableHeap {
    pub fn try_new(schema: SchemaRef, buffer_pool: Arc<BufferManager>) -> QuillSQLResult<Self> {
        let mut first_page_guard = buffer_pool.new_page()?;
        let first_page_id = first_page_guard.page_id();

        let table_page = TablePage::new(schema.clone(), INVALID_PAGE_ID);
        let encoded_data = TablePageCodec::encode(&table_page);

        first_page_guard.data_mut().copy_from_slice(&encoded_data);
        first_page_guard.set_lsn(table_page.lsn());

        Ok(Self {
            schema,
            buffer_pool,
            first_page_id: AtomicU32::new(first_page_id),
            last_page_id: AtomicU32::new(first_page_id),
        })
    }

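    /// Re-encodes `table_page` into the guarded frame, logging to WAL first.
    ///
    /// When a WAL manager is attached: the first touch of a page (per
    /// `fpw_first_touch`) is logged as a full `PageWrite`; later writes log a
    /// compact `PageDelta` when the contiguous diff is at most
    /// `PAGE_SIZE / 16` bytes, otherwise a full `PageWrite`. Without WAL the
    /// page is simply overwritten in place, keeping its previous LSN.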
    fn write_back_page(
        &self,
        page_id: PageId,
        guard: &mut WritePageGuard,
        table_page: &mut TablePage,
    ) -> QuillSQLResult<()> {
        let prev_lsn = guard.lsn();
        if let Some(wal) = self.buffer_pool.wal_manager() {
            if wal.fpw_first_touch(page_id) {
                let new_image = TablePageCodec::encode(table_page);
                let result = wal.append_record_with(|ctx| {
                    table_page.set_lsn(ctx.end_lsn);
                    WalRecordPayload::PageWrite(PageWritePayload {
                        page_id,
                        prev_page_lsn: prev_lsn,
                        page_image: new_image.clone(),
                    })
                })?;
                guard.overwrite(&new_image, Some(result.end_lsn));
                return Ok(());
            }
            let new_image = TablePageCodec::encode(table_page);
            let old = guard.data();
            let (start, end) =
                if let Some((s, e)) = crate::utils::util::find_contiguous_diff(old, &new_image) {
                    (s, e)
                } else {
                    return Ok(());
                };
            let diff_len = end - start;
            let delta_threshold = crate::buffer::PAGE_SIZE / 16;
            let result = if diff_len <= delta_threshold {
                wal.append_record_with(|ctx| {
                    table_page.set_lsn(ctx.end_lsn);
                    WalRecordPayload::PageDelta(PageDeltaPayload {
                        page_id,
                        prev_page_lsn: prev_lsn,
                        offset: start as u16,
                        data: new_image[start..end].to_vec(),
                    })
                })?
            } else {
                wal.append_record_with(|ctx| {
                    table_page.set_lsn(ctx.end_lsn);
                    WalRecordPayload::PageWrite(PageWritePayload {
                        page_id,
                        prev_page_lsn: prev_lsn,
                        page_image: new_image.clone(),
                    })
                })?
            };
            guard.overwrite(&new_image, Some(result.end_lsn));
        } else {
            table_page.set_lsn(prev_lsn);
            let encoded = TablePageCodec::encode(table_page);
            guard.overwrite(&encoded, Some(table_page.lsn()));
        }
        Ok(())
    }

    pub(crate) fn relation_ident(&self) -> RelationIdent {
        RelationIdent {
            root_page_id: self.first_page_id.load(Ordering::SeqCst),
        }
    }

    fn append_heap_record(&self, payload: HeapRecordPayload) -> QuillSQLResult<()> {
        if let Some(wal) = self.buffer_pool.wal_manager() {
            let _wal_result =
                wal.append_record_with(|_| WalRecordPayload::Heap(payload.clone()))?;
        }
        Ok(())
    }

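    /// Appends `tuple` to the heap and returns its `RecordId`.
    ///
    /// Starts at the cached tail page; if the tuple does not fit, a new page
    /// is allocated, linked through `next_page_id`, published as the new
    /// `last_page_id`, and the insert is retried there. The heap insert WAL
    /// record is appended before the page image is written back.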
    pub fn insert_tuple(&self, meta: &TupleMeta, tuple: &Tuple) -> QuillSQLResult<RecordId> {
        let mut current_page_id = self.last_page_id.load(Ordering::SeqCst);

        loop {
            let mut current_page_guard = self.buffer_pool.fetch_page_write(current_page_id)?;
            let mut table_page =
                TablePageCodec::decode(current_page_guard.data(), self.schema.clone())?.0;
            table_page.set_lsn(current_page_guard.lsn());

            if table_page.next_tuple_offset(tuple).is_ok() {
                let tuple_bytes = TupleCodec::encode(tuple);
                let slot_id = table_page.insert_tuple(meta, tuple)?;
                let relation = self.relation_ident();
                let tuple_meta = TupleMetaRepr::from(*meta);
                let payload = HeapRecordPayload::Insert(HeapInsertPayload {
                    relation,
                    page_id: current_page_id,
                    slot_id,
                    op_txn_id: meta.insert_txn_id,
                    tuple_meta,
                    tuple_data: tuple_bytes,
                });
                self.append_heap_record(payload.clone())?;
                self.write_back_page(current_page_id, &mut current_page_guard, &mut table_page)?;
                return Ok(RecordId::new(current_page_id, slot_id as u32));
            }

            let mut new_page_guard = self.buffer_pool.new_page()?;
            let new_page_id = new_page_guard.page_id();
            let mut new_table_page = TablePage::new(self.schema.clone(), INVALID_PAGE_ID);
            self.write_back_page(new_page_id, &mut new_page_guard, &mut new_table_page)?;

            table_page.header.next_page_id = new_page_id;
            self.write_back_page(current_page_id, &mut current_page_guard, &mut table_page)?;
            drop(current_page_guard);

            self.last_page_id.store(new_page_id, Ordering::SeqCst);
            current_page_id = new_page_id;
        }
    }

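    /// Replaces the tuple stored at `rid` with `tuple`.
    ///
    /// Both the old and the new tuple image (plus metadata) are captured in
    /// the heap update WAL record before the page is written back.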
    pub fn update_tuple(&self, rid: RecordId, tuple: Tuple) -> QuillSQLResult<()> {
        let mut page_guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
        let mut table_page = TablePageCodec::decode(page_guard.data(), self.schema.clone())?.0;
        table_page.set_lsn(page_guard.lsn());

        let slot = rid.slot_num as u16;
        let (old_meta, old_tuple) = table_page.tuple(slot)?;
        let old_tuple_bytes = TupleCodec::encode(&old_tuple);
        let new_tuple_bytes = TupleCodec::encode(&tuple);
        table_page.update_tuple(tuple, rid.slot_num as u16)?;
        let new_meta = table_page.header.tuple_infos[slot as usize].meta;
        let relation = self.relation_ident();
        self.append_heap_record(HeapRecordPayload::Update(HeapUpdatePayload {
            relation,
            page_id: rid.page_id,
            slot_id: slot,
            op_txn_id: new_meta.insert_txn_id,
            new_tuple_meta: TupleMetaRepr::from(new_meta),
            new_tuple_data: new_tuple_bytes,
            old_tuple_meta: Some(TupleMetaRepr::from(old_meta)),
            old_tuple_data: Some(old_tuple_bytes),
        }))?;
        self.write_back_page(rid.page_id, &mut page_guard, &mut table_page)
    }

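    /// Overwrites only the tuple metadata at `rid`.
    ///
    /// Flipping a live tuple to deleted is logged as a heap `Delete` record;
    /// any other metadata change is logged as an `Update` that re-carries the
    /// current tuple bytes.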
    pub fn update_tuple_meta(&self, meta: TupleMeta, rid: RecordId) -> QuillSQLResult<()> {
        let mut page_guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
        let mut table_page = TablePageCodec::decode(page_guard.data(), self.schema.clone())?.0;
        table_page.set_lsn(page_guard.lsn());

        let slot = rid.slot_num as u16;
        let (old_meta, old_tuple) = table_page.tuple(slot)?;
        let old_tuple_bytes = TupleCodec::encode(&old_tuple);
        table_page.update_tuple_meta(meta, slot)?;
        let relation = self.relation_ident();
        let payload = if meta.is_deleted && !old_meta.is_deleted {
            HeapRecordPayload::Delete(HeapDeletePayload {
                relation,
                page_id: rid.page_id,
                slot_id: slot,
                op_txn_id: meta.delete_txn_id,
                old_tuple_meta: TupleMetaRepr::from(old_meta),
                old_tuple_data: Some(old_tuple_bytes),
            })
        } else {
            let (_, current_tuple) = table_page.tuple(slot)?;
            let new_tuple_bytes = TupleCodec::encode(&current_tuple);
            HeapRecordPayload::Update(HeapUpdatePayload {
                relation,
                page_id: rid.page_id,
                slot_id: slot,
                op_txn_id: meta.insert_txn_id,
                new_tuple_meta: TupleMetaRepr::from(meta),
                new_tuple_data: new_tuple_bytes,
                old_tuple_meta: Some(TupleMetaRepr::from(old_meta)),
                old_tuple_data: Some(old_tuple_bytes),
            })
        };
        self.append_heap_record(payload)?;
        self.write_back_page(rid.page_id, &mut page_guard, &mut table_page)
    }

    pub fn full_tuple(&self, rid: RecordId) -> QuillSQLResult<(TupleMeta, Tuple)> {
        let (_, table_page) = self
            .buffer_pool
            .fetch_table_page(rid.page_id, self.schema.clone())?;
        let result = table_page.tuple(rid.slot_num as u16)?;
        Ok(result)
    }

    pub fn tuple(&self, rid: RecordId) -> QuillSQLResult<Tuple> {
        let (_meta, tuple) = self.full_tuple(rid)?;
        Ok(tuple)
    }

    pub fn tuple_meta(&self, rid: RecordId) -> QuillSQLResult<TupleMeta> {
        let (meta, _tuple) = self.full_tuple(rid)?;
        Ok(meta)
    }

    pub fn get_first_rid(&self) -> QuillSQLResult<Option<RecordId>> {
        let first_page_id = self.first_page_id.load(Ordering::SeqCst);
        let (_, table_page) = self
            .buffer_pool
            .fetch_table_page(first_page_id, self.schema.clone())?;

        if table_page.header.num_tuples == 0 {
            Ok(None)
        } else {
            Ok(Some(RecordId::new(first_page_id, 0)))
        }
    }

    pub fn get_next_rid(&self, rid: RecordId) -> QuillSQLResult<Option<RecordId>> {
        let (_, table_page) = self
            .buffer_pool
            .fetch_table_page(rid.page_id, self.schema.clone())?;
        let next_rid = table_page.get_next_rid(&rid);
        if next_rid.is_some() {
            return Ok(next_rid);
        }

        if table_page.header.next_page_id == INVALID_PAGE_ID {
            return Ok(None);
        }
        let (_, next_table_page) = self
            .buffer_pool
            .fetch_table_page(table_page.header.next_page_id, self.schema.clone())?;

        if next_table_page.header.num_tuples == 0 {
            return Ok(None);
        }
        Ok(Some(RecordId::new(table_page.header.next_page_id, 0)))
    }

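    /// Builds a schema-less view of the heap for recovery, where redo only
    /// patches raw page bytes; the page-id watermarks are left at zero.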
    pub fn recovery_view(buffer_pool: Arc<BufferManager>) -> Self {
        Self {
            schema: EMPTY_SCHEMA_REF.clone(),
            buffer_pool,
            first_page_id: AtomicU32::new(0),
            last_page_id: AtomicU32::new(0),
        }
    }

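    /// Recovery helper: rewrites the slot metadata for `rid` directly in the
    /// page header, keeping `num_deleted_tuples` in sync. Out-of-range slots
    /// are ignored and no WAL record is emitted.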
    pub fn recover_set_tuple_meta(&self, rid: RecordId, meta: TupleMeta) -> QuillSQLResult<()> {
        let mut guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
        let (mut header, hdr_len) =
            crate::storage::codec::TablePageHeaderCodec::decode(guard.data())?;
        if (rid.slot_num as usize) >= header.tuple_infos.len() {
            return Ok(());
        }
        let info = &mut header.tuple_infos[rid.slot_num as usize];
        if info.meta.is_deleted != meta.is_deleted {
            if meta.is_deleted {
                header.num_deleted_tuples = header.num_deleted_tuples.saturating_add(1);
            } else {
                header.num_deleted_tuples = header.num_deleted_tuples.saturating_sub(1);
            }
        }
        info.meta = meta;
        let new_header = crate::storage::codec::TablePageHeaderCodec::encode(&header);
        let copy_len = std::cmp::min(hdr_len, new_header.len());
        guard.data_mut()[0..copy_len].copy_from_slice(&new_header[..copy_len]);
        guard.mark_dirty();
        Ok(())
    }

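    /// Recovery helper: replaces the raw tuple bytes at `rid`.
    ///
    /// A same-size payload is patched in place; otherwise all tuples on the
    /// page are re-packed from the end of the page and the header is
    /// rewritten with the new offsets and sizes.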
    pub fn recover_set_tuple_bytes(&self, rid: RecordId, new_bytes: &[u8]) -> QuillSQLResult<()> {
        let mut guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
        let (mut header, _hdr_len) =
            crate::storage::codec::TablePageHeaderCodec::decode(guard.data())?;
        if (rid.slot_num as usize) >= header.tuple_infos.len() {
            return Ok(());
        }
        let slot = rid.slot_num as usize;
        let info = &mut header.tuple_infos[slot];
        let off = info.offset as usize;
        let sz = info.size as usize;
        if new_bytes.len() == sz {
            if off + sz <= crate::buffer::PAGE_SIZE {
                guard.data_mut()[off..off + sz].copy_from_slice(new_bytes);
            }
            guard.mark_dirty();
            return Ok(());
        }
        let n = header.tuple_infos.len();
        let mut tuples: Vec<Vec<u8>> = Vec::with_capacity(n);
        for i in 0..n {
            let inf = &header.tuple_infos[i];
            let s = &guard.data()[inf.offset as usize..(inf.offset + inf.size) as usize];
            if i == slot {
                tuples.push(new_bytes.to_vec());
            } else {
                tuples.push(s.to_vec());
            }
        }
        let mut tail = crate::buffer::PAGE_SIZE;
        for i in 0..n {
            let sz = tuples[i].len();
            tail = tail.saturating_sub(sz);
            header.tuple_infos[i].offset = tail as u16;
            header.tuple_infos[i].size = sz as u16;
        }
        let new_header = crate::storage::codec::TablePageHeaderCodec::encode(&header);
        for b in guard.data_mut().iter_mut() {
            *b = 0;
        }
        let hdr_copy = std::cmp::min(new_header.len(), crate::buffer::PAGE_SIZE);
        guard.data_mut()[0..hdr_copy].copy_from_slice(&new_header[..hdr_copy]);
        for i in 0..n {
            let off = header.tuple_infos[i].offset as usize;
            let sz = header.tuple_infos[i].size as usize;
            if off + sz <= crate::buffer::PAGE_SIZE {
                guard.data_mut()[off..off + sz].copy_from_slice(&tuples[i][..sz]);
            }
        }
        guard.mark_dirty();
        Ok(())
    }

    pub fn recover_restore_tuple(
        &self,
        rid: RecordId,
        meta: TupleMeta,
        tuple: &Tuple,
    ) -> QuillSQLResult<()> {
        let bytes = TupleCodec::encode(tuple);
        self.recover_set_tuple_bytes(rid, &bytes)?;
        self.recover_set_tuple_meta(rid, meta)
    }

    pub fn recover_delete_tuple(
        &self,
        rid: RecordId,
        txn_id: TransactionId,
        cid: CommandId,
    ) -> QuillSQLResult<()> {
        let mut meta = self.tuple_meta(rid)?;
        if meta.is_deleted {
            return Ok(());
        }
        meta.mark_deleted(txn_id, cid);
        self.recover_set_tuple_meta(rid, meta)
    }

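    /// Marks the tuple at `rid` as deleted by `txn_id` / `cid`.
    ///
    /// Deleting an already-deleted tuple is a no-op. The prior metadata and
    /// tuple bytes are captured in a heap `Delete` WAL record before the page
    /// is written back.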
    pub fn delete_tuple(
        &self,
        rid: RecordId,
        txn_id: TransactionId,
        cid: CommandId,
    ) -> QuillSQLResult<()> {
        let mut page_guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
        let mut table_page = TablePageCodec::decode(page_guard.data(), self.schema.clone())?.0;
        table_page.set_lsn(page_guard.lsn());

        let slot = rid.slot_num as u16;
        let (mut meta, tuple) = table_page.tuple(slot)?;
        if meta.is_deleted {
            return Ok(());
        }
        let old_meta = meta;
        meta.mark_deleted(txn_id, cid);
        table_page.update_tuple_meta(meta, slot)?;
        let old_tuple_bytes = TupleCodec::encode(&tuple);

        let relation = self.relation_ident();
        let payload = HeapRecordPayload::Delete(HeapDeletePayload {
            relation,
            page_id: rid.page_id,
            slot_id: slot,
            op_txn_id: txn_id,
            old_tuple_meta: TupleMetaRepr::from(old_meta),
            old_tuple_data: Some(old_tuple_bytes),
        });
        self.append_heap_record(payload)?;
        self.write_back_page(rid.page_id, &mut page_guard, &mut table_page)
    }
}

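/// Range iterator over a [`TableHeap`].
///
/// Bounded or small scans walk RIDs through the buffer pool (`Cached`);
/// large full-table scans may switch to a streaming strategy that reads
/// pages via the disk scheduler with read-ahead and buffers decoded tuples
/// in a ring (`Streaming`).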
#[derive(Debug)]
pub struct TableIterator {
    heap: Arc<TableHeap>,
    start_bound: Bound<RecordId>,
    end_bound: Bound<RecordId>,
    cursor: RecordId,
    started: bool,
    ended: bool,
    strategy: ScanStrategy,
}

#[derive(Debug)]
enum ScanStrategy {
    Cached,
    Streaming(StreamScanState),
}

#[derive(Debug)]
struct StreamScanState {
    ring: RingBuffer<(RecordId, TupleMeta, Tuple)>,
    first_page: bool,
    prefetch: StreamPrefetchState,
}

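/// Read-ahead bookkeeping for streaming scans: page ids not yet requested
/// (`pending`), batches queued on the disk scheduler (`inflight`), fetched
/// page buffers waiting to be decoded (`ready`), the read-ahead window size,
/// and whether the scan has run out of pages (`exhausted`).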
#[derive(Debug)]
struct StreamPrefetchState {
    pending: VecDeque<PageId>,
    inflight: VecDeque<StreamBatch>,
    ready: VecDeque<(PageId, BytesMut)>,
    readahead: usize,
    exhausted: bool,
}

#[derive(Debug)]
struct StreamBatch {
    page_ids: Vec<PageId>,
    rx: DiskCommandResultReceiver<Vec<BytesMut>>,
}

impl StreamPrefetchState {
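    /// Blocks until `ready` holds at least one page buffer or the prefetcher
    /// is exhausted: schedules more reads while the read-ahead window has
    /// room, otherwise waits on the oldest in-flight batch.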
    fn ensure_ready(&mut self, scheduler: &Arc<DiskScheduler>) -> QuillSQLResult<()> {
        while !self.exhausted && self.ready.is_empty() {
            let capacity = self
                .readahead
                .saturating_sub(self.ready.len() + self.inflight.len());
            if capacity > 0 && !self.pending.is_empty() {
                self.schedule_batch(scheduler, capacity)?;
                continue;
            }
            if let Some(batch) = self.inflight.pop_front() {
                let buffers = batch.rx.recv().map_err(|e| {
                    QuillSQLError::Internal(format!("DiskScheduler channel disconnected: {}", e))
                })??;
                for (pid, bytes) in batch.page_ids.into_iter().zip(buffers.into_iter()) {
                    self.ready.push_back((pid, bytes));
                }
            } else {
                self.exhausted = true;
            }
        }
        Ok(())
    }

    fn maybe_schedule(&mut self, scheduler: &Arc<DiskScheduler>) -> QuillSQLResult<()> {
        if self.exhausted {
            return Ok(());
        }
        let capacity = self
            .readahead
            .saturating_sub(self.ready.len() + self.inflight.len());
        if capacity == 0 || self.pending.is_empty() {
            return Ok(());
        }
        self.schedule_batch(scheduler, capacity)
    }

    fn schedule_batch(
        &mut self,
        scheduler: &Arc<DiskScheduler>,
        limit: usize,
    ) -> QuillSQLResult<()> {
        let mut ids = Vec::with_capacity(limit);
        while ids.len() < limit {
            if let Some(pid) = self.pending.pop_front() {
                ids.push(pid);
            } else {
                break;
            }
        }
        if ids.is_empty() {
            return Ok(());
        }
        let rx = scheduler.schedule_read_pages(ids.clone())?;
        self.inflight.push_back(StreamBatch { page_ids: ids, rx });
        Ok(())
    }
}

impl TableIterator {
    pub fn new<R: RangeBounds<RecordId>>(heap: Arc<TableHeap>, range: R) -> Self {
        Self::new_with_hint(heap, range, None)
    }

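    /// Creates an iterator over `range`, optionally forcing a strategy.
    ///
    /// `Some(true)` always streams, `Some(false)` never does, and `None`
    /// streams only for unbounded full scans that are either enabled by
    /// `TableScanConfig` or whose approximate page count reaches the stream
    /// threshold.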
    pub fn new_with_hint<R: RangeBounds<RecordId>>(
        heap: Arc<TableHeap>,
        range: R,
        streaming_hint: Option<bool>,
    ) -> Self {
        let start = range.start_bound().cloned();
        let end = range.end_bound().cloned();

        let cfg = TableScanConfig::default();
        let pool_quarter = (heap.buffer_pool.buffer_pool().capacity().max(1) / 4) as u32;
        let threshold: u32 = cfg.stream_threshold_pages.unwrap_or(pool_quarter.max(1));
        let readahead: usize = cfg.readahead_pages;

        let approx_pages = heap
            .last_page_id
            .load(Ordering::SeqCst)
            .saturating_sub(heap.first_page_id.load(Ordering::SeqCst))
            + 1;

        let is_full_scan = matches!(start, Bound::Unbounded) && matches!(end, Bound::Unbounded);

        let requested_stream = match streaming_hint {
            Some(true) => true,
            Some(false) => false,
            None => cfg.stream_scan_enable || approx_pages >= threshold,
        };
        let use_streaming = if matches!(streaming_hint, Some(true)) {
            true
        } else {
            is_full_scan && requested_stream
        };

        let strategy = if use_streaming {
            let tuple_ring_cap = readahead.max(1).saturating_mul(1024);
            let ring = RingBuffer::with_capacity(tuple_ring_cap);
            let default_first = heap.first_page_id.load(Ordering::SeqCst);
            let start_pid = match &start {
                Bound::Included(r) | Bound::Excluded(r) => r.page_id,
                Bound::Unbounded => default_first,
            };
            let mut pending = VecDeque::new();
            let exhausted = if start_pid == INVALID_PAGE_ID {
                true
            } else {
                pending.push_back(start_pid);
                false
            };
            let prefetch = StreamPrefetchState {
                pending,
                inflight: VecDeque::new(),
                ready: VecDeque::new(),
                readahead: readahead.max(1),
                exhausted,
            };
            ScanStrategy::Streaming(StreamScanState {
                ring,
                first_page: true,
                prefetch,
            })
        } else {
            ScanStrategy::Cached
        };

        Self {
            heap,
            start_bound: start,
            end_bound: end,
            cursor: INVALID_RID,
            started: false,
            ended: false,
            strategy,
        }
    }

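    /// Returns the next `(RecordId, TupleMeta, Tuple)` in range, or `None`
    /// once the scan is finished.
    ///
    /// The streaming path flushes dirty pages once, then drains the prefetch
    /// ring, refilling it via `fill_stream_ring`; the cached path walks
    /// `get_first_rid` / `get_next_rid` through the buffer pool while
    /// honoring the start and end bounds.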
    pub fn next(&mut self) -> QuillSQLResult<Option<(RecordId, TupleMeta, Tuple)>> {
        if self.ended {
            return Ok(None);
        }

        let heap_arc = self.heap.clone();
        let schema = self.heap.schema.clone();
        let start_bound_cloned = self.start_bound.clone();

        if let ScanStrategy::Streaming(state) = &mut self.strategy {
            if !self.started {
                self.started = true;
                if state.prefetch.exhausted {
                    self.ended = true;
                    return Ok(None);
                }
                self.heap.buffer_pool.flush_all_pages()?;
                fill_stream_ring(&heap_arc, schema.clone(), &start_bound_cloned, state)?;
            }

            loop {
                if let Some((rid, meta, tuple)) = state.ring.pop() {
                    match &self.end_bound {
                        Bound::Unbounded => return Ok(Some((rid, meta, tuple))),
                        Bound::Included(end) => {
                            if rid == *end {
                                self.ended = true;
                            }
                            return Ok(Some((rid, meta, tuple)));
                        }
                        Bound::Excluded(end) => {
                            if rid == *end {
                                self.ended = true;
                                return Ok(None);
                            }
                            return Ok(Some((rid, meta, tuple)));
                        }
                    }
                } else {
                    if state.prefetch.exhausted {
                        self.ended = true;
                        return Ok(None);
                    }
                    fill_stream_ring(&heap_arc, schema.clone(), &start_bound_cloned, state)?;
                    if state.ring.is_empty() {
                        if state.prefetch.exhausted {
                            self.ended = true;
                        }
                        return Ok(None);
                    }
                }
            }
        }

        if self.started {
            match self.end_bound {
                Bound::Included(rid) => {
                    if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
                        self.cursor = next_rid;
                        if self.cursor == rid {
                            self.ended = true;
                        }
                        let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
                        Ok(Some((self.cursor, meta, tuple)))
                    } else {
                        Ok(None)
                    }
                }
                Bound::Excluded(rid) => {
                    if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
                        if next_rid == rid {
                            self.ended = true;
                            Ok(None)
                        } else {
                            self.cursor = next_rid;
                            let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
                            Ok(Some((self.cursor, meta, tuple)))
                        }
                    } else {
                        Ok(None)
                    }
                }
                Bound::Unbounded => {
                    if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
                        self.cursor = next_rid;
                        let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
                        Ok(Some((self.cursor, meta, tuple)))
                    } else {
                        Ok(None)
                    }
                }
            }
        } else {
            self.started = true;
            match self.start_bound {
                Bound::Included(rid) => {
                    self.cursor = rid;
                    let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
                    Ok(Some((self.cursor, meta, tuple)))
                }
                Bound::Excluded(rid) => {
                    if let Some(next_rid) = self.heap.get_next_rid(rid)? {
                        self.cursor = next_rid;
                        let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
                        Ok(Some((self.cursor, meta, tuple)))
                    } else {
                        self.ended = true;
                        Ok(None)
                    }
                }
                Bound::Unbounded => {
                    if let Some(first_rid) = self.heap.get_first_rid()? {
                        self.cursor = first_rid;
                        let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
                        Ok(Some((self.cursor, meta, tuple)))
                    } else {
                        self.ended = true;
                        Ok(None)
                    }
                }
            }
        }
    }
}

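/// Refills the streaming ring from prefetched page buffers: decodes each
/// `TablePage`, queues its `next_page_id` for read-ahead, and pushes the
/// page's tuples (respecting the start bound on the first page) until the
/// ring is full or the prefetcher is exhausted.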
fn fill_stream_ring(
    heap: &Arc<TableHeap>,
    schema: SchemaRef,
    start_bound: &Bound<RecordId>,
    state: &mut StreamScanState,
) -> QuillSQLResult<()> {
    let scheduler = heap.buffer_pool.buffer_pool().disk_scheduler();
    while state.ring.len() < state.ring.capacity() && !state.prefetch.exhausted {
        state.prefetch.ensure_ready(&scheduler)?;
        let Some((pid, bytes)) = state.prefetch.ready.pop_front() else {
            break;
        };

        let (page, _) = TablePageCodec::decode(&bytes, schema.clone())?;
        if page.header.next_page_id != INVALID_PAGE_ID {
            state.prefetch.pending.push_back(page.header.next_page_id);
        }
        state.prefetch.maybe_schedule(&scheduler)?;

        let start_slot = if state.first_page {
            state.first_page = false;
            match start_bound {
                Bound::Included(r) if r.page_id == pid => r.slot_num as usize,
                Bound::Excluded(r) if r.page_id == pid => r.slot_num as usize + 1,
                _ => 0,
            }
        } else {
            0
        };

        for slot in start_slot..page.header.num_tuples as usize {
            let rid = RecordId::new(pid, slot as u32);
            let (meta, tuple) = page.tuple(slot as u16)?;
            state.ring.push((rid, meta, tuple));
            if state.ring.len() >= state.ring.capacity() {
                break;
            }
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {

    use std::sync::Arc;
    use tempfile::TempDir;

    use crate::buffer::BufferManager;
    use crate::catalog::{Column, DataType, Schema};
    use crate::storage::codec::TupleCodec;
    use crate::storage::disk_manager::DiskManager;
    use crate::storage::disk_scheduler::DiskScheduler;
    use crate::storage::page::EMPTY_TUPLE_META;
    use crate::storage::table_heap::TableIterator;
    use crate::storage::{table_heap::TableHeap, tuple::Tuple};

    #[test]
    pub fn test_table_heap_update_tuple_meta() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));
        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(1000, disk_scheduler));
        let table_heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();

        let _rid1 = table_heap
            .insert_tuple(
                &EMPTY_TUPLE_META,
                &Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
            )
            .unwrap();
        let rid2 = table_heap
            .insert_tuple(
                &EMPTY_TUPLE_META,
                &Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
            )
            .unwrap();
        let _rid3 = table_heap
            .insert_tuple(
                &EMPTY_TUPLE_META,
                &Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
            )
            .unwrap();

        let mut meta = table_heap.tuple_meta(rid2).unwrap();
        meta.insert_txn_id = 1;
        meta.mark_deleted(2, 0);
        meta.is_deleted = true;
        table_heap.update_tuple_meta(meta, rid2).unwrap();

        let meta = table_heap.tuple_meta(rid2).unwrap();
        assert_eq!(meta.insert_txn_id, 1);
        assert_eq!(meta.delete_txn_id, 2);
        assert_eq!(meta.delete_cid, 0);
        assert!(meta.is_deleted);
    }

    #[test]
    pub fn test_table_heap_insert_tuple() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));
        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(1000, disk_scheduler));
        let table_heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();

        let meta1 = super::TupleMeta::new(1, 0);
        let rid1 = table_heap
            .insert_tuple(
                &meta1,
                &Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
            )
            .unwrap();
        let meta2 = super::TupleMeta::new(2, 0);
        let rid2 = table_heap
            .insert_tuple(
                &meta2,
                &Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
            )
            .unwrap();
        let meta3 = super::TupleMeta::new(3, 0);
        let rid3 = table_heap
            .insert_tuple(
                &meta3,
                &Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
            )
            .unwrap();

        let (meta, tuple) = table_heap.full_tuple(rid1).unwrap();
        assert_eq!(meta, meta1);
        assert_eq!(tuple.data, vec![1i8.into(), 1i16.into()]);

        let (meta, tuple) = table_heap.full_tuple(rid2).unwrap();
        assert_eq!(meta, meta2);
        assert_eq!(tuple.data, vec![2i8.into(), 2i16.into()]);

        let (meta, tuple) = table_heap.full_tuple(rid3).unwrap();
        assert_eq!(meta, meta3);
        assert_eq!(tuple.data, vec![3i8.into(), 3i16.into()]);
    }

    #[test]
    pub fn test_table_heap_iterator() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));

        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(1000, disk_scheduler));
        let table_heap = Arc::new(TableHeap::try_new(schema.clone(), buffer_pool).unwrap());

        let meta1 = super::TupleMeta::new(1, 0);
        let rid1 = table_heap
            .insert_tuple(
                &meta1,
                &Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
            )
            .unwrap();
        let meta2 = super::TupleMeta::new(2, 0);
        let rid2 = table_heap
            .insert_tuple(
                &meta2,
                &Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
            )
            .unwrap();
        let meta3 = super::TupleMeta::new(3, 0);
        let rid3 = table_heap
            .insert_tuple(
                &meta3,
                &Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
            )
            .unwrap();

        let mut iterator = TableIterator::new(table_heap.clone(), ..);

        let (rid, meta, tuple) = iterator.next().unwrap().unwrap();
        assert_eq!(rid, rid1);
        assert_eq!(meta, meta1);
        assert_eq!(tuple.data, vec![1i8.into(), 1i16.into()]);

        let (rid, meta, tuple) = iterator.next().unwrap().unwrap();
        assert_eq!(rid, rid2);
        assert_eq!(meta, meta2);
        assert_eq!(tuple.data, vec![2i8.into(), 2i16.into()]);

        let (rid, meta, tuple) = iterator.next().unwrap().unwrap();
        assert_eq!(rid, rid3);
        assert_eq!(meta, meta3);
        assert_eq!(tuple.data, vec![3i8.into(), 3i16.into()]);

        assert!(iterator.next().unwrap().is_none());
    }

    #[test]
    pub fn test_streaming_seq_scan_ring() {
        std::env::set_var("QUILL_STREAM_SCAN", "1");
        std::env::set_var("QUILL_STREAM_READAHEAD", "2");

        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));

        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(128, disk_scheduler));
        let table_heap = Arc::new(TableHeap::try_new(schema.clone(), buffer_pool).unwrap());

        let rows = 1000;
        for i in 0..rows {
            let _rid = table_heap
                .insert_tuple(
                    &super::TupleMeta::new(1, 0),
                    &Tuple::new(schema.clone(), vec![(i as i8).into(), (i as i16).into()]),
                )
                .unwrap();
        }

        table_heap.buffer_pool.flush_all_pages().unwrap();

        let mut it = TableIterator::new(table_heap.clone(), ..);
        let mut cnt = 0usize;
        while let Some((_rid, _meta, _t)) = it.next().unwrap() {
            cnt += 1;
        }
        assert_eq!(cnt, rows);
    }

    #[test]
    pub fn test_streaming_respects_bounds_and_fallbacks() {
        std::env::set_var("QUILL_STREAM_SCAN", "1");
        std::env::set_var("QUILL_STREAM_READAHEAD", "2");

        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));

        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(128, disk_scheduler));
        let table_heap = Arc::new(TableHeap::try_new(schema.clone(), buffer_pool).unwrap());

        let rid1 = table_heap
            .insert_tuple(
                &super::TupleMeta::new(1, 0),
                &Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
            )
            .unwrap();
        let rid2 = table_heap
            .insert_tuple(
                &super::TupleMeta::new(2, 0),
                &Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
            )
            .unwrap();
        let rid3 = table_heap
            .insert_tuple(
                &super::TupleMeta::new(3, 0),
                &Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
            )
            .unwrap();

        let mut it = TableIterator::new_with_hint(table_heap.clone(), rid1..=rid2, Some(true));

        let got1 = it.next().unwrap().unwrap();
        let got2 = it.next().unwrap().unwrap();
        let got3 = it.next().unwrap();

        assert_eq!(got1.0, rid1);
        assert_eq!(got2.0, rid2);
        assert!(got3.is_none());

        let (_m, t3) = table_heap.full_tuple(rid3).unwrap();
        assert_eq!(t3.data, vec![3i8.into(), 3i16.into()]);
    }

    #[test]
    pub fn test_recover_set_tuple_meta_and_bytes() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));
        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(128, disk_scheduler));
        let table_heap = TableHeap::try_new(schema.clone(), buffer_pool.clone()).unwrap();

        let rid = table_heap
            .insert_tuple(
                &EMPTY_TUPLE_META,
                &Tuple::new(schema.clone(), vec![1i8.into(), 10i16.into()]),
            )
            .unwrap();

        let new_tuple = Tuple::new(schema.clone(), vec![2i8.into(), 20i16.into()]);
        let new_bytes = TupleCodec::encode(&new_tuple);
        table_heap
            .recover_set_tuple_bytes(rid, &new_bytes)
            .expect("recover bytes");

        let (_m, t) = table_heap.full_tuple(rid).unwrap();
        assert_eq!(t.data, vec![2i8.into(), 20i16.into()]);

        let mut meta = table_heap.tuple_meta(rid).unwrap();
        meta.is_deleted = true;
        table_heap
            .recover_set_tuple_meta(rid, meta)
            .expect("recover meta");
        let m2 = table_heap.tuple_meta(rid).unwrap();
        assert!(m2.is_deleted);
    }

    #[test]
    pub fn test_recover_repack_on_size_mismatch() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().join("test.db");

        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, false),
            Column::new("b", DataType::Int16, false),
        ]));
        let disk_manager = DiskManager::try_new(temp_path).unwrap();
        let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(disk_manager)));
        let buffer_pool = Arc::new(BufferManager::new(128, disk_scheduler));
        let table_heap = TableHeap::try_new(schema.clone(), buffer_pool.clone()).unwrap();

        let rid = table_heap
            .insert_tuple(
                &EMPTY_TUPLE_META,
                &Tuple::new(schema.clone(), vec![1i8.into(), 10i16.into()]),
            )
            .unwrap();

        let larger_tuple = Tuple::new(schema.clone(), vec![99i8.into(), 300i16.into()]);
        let larger_bytes = TupleCodec::encode(&larger_tuple);
        table_heap
            .recover_set_tuple_bytes(rid, &larger_bytes)
            .expect("recover larger bytes");

        let (_m, t2) = table_heap.full_tuple(rid).unwrap();
        assert_eq!(t2.data, vec![99i8.into(), 300i16.into()]);
    }
}