1use crate::buffer::{AtomicPageId, PageId, WritePageGuard, INVALID_PAGE_ID};
2use crate::catalog::{SchemaRef, EMPTY_SCHEMA_REF};
3use crate::config::TableScanConfig;
4use crate::storage::codec::TablePageCodec;
5use crate::storage::disk_scheduler::{DiskCommandResultReceiver, DiskScheduler};
6use crate::storage::page::{RecordId, TablePage, TupleMeta, INVALID_RID};
7use crate::transaction::{CommandId, TransactionId, INVALID_COMMAND_ID};
8use crate::{
9 buffer::BufferManager,
10 error::{QuillSQLError, QuillSQLResult},
11};
12use bytes::BytesMut;
13use std::collections::Bound;
14use std::collections::VecDeque;
15use std::ops::RangeBounds;
16use std::sync::atomic::{AtomicU32, Ordering};
17use std::sync::Arc;
18
19use crate::recovery::wal_record::WalRecordPayload;
20use crate::storage::codec::TupleCodec;
21use crate::storage::heap::wal_codec::{
22 HeapDeletePayload, HeapInsertPayload, HeapRecordPayload, HeapUpdatePayload, RelationIdent,
23 TupleMetaRepr,
24};
25use crate::storage::tuple::Tuple;
26use crate::utils::ring_buffer::RingBuffer;
27
28#[derive(Debug)]
29pub struct TableHeap {
30 pub schema: SchemaRef,
31 pub buffer_pool: Arc<BufferManager>,
32 pub first_page_id: AtomicPageId,
33 pub last_page_id: AtomicPageId,
34}
35
36impl TableHeap {
37 pub fn try_new(schema: SchemaRef, buffer_pool: Arc<BufferManager>) -> QuillSQLResult<Self> {
39 let mut first_page_guard = buffer_pool.new_page()?;
41 let first_page_id = first_page_guard.page_id();
42
43 let table_page = TablePage::new(schema.clone(), INVALID_PAGE_ID);
45 let encoded_data = TablePageCodec::encode(&table_page);
46
47 first_page_guard.data_mut().copy_from_slice(&encoded_data);
50 first_page_guard.set_lsn(table_page.lsn());
51
52 Ok(Self {
54 schema,
55 buffer_pool,
56 first_page_id: AtomicU32::new(first_page_id),
57 last_page_id: AtomicU32::new(first_page_id),
58 })
59 }
60
61 fn write_back_page(
62 &self,
63 guard: &mut WritePageGuard,
64 table_page: &mut TablePage,
65 ) -> QuillSQLResult<()> {
66 let new_image = TablePageCodec::encode(table_page);
67 guard.apply_page_image(&new_image)?;
68 Ok(())
69 }
70
71 pub(crate) fn relation_ident(&self) -> RelationIdent {
72 RelationIdent {
73 root_page_id: self.first_page_id.load(Ordering::SeqCst),
74 }
75 }
76
77 fn append_heap_record(&self, payload: HeapRecordPayload) -> QuillSQLResult<()> {
78 if let Some(wal) = self.buffer_pool.wal_manager() {
79 let _wal_result =
80 wal.append_record_with(|_| WalRecordPayload::Heap(payload.clone()))?;
81 }
82 Ok(())
83 }
84
85 pub fn insert_tuple(&self, meta: &TupleMeta, tuple: &Tuple) -> QuillSQLResult<RecordId> {
99 let mut current_page_id = self.last_page_id.load(Ordering::SeqCst);
100
101 loop {
102 let mut current_page_guard = self.buffer_pool.fetch_page_write(current_page_id)?;
103 let mut table_page =
104 TablePageCodec::decode(current_page_guard.data(), self.schema.clone())?.0;
105 table_page.set_lsn(current_page_guard.lsn());
106
107 if table_page.next_tuple_offset(tuple).is_ok() {
108 let tuple_bytes = TupleCodec::encode(tuple);
109 let slot_id = table_page.insert_tuple(meta, tuple)?;
110 let relation = self.relation_ident();
111 let tuple_meta = TupleMetaRepr::from(*meta);
112 let payload = HeapRecordPayload::Insert(HeapInsertPayload {
113 relation,
114 page_id: current_page_id,
115 slot_id,
116 op_txn_id: meta.insert_txn_id,
117 tuple_meta,
118 tuple_data: tuple_bytes,
119 });
120 self.append_heap_record(payload.clone())?;
121 self.write_back_page(&mut current_page_guard, &mut table_page)?;
122 return Ok(RecordId::new(current_page_id, slot_id as u32));
123 }
124
125 let mut new_page_guard = self.buffer_pool.new_page()?;
126 let new_page_id = new_page_guard.page_id();
127 let mut new_table_page = TablePage::new(self.schema.clone(), INVALID_PAGE_ID);
128 self.write_back_page(&mut new_page_guard, &mut new_table_page)?;
129
130 table_page.header.next_page_id = new_page_id;
131 self.write_back_page(&mut current_page_guard, &mut table_page)?;
132 drop(current_page_guard);
133
134 self.last_page_id.store(new_page_id, Ordering::SeqCst);
135 current_page_id = new_page_id;
136 }
137 }
138
139 pub fn mvcc_insert_version(
141 &self,
142 tuple: &Tuple,
143 txn_id: TransactionId,
144 cid: CommandId,
145 prev_version: Option<RecordId>,
146 ) -> QuillSQLResult<(RecordId, TupleMeta)> {
147 let mut meta = TupleMeta::new(txn_id, cid);
148 meta.set_prev_version(prev_version);
149 let rid = self.insert_tuple(&meta, tuple)?;
150 Ok((rid, meta))
151 }
152
153 pub fn mvcc_update(
156 &self,
157 current_rid: RecordId,
158 new_tuple: Tuple,
159 txn_id: TransactionId,
160 cid: CommandId,
161 ) -> QuillSQLResult<(RecordId, TupleMeta)> {
162 let (current_meta, _existing_tuple) = self.full_tuple(current_rid)?;
163 if current_meta.is_deleted && current_meta.next_version.is_some() {
164 return Err(QuillSQLError::Execution(format!(
165 "tuple {} has already been updated",
166 current_rid
167 )));
168 }
169
170 let prev_meta = current_meta;
172
173 let (new_rid, mut new_meta) =
175 self.mvcc_insert_version(&new_tuple, txn_id, cid, Some(current_rid))?;
176 new_meta.set_prev_version(Some(current_rid));
177
178 let mut updated_meta = current_meta;
180 updated_meta.mark_deleted(txn_id, cid);
181 updated_meta.set_next_version(Some(new_rid));
182 self.update_tuple_meta(updated_meta, current_rid)?;
183
184 Ok((new_rid, prev_meta))
185 }
186
187 pub fn mvcc_mark_deleted(
190 &self,
191 rid: RecordId,
192 txn_id: TransactionId,
193 cid: CommandId,
194 ) -> QuillSQLResult<TupleMeta> {
195 let (mut current_meta, _) = self.full_tuple(rid)?;
196 if current_meta.is_deleted {
197 return Ok(current_meta);
198 }
199 let prev_meta = current_meta;
200 current_meta.mark_deleted(txn_id, cid);
201 self.update_tuple_meta(current_meta, rid)?;
202 Ok(prev_meta)
203 }
204
205 #[allow(dead_code)]
209 pub fn mvcc_remove_version(&self, rid: RecordId) -> QuillSQLResult<()> {
210 let mut page_guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
211 let mut table_page = TablePageCodec::decode(page_guard.data(), self.schema.clone())?.0;
212 table_page.set_lsn(page_guard.lsn());
213
214 let slot = rid.slot_num as usize;
215 if slot >= table_page.header.num_tuples as usize {
216 return Ok(());
217 }
218
219 if slot + 1 != table_page.header.num_tuples as usize {
220 let mut meta = table_page.header.tuple_infos[slot].meta;
222 if !meta.is_deleted {
223 meta.mark_deleted(0, INVALID_COMMAND_ID);
224 table_page.update_tuple_meta(meta, slot as u16)?;
225 self.write_back_page(&mut page_guard, &mut table_page)?;
226 }
227 return Ok(());
228 }
229
230 table_page.reclaim_tuple(slot as u16)?;
231 self.write_back_page(&mut page_guard, &mut table_page)
232 }
233
234 pub fn update_tuple(&self, rid: RecordId, tuple: Tuple) -> QuillSQLResult<()> {
235 let mut page_guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
236 let mut table_page = TablePageCodec::decode(page_guard.data(), self.schema.clone())?.0;
237 table_page.set_lsn(page_guard.lsn());
238
239 let slot = rid.slot_num as u16;
240 let (old_meta, old_tuple) = table_page.tuple(slot)?;
241 let old_tuple_bytes = TupleCodec::encode(&old_tuple);
242 let new_tuple_bytes = TupleCodec::encode(&tuple);
243 table_page.update_tuple(tuple, rid.slot_num as u16)?;
244 let new_meta = table_page.header.tuple_infos[slot as usize].meta;
245 let relation = self.relation_ident();
246 self.append_heap_record(HeapRecordPayload::Update(HeapUpdatePayload {
247 relation,
248 page_id: rid.page_id,
249 slot_id: slot,
250 op_txn_id: new_meta.insert_txn_id,
251 new_tuple_meta: TupleMetaRepr::from(new_meta),
252 new_tuple_data: new_tuple_bytes,
253 old_tuple_meta: Some(TupleMetaRepr::from(old_meta)),
254 old_tuple_data: Some(old_tuple_bytes),
255 }))?;
256 self.write_back_page(&mut page_guard, &mut table_page)
257 }
258
259 pub fn update_tuple_meta(&self, meta: TupleMeta, rid: RecordId) -> QuillSQLResult<()> {
260 let mut page_guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
261 let mut table_page = TablePageCodec::decode(page_guard.data(), self.schema.clone())?.0;
262 table_page.set_lsn(page_guard.lsn());
263
264 let slot = rid.slot_num as u16;
265 let (old_meta, old_tuple) = table_page.tuple(slot)?;
266 let old_tuple_bytes = TupleCodec::encode(&old_tuple);
267 table_page.update_tuple_meta(meta, slot)?;
268 let relation = self.relation_ident();
269 let payload = if meta.is_deleted && !old_meta.is_deleted {
270 HeapRecordPayload::Delete(HeapDeletePayload {
271 relation,
272 page_id: rid.page_id,
273 slot_id: slot,
274 op_txn_id: meta.delete_txn_id,
275 old_tuple_meta: TupleMetaRepr::from(old_meta),
276 old_tuple_data: Some(old_tuple_bytes),
277 })
278 } else {
279 let (_, current_tuple) = table_page.tuple(slot)?;
280 let new_tuple_bytes = TupleCodec::encode(¤t_tuple);
281 HeapRecordPayload::Update(HeapUpdatePayload {
282 relation,
283 page_id: rid.page_id,
284 slot_id: slot,
285 op_txn_id: meta.insert_txn_id,
286 new_tuple_meta: TupleMetaRepr::from(meta),
287 new_tuple_data: new_tuple_bytes,
288 old_tuple_meta: Some(TupleMetaRepr::from(old_meta)),
289 old_tuple_data: Some(old_tuple_bytes),
290 })
291 };
292 self.append_heap_record(payload)?;
293 self.write_back_page(&mut page_guard, &mut table_page)
294 }
295
296 pub fn full_tuple(&self, rid: RecordId) -> QuillSQLResult<(TupleMeta, Tuple)> {
297 let (_, table_page) = self
298 .buffer_pool
299 .fetch_table_page(rid.page_id, self.schema.clone())?;
300 let result = table_page.tuple(rid.slot_num as u16)?;
301 Ok(result)
302 }
303
304 pub fn tuple(&self, rid: RecordId) -> QuillSQLResult<Tuple> {
305 let (_meta, tuple) = self.full_tuple(rid)?;
306 Ok(tuple)
307 }
308
309 pub fn tuple_meta(&self, rid: RecordId) -> QuillSQLResult<TupleMeta> {
310 let (meta, _tuple) = self.full_tuple(rid)?;
311 Ok(meta)
312 }
313
314 pub fn get_first_rid(&self) -> QuillSQLResult<Option<RecordId>> {
315 let first_page_id = self.first_page_id.load(Ordering::SeqCst);
316 let (_, table_page) = self
317 .buffer_pool
318 .fetch_table_page(first_page_id, self.schema.clone())?;
319
320 if table_page.header.num_tuples == 0 {
321 Ok(None)
322 } else {
323 Ok(Some(RecordId::new(first_page_id, 0)))
324 }
325 }
326
327 pub fn get_next_rid(&self, rid: RecordId) -> QuillSQLResult<Option<RecordId>> {
328 let (_, table_page) = self
329 .buffer_pool
330 .fetch_table_page(rid.page_id, self.schema.clone())?;
331 let next_rid = table_page.get_next_rid(&rid);
332 if next_rid.is_some() {
333 return Ok(next_rid);
334 }
335
336 if table_page.header.next_page_id == INVALID_PAGE_ID {
337 return Ok(None);
338 }
339 let (_, next_table_page) = self
340 .buffer_pool
341 .fetch_table_page(table_page.header.next_page_id, self.schema.clone())?;
342
343 if next_table_page.header.num_tuples == 0 {
344 return Ok(None);
345 }
346 Ok(Some(RecordId::new(table_page.header.next_page_id, 0)))
347 }
348
349 pub fn recovery_view(buffer_pool: Arc<BufferManager>) -> Self {
352 Self {
353 schema: EMPTY_SCHEMA_REF.clone(),
354 buffer_pool,
355 first_page_id: AtomicU32::new(0),
356 last_page_id: AtomicU32::new(0),
357 }
358 }
359
360 pub fn recover_set_tuple_meta(&self, rid: RecordId, meta: TupleMeta) -> QuillSQLResult<()> {
363 let mut guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
364 let (mut header, hdr_len) =
365 crate::storage::codec::TablePageHeaderCodec::decode(guard.data())?;
366 if (rid.slot_num as usize) >= header.tuple_infos.len() {
367 return Ok(());
368 }
369 let info = &mut header.tuple_infos[rid.slot_num as usize];
370 if info.meta.is_deleted != meta.is_deleted {
371 if meta.is_deleted {
372 header.num_deleted_tuples = header.num_deleted_tuples.saturating_add(1);
373 } else {
374 header.num_deleted_tuples = header.num_deleted_tuples.saturating_sub(1);
375 }
376 }
377 info.meta = meta;
378 let new_header = crate::storage::codec::TablePageHeaderCodec::encode(&header);
379 let copy_len = std::cmp::min(hdr_len, new_header.len());
380 guard.data_mut()[0..copy_len].copy_from_slice(&new_header[..copy_len]);
381 guard.mark_dirty();
382 Ok(())
383 }
384
385 pub fn recover_set_tuple_bytes(&self, rid: RecordId, new_bytes: &[u8]) -> QuillSQLResult<()> {
388 let mut guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
389 let (mut header, _hdr_len) =
390 crate::storage::codec::TablePageHeaderCodec::decode(guard.data())?;
391 if (rid.slot_num as usize) >= header.tuple_infos.len() {
392 return Ok(());
393 }
394 let slot = rid.slot_num as usize;
395 let info = &mut header.tuple_infos[slot];
396 let off = info.offset as usize;
397 let sz = info.size as usize;
398 if new_bytes.len() == sz {
399 if off + sz <= crate::buffer::PAGE_SIZE {
400 guard.data_mut()[off..off + sz].copy_from_slice(new_bytes);
401 }
402 guard.mark_dirty();
403 return Ok(());
404 }
405 let n = header.tuple_infos.len();
406 let mut tuples: Vec<Vec<u8>> = Vec::with_capacity(n);
407 for i in 0..n {
408 let inf = &header.tuple_infos[i];
409 let s = &guard.data()[inf.offset as usize..(inf.offset + inf.size) as usize];
410 if i == slot {
411 tuples.push(new_bytes.to_vec());
412 } else {
413 tuples.push(s.to_vec());
414 }
415 }
416 let mut tail = crate::buffer::PAGE_SIZE;
417 for i in 0..n {
418 let sz = tuples[i].len();
419 tail = tail.saturating_sub(sz);
420 header.tuple_infos[i].offset = tail as u16;
421 header.tuple_infos[i].size = sz as u16;
422 }
423 let new_header = crate::storage::codec::TablePageHeaderCodec::encode(&header);
424 for b in guard.data_mut().iter_mut() {
425 *b = 0;
426 }
427 let hdr_copy = std::cmp::min(new_header.len(), crate::buffer::PAGE_SIZE);
428 guard.data_mut()[0..hdr_copy].copy_from_slice(&new_header[..hdr_copy]);
429 for i in 0..n {
430 let off = header.tuple_infos[i].offset as usize;
431 let sz = header.tuple_infos[i].size as usize;
432 if off + sz <= crate::buffer::PAGE_SIZE {
433 guard.data_mut()[off..off + sz].copy_from_slice(&tuples[i][..sz]);
434 }
435 }
436 guard.mark_dirty();
437 Ok(())
438 }
439
440 pub fn recover_restore_tuple(
441 &self,
442 rid: RecordId,
443 meta: TupleMeta,
444 tuple: &Tuple,
445 ) -> QuillSQLResult<()> {
446 let bytes = TupleCodec::encode(tuple);
447 self.recover_set_tuple_bytes(rid, &bytes)?;
448 self.recover_set_tuple_meta(rid, meta)
449 }
450
451 pub fn recover_delete_tuple(
452 &self,
453 rid: RecordId,
454 txn_id: TransactionId,
455 cid: CommandId,
456 ) -> QuillSQLResult<()> {
457 let mut meta = self.tuple_meta(rid)?;
458 if meta.is_deleted {
459 return Ok(());
460 }
461 meta.mark_deleted(txn_id, cid);
462 self.recover_set_tuple_meta(rid, meta)
463 }
464
465 pub fn delete_tuple(
466 &self,
467 rid: RecordId,
468 txn_id: TransactionId,
469 cid: CommandId,
470 ) -> QuillSQLResult<()> {
471 let _ = self.mvcc_mark_deleted(rid, txn_id, cid)?;
472 Ok(())
473 }
474
475 pub fn vacuum_slot_if<F>(&self, rid: RecordId, predicate: F) -> QuillSQLResult<bool>
478 where
479 F: FnOnce(&TupleMeta) -> bool,
480 {
481 let mut page_guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
482 let mut table_page = TablePageCodec::decode(page_guard.data(), self.schema.clone())?.0;
483 table_page.set_lsn(page_guard.lsn());
484
485 let slot = rid.slot_num as u16;
486 if slot >= table_page.header.num_tuples {
487 return Ok(false);
488 }
489 let meta = table_page.header.tuple_infos[slot as usize].meta;
490 if !predicate(&meta) {
491 return Ok(false);
492 }
493
494 table_page.reclaim_tuple(slot)?;
495 self.write_back_page(&mut page_guard, &mut table_page)?;
496 Ok(true)
497 }
498}
499
500#[derive(Debug)]
501pub struct TableIterator {
502 heap: Arc<TableHeap>,
503 start_bound: Bound<RecordId>,
504 end_bound: Bound<RecordId>,
505 cursor: RecordId,
506 started: bool,
507 ended: bool,
508 strategy: ScanStrategy,
509}
510
511#[derive(Debug)]
512enum ScanStrategy {
513 Cached,
515 Streaming(StreamScanState),
517}
518
519#[derive(Debug)]
520struct StreamScanState {
521 ring: RingBuffer<(RecordId, TupleMeta, Tuple)>,
522 first_page: bool,
523 prefetch: StreamPrefetchState,
524}
525
526#[derive(Debug)]
527struct StreamPrefetchState {
528 pending: VecDeque<PageId>,
529 inflight: VecDeque<StreamBatch>,
530 ready: VecDeque<(PageId, BytesMut)>,
531 readahead: usize,
532 exhausted: bool,
533}
534
535#[derive(Debug)]
536struct StreamBatch {
537 page_ids: Vec<PageId>,
538 rx: DiskCommandResultReceiver<Vec<BytesMut>>,
539}
540
541impl StreamPrefetchState {
542 fn ensure_ready(&mut self, scheduler: &Arc<DiskScheduler>) -> QuillSQLResult<()> {
543 while !self.exhausted && self.ready.is_empty() {
544 let capacity = self
545 .readahead
546 .saturating_sub(self.ready.len() + self.inflight.len());
547 if capacity > 0 && !self.pending.is_empty() {
548 self.schedule_batch(scheduler, capacity)?;
549 continue;
550 }
551 if let Some(batch) = self.inflight.pop_front() {
552 let buffers = batch.rx.recv().map_err(|e| {
553 QuillSQLError::Internal(format!("DiskScheduler channel disconnected: {}", e))
554 })??;
555 for (pid, bytes) in batch.page_ids.into_iter().zip(buffers.into_iter()) {
556 self.ready.push_back((pid, bytes));
557 }
558 } else {
559 self.exhausted = true;
560 }
561 }
562 Ok(())
563 }
564
565 fn maybe_schedule(&mut self, scheduler: &Arc<DiskScheduler>) -> QuillSQLResult<()> {
566 if self.exhausted {
567 return Ok(());
568 }
569 let capacity = self
570 .readahead
571 .saturating_sub(self.ready.len() + self.inflight.len());
572 if capacity == 0 || self.pending.is_empty() {
573 return Ok(());
574 }
575 self.schedule_batch(scheduler, capacity)
576 }
577
578 fn schedule_batch(
579 &mut self,
580 scheduler: &Arc<DiskScheduler>,
581 limit: usize,
582 ) -> QuillSQLResult<()> {
583 let mut ids = Vec::with_capacity(limit);
584 while ids.len() < limit {
585 if let Some(pid) = self.pending.pop_front() {
586 ids.push(pid);
587 } else {
588 break;
589 }
590 }
591 if ids.is_empty() {
592 return Ok(());
593 }
594 let rx = scheduler.schedule_read_pages(ids.clone())?;
595 self.inflight.push_back(StreamBatch { page_ids: ids, rx });
596 Ok(())
597 }
598}
599
600impl TableIterator {
601 pub fn new<R: RangeBounds<RecordId>>(heap: Arc<TableHeap>, range: R) -> Self {
602 Self::new_with_hint(heap, range, None)
603 }
604
605 pub fn new_with_hint<R: RangeBounds<RecordId>>(
606 heap: Arc<TableHeap>,
607 range: R,
608 streaming_hint: Option<bool>,
609 ) -> Self {
610 let start = range.start_bound().cloned();
611 let end = range.end_bound().cloned();
612
613 let cfg = TableScanConfig::default();
615 let pool_quarter = (heap.buffer_pool.buffer_pool().capacity().max(1) / 4) as u32;
616 let threshold: u32 = cfg.stream_threshold_pages.unwrap_or(pool_quarter.max(1));
617 let readahead: usize = cfg.readahead_pages;
618
619 let approx_pages = heap
620 .last_page_id
621 .load(Ordering::SeqCst)
622 .saturating_sub(heap.first_page_id.load(Ordering::SeqCst))
623 + 1;
624
625 let is_full_scan = matches!(start, Bound::Unbounded) && matches!(end, Bound::Unbounded);
626
627 let requested_stream = match streaming_hint {
629 Some(true) => true,
630 Some(false) => false,
631 None => cfg.stream_scan_enable || approx_pages >= threshold,
632 };
633 let use_streaming = if matches!(streaming_hint, Some(true)) {
635 true
636 } else {
637 is_full_scan && requested_stream
638 };
639
640 let strategy = if use_streaming {
641 let tuple_ring_cap = readahead.max(1).saturating_mul(1024);
642 let ring = RingBuffer::with_capacity(tuple_ring_cap);
643 let default_first = heap.first_page_id.load(Ordering::SeqCst);
644 let start_pid = match &start {
645 Bound::Included(r) | Bound::Excluded(r) => r.page_id,
646 Bound::Unbounded => default_first,
647 };
648 let mut pending = VecDeque::new();
649 let exhausted = if start_pid == INVALID_PAGE_ID {
650 true
651 } else {
652 pending.push_back(start_pid);
653 false
654 };
655 let prefetch = StreamPrefetchState {
656 pending,
657 inflight: VecDeque::new(),
658 ready: VecDeque::new(),
659 readahead: readahead.max(1),
660 exhausted,
661 };
662 ScanStrategy::Streaming(StreamScanState {
663 ring,
664 first_page: true,
665 prefetch,
666 })
667 } else {
668 ScanStrategy::Cached
669 };
670
671 Self {
672 heap,
673 start_bound: start,
674 end_bound: end,
675 cursor: INVALID_RID,
676 started: false,
677 ended: false,
678 strategy,
679 }
680 }
681
682 pub fn next(&mut self) -> QuillSQLResult<Option<(RecordId, TupleMeta, Tuple)>> {
683 if self.ended {
684 return Ok(None);
685 }
686
687 let heap_arc = self.heap.clone();
691 let schema = self.heap.schema.clone();
692 let start_bound_cloned = self.start_bound.clone();
693
694 if let ScanStrategy::Streaming(state) = &mut self.strategy {
696 if !self.started {
698 self.started = true;
699 if state.prefetch.exhausted {
700 self.ended = true;
701 return Ok(None);
702 }
703 self.heap.buffer_pool.flush_all_pages()?;
705 fill_stream_ring(&heap_arc, schema.clone(), &start_bound_cloned, state)?;
706 }
707
708 loop {
709 if let Some((rid, meta, tuple)) = state.ring.pop() {
710 match &self.end_bound {
712 Bound::Unbounded => return Ok(Some((rid, meta, tuple))),
713 Bound::Included(end) => {
714 if rid == *end {
715 self.ended = true;
716 }
717 return Ok(Some((rid, meta, tuple)));
718 }
719 Bound::Excluded(end) => {
720 if rid == *end {
721 self.ended = true;
722 return Ok(None);
723 }
724 return Ok(Some((rid, meta, tuple)));
725 }
726 }
727 } else {
728 if state.prefetch.exhausted {
729 self.ended = true;
730 return Ok(None);
731 }
732 fill_stream_ring(&heap_arc, schema.clone(), &start_bound_cloned, state)?;
733 if state.ring.is_empty() {
734 if state.prefetch.exhausted {
735 self.ended = true;
736 }
737 return Ok(None);
738 }
739 }
740 }
741 }
742
743 if self.started {
745 match self.end_bound {
746 Bound::Included(rid) => {
747 if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
748 self.cursor = next_rid;
749 if self.cursor == rid {
750 self.ended = true;
751 }
752 let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
753 Ok(Some((self.cursor, meta, tuple)))
754 } else {
755 Ok(None)
756 }
757 }
758 Bound::Excluded(rid) => {
759 if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
760 if next_rid == rid {
761 self.ended = true;
762 Ok(None)
763 } else {
764 self.cursor = next_rid;
765 let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
766 Ok(Some((self.cursor, meta, tuple)))
767 }
768 } else {
769 Ok(None)
770 }
771 }
772 Bound::Unbounded => {
773 if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
774 self.cursor = next_rid;
775 let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
776 Ok(Some((self.cursor, meta, tuple)))
777 } else {
778 Ok(None)
779 }
780 }
781 }
782 } else {
783 self.started = true;
784 match self.start_bound {
785 Bound::Included(rid) => {
786 self.cursor = rid;
787 let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
788 Ok(Some((self.cursor, meta, tuple)))
789 }
790 Bound::Excluded(rid) => {
791 if let Some(next_rid) = self.heap.get_next_rid(rid)? {
792 self.cursor = next_rid;
793 let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
794 Ok(Some((self.cursor, meta, tuple)))
795 } else {
796 self.ended = true;
797 Ok(None)
798 }
799 }
800 Bound::Unbounded => {
801 if let Some(first_rid) = self.heap.get_first_rid()? {
802 self.cursor = first_rid;
803 let (meta, tuple) = self.heap.full_tuple(self.cursor)?;
804 Ok(Some((self.cursor, meta, tuple)))
805 } else {
806 self.ended = true;
807 Ok(None)
808 }
809 }
810 }
811 }
812 }
813}
814
815fn fill_stream_ring(
816 heap: &Arc<TableHeap>,
817 schema: SchemaRef,
818 start_bound: &Bound<RecordId>,
819 state: &mut StreamScanState,
820) -> QuillSQLResult<()> {
821 let scheduler = heap.buffer_pool.buffer_pool().disk_scheduler();
822 while state.ring.len() < state.ring.capacity() && !state.prefetch.exhausted {
823 state.prefetch.ensure_ready(&scheduler)?;
824 let Some((pid, bytes)) = state.prefetch.ready.pop_front() else {
825 break;
826 };
827
828 let (page, _) = TablePageCodec::decode(&bytes, schema.clone())?;
829 if page.header.next_page_id != INVALID_PAGE_ID {
830 state.prefetch.pending.push_back(page.header.next_page_id);
831 }
832 state.prefetch.maybe_schedule(&scheduler)?;
833
834 let start_slot = if state.first_page {
835 state.first_page = false;
836 match start_bound {
837 Bound::Included(r) if r.page_id == pid => r.slot_num as usize,
838 Bound::Excluded(r) if r.page_id == pid => r.slot_num as usize + 1,
839 _ => 0,
840 }
841 } else {
842 0
843 };
844
845 for slot in start_slot..page.header.num_tuples as usize {
846 let rid = RecordId::new(pid, slot as u32);
847 let (meta, tuple) = page.tuple(slot as u16)?;
848 state.ring.push((rid, meta, tuple));
849 if state.ring.len() >= state.ring.capacity() {
850 break;
851 }
852 }
853 }
854 Ok(())
855}
856
857#[cfg(test)]
858mod tests {
859
860 use std::sync::Arc;
861 use tempfile::TempDir;
862
863 use crate::buffer::BufferManager;
864 use crate::catalog::{Column, DataType, Schema};
865 use crate::storage::codec::TupleCodec;
866 use crate::storage::disk_manager::DiskManager;
867 use crate::storage::disk_scheduler::DiskScheduler;
868 use crate::storage::page::EMPTY_TUPLE_META;
869 use crate::storage::table_heap::TableIterator;
870 use crate::storage::{table_heap::TableHeap, tuple::Tuple};
871 use crate::utils::scalar::ScalarValue;
872
// Updating the metadata of one tuple must be visible when it is read back.
#[test]
pub fn test_table_heap_update_tuple_meta() {
    let temp_dir = TempDir::new().unwrap();
    let db_path = temp_dir.path().join("test.db");

    let schema = Arc::new(Schema::new(vec![
        Column::new("a", DataType::Int8, false),
        Column::new("b", DataType::Int16, false),
    ]));
    let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(
        DiskManager::try_new(db_path).unwrap(),
    )));
    let buffer_pool = Arc::new(BufferManager::new(1000, disk_scheduler));
    let table_heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();

    let row = |a: i8, b: i16| Tuple::new(schema.clone(), vec![a.into(), b.into()]);
    let _rid1 = table_heap
        .insert_tuple(&EMPTY_TUPLE_META, &row(1, 1))
        .unwrap();
    let rid2 = table_heap
        .insert_tuple(&EMPTY_TUPLE_META, &row(2, 2))
        .unwrap();
    let _rid3 = table_heap
        .insert_tuple(&EMPTY_TUPLE_META, &row(3, 3))
        .unwrap();

    // Rewrite the metadata of the middle tuple.
    let mut changed = table_heap.tuple_meta(rid2).unwrap();
    changed.insert_txn_id = 1;
    changed.mark_deleted(2, 0);
    changed.is_deleted = true;
    table_heap.update_tuple_meta(changed, rid2).unwrap();

    let stored = table_heap.tuple_meta(rid2).unwrap();
    assert_eq!(stored.insert_txn_id, 1);
    assert_eq!(stored.delete_txn_id, 2);
    assert_eq!(stored.delete_cid, 0);
    assert!(stored.is_deleted);
}
918
// Every inserted tuple must be readable with the metadata it was inserted with.
#[test]
pub fn test_table_heap_insert_tuple() {
    let temp_dir = TempDir::new().unwrap();
    let db_path = temp_dir.path().join("test.db");

    let schema = Arc::new(Schema::new(vec![
        Column::new("a", DataType::Int8, false),
        Column::new("b", DataType::Int16, false),
    ]));
    let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(
        DiskManager::try_new(db_path).unwrap(),
    )));
    let buffer_pool = Arc::new(BufferManager::new(1000, disk_scheduler));
    let table_heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();

    let row = |a: i8, b: i16| Tuple::new(schema.clone(), vec![a.into(), b.into()]);

    let meta1 = super::TupleMeta::new(1, 0);
    let rid1 = table_heap.insert_tuple(&meta1, &row(1, 1)).unwrap();
    let meta2 = super::TupleMeta::new(2, 0);
    let rid2 = table_heap.insert_tuple(&meta2, &row(2, 2)).unwrap();
    let meta3 = super::TupleMeta::new(3, 0);
    let rid3 = table_heap.insert_tuple(&meta3, &row(3, 3)).unwrap();

    let expected = [
        (rid1, meta1, 1i8, 1i16),
        (rid2, meta2, 2i8, 2i16),
        (rid3, meta3, 3i8, 3i16),
    ];
    for (rid, want_meta, a, b) in expected {
        let (meta, tuple) = table_heap.full_tuple(rid).unwrap();
        assert_eq!(meta, want_meta);
        assert_eq!(tuple.data, vec![a.into(), b.into()]);
    }
}
967
// An MVCC update must tombstone the old version and link both versions to each other.
#[test]
fn mvcc_update_creates_version_chain() {
    let temp_dir = TempDir::new().unwrap();
    let db_path = temp_dir.path().join("mvcc_test.db");

    let schema = Arc::new(Schema::new(vec![
        Column::new("id", DataType::Int32, false),
        Column::new("val", DataType::Int32, false),
    ]));
    let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(
        DiskManager::try_new(db_path).unwrap(),
    )));
    let buffer_pool = Arc::new(BufferManager::new(256, disk_scheduler));
    let table_heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();

    // Rows share the id 1 and differ only in `val`.
    let row_with_val = |val: i32| {
        Tuple::new(
            schema.clone(),
            vec![ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(val))],
        )
    };

    let rid = table_heap
        .insert_tuple(&super::TupleMeta::new(1, 0), &row_with_val(10))
        .expect("insert base");

    let (new_rid, _) = table_heap
        .mvcc_update(rid, row_with_val(20), 2, 0)
        .expect("mvcc update");

    let old_meta = table_heap.tuple_meta(rid).expect("old meta");
    assert!(old_meta.is_deleted);
    assert_eq!(old_meta.next_version, Some(new_rid));

    let new_meta = table_heap.tuple_meta(new_rid).expect("new meta");
    assert_eq!(new_meta.prev_version, Some(rid));
    assert!(!new_meta.is_deleted);
}
1006
// An unbounded iterator must yield all tuples in insertion order and then `None`.
#[test]
pub fn test_table_heap_iterator() {
    let temp_dir = TempDir::new().unwrap();
    let db_path = temp_dir.path().join("test.db");

    let schema = Arc::new(Schema::new(vec![
        Column::new("a", DataType::Int8, false),
        Column::new("b", DataType::Int16, false),
    ]));

    let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(
        DiskManager::try_new(db_path).unwrap(),
    )));
    let buffer_pool = Arc::new(BufferManager::new(1000, disk_scheduler));
    let table_heap = Arc::new(TableHeap::try_new(schema.clone(), buffer_pool).unwrap());

    let row = |a: i8, b: i16| Tuple::new(schema.clone(), vec![a.into(), b.into()]);

    let meta1 = super::TupleMeta::new(1, 0);
    let rid1 = table_heap.insert_tuple(&meta1, &row(1, 1)).unwrap();
    let meta2 = super::TupleMeta::new(2, 0);
    let rid2 = table_heap.insert_tuple(&meta2, &row(2, 2)).unwrap();
    let meta3 = super::TupleMeta::new(3, 0);
    let rid3 = table_heap.insert_tuple(&meta3, &row(3, 3)).unwrap();

    let mut iterator = TableIterator::new(table_heap.clone(), ..);

    let expected = [
        (rid1, meta1, 1i8, 1i16),
        (rid2, meta2, 2i8, 2i16),
        (rid3, meta3, 3i8, 3i16),
    ];
    for (want_rid, want_meta, a, b) in expected {
        let (rid, meta, tuple) = iterator.next().unwrap().unwrap();
        assert_eq!(rid, want_rid);
        assert_eq!(meta, want_meta);
        assert_eq!(tuple.data, vec![a.into(), b.into()]);
    }

    assert!(iterator.next().unwrap().is_none());
}
1063
// A full scan with streaming requested through the environment must see every inserted row.
#[test]
pub fn test_streaming_seq_scan_ring() {
    std::env::set_var("QUILL_STREAM_SCAN", "1");
    std::env::set_var("QUILL_STREAM_READAHEAD", "2");

    let temp_dir = TempDir::new().unwrap();
    let db_path = temp_dir.path().join("test.db");

    let schema = Arc::new(Schema::new(vec![
        Column::new("a", DataType::Int8, false),
        Column::new("b", DataType::Int16, false),
    ]));

    let disk_scheduler = Arc::new(DiskScheduler::new(Arc::new(
        DiskManager::try_new(db_path).unwrap(),
    )));
    let buffer_pool = Arc::new(BufferManager::new(128, disk_scheduler));
    let table_heap = Arc::new(TableHeap::try_new(schema.clone(), buffer_pool).unwrap());

    let rows = 1000;
    for i in 0..rows {
        // Column values deliberately wrap; only the row count is checked.
        let tuple = Tuple::new(schema.clone(), vec![(i as i8).into(), (i as i16).into()]);
        let _rid = table_heap
            .insert_tuple(&super::TupleMeta::new(1, 0), &tuple)
            .unwrap();
    }

    table_heap.buffer_pool.flush_all_pages().unwrap();

    let mut it = TableIterator::new(table_heap.clone(), ..);
    let cnt = std::iter::from_fn(|| it.next().unwrap()).count();
    assert_eq!(cnt, rows);
}
1105
/// Inserts three rows, scans the inclusive range `rid1..=rid2` with the hint
/// `Some(true)`, and expects exactly those two records before the iterator
/// ends; the third row must still be readable through `full_tuple`.
#[test]
pub fn test_streaming_respects_bounds_and_fallbacks() {
    // NOTE(review): presumably these enable streaming scans with a read-ahead
    // of 2 -- confirm against the scan configuration. They are process-wide
    // and are not reset afterwards.
    std::env::set_var("QUILL_STREAM_SCAN", "1");
    std::env::set_var("QUILL_STREAM_READAHEAD", "2");

    let dir = TempDir::new().unwrap();
    let db_file = dir.path().join("test.db");

    let schema = Arc::new(Schema::new(vec![
        Column::new("a", DataType::Int8, false),
        Column::new("b", DataType::Int16, false),
    ]));

    let disk = Arc::new(DiskManager::try_new(db_file).unwrap());
    let scheduler = Arc::new(DiskScheduler::new(disk));
    let pool = Arc::new(BufferManager::new(128, scheduler));
    let heap = Arc::new(TableHeap::try_new(schema.clone(), pool).unwrap());

    // One row per call: (inserting transaction id, column a, column b).
    let insert_row = |txn, a: i8, b: i16| {
        let meta = super::TupleMeta::new(txn, 0);
        let tuple = Tuple::new(schema.clone(), vec![a.into(), b.into()]);
        heap.insert_tuple(&meta, &tuple).unwrap()
    };
    let rid1 = insert_row(1, 1, 1);
    let rid2 = insert_row(2, 2, 2);
    let rid3 = insert_row(3, 3, 3);

    let mut it = TableIterator::new_with_hint(heap.clone(), rid1..=rid2, Some(true));

    // Pull all three results before asserting anything.
    let first = it.next().unwrap().unwrap();
    let second = it.next().unwrap().unwrap();
    let third = it.next().unwrap();

    assert_eq!(first.0, rid1);
    assert_eq!(second.0, rid2);
    assert!(third.is_none());

    // The row outside the scanned range is untouched and still readable.
    let (_meta, outside) = heap.full_tuple(rid3).unwrap();
    assert_eq!(outside.data, vec![3i8.into(), 3i16.into()]);
}
1159
/// Overwrites a stored tuple's bytes via `recover_set_tuple_bytes`, then its
/// metadata via `recover_set_tuple_meta`, reading the slot back after each
/// step to confirm the change is visible.
#[test]
pub fn test_recover_set_tuple_meta_and_bytes() {
    let dir = TempDir::new().unwrap();
    let db_file = dir.path().join("test.db");

    let schema = Arc::new(Schema::new(vec![
        Column::new("a", DataType::Int8, false),
        Column::new("b", DataType::Int16, false),
    ]));
    let disk = Arc::new(DiskManager::try_new(db_file).unwrap());
    let scheduler = Arc::new(DiskScheduler::new(disk));
    let pool = Arc::new(BufferManager::new(128, scheduler));
    let heap = TableHeap::try_new(schema.clone(), pool.clone()).unwrap();

    // Seed the heap with a single row.
    let seed = Tuple::new(schema.clone(), vec![1i8.into(), 10i16.into()]);
    let rid = heap.insert_tuple(&EMPTY_TUPLE_META, &seed).unwrap();

    // Step 1: replace the tuple bytes in place and read them back.
    let replacement = Tuple::new(schema.clone(), vec![2i8.into(), 20i16.into()]);
    let encoded = TupleCodec::encode(&replacement);
    heap.recover_set_tuple_bytes(rid, &encoded)
        .expect("recover bytes");

    let (_meta, stored) = heap.full_tuple(rid).unwrap();
    assert_eq!(stored.data, vec![2i8.into(), 20i16.into()]);

    // Step 2: flip the deleted flag through the recovery API and re-read it.
    let mut updated_meta = heap.tuple_meta(rid).unwrap();
    updated_meta.is_deleted = true;
    heap.recover_set_tuple_meta(rid, updated_meta)
        .expect("recover meta");

    let reread = heap.tuple_meta(rid).unwrap();
    assert!(reread.is_deleted);
}
1202
/// Replaces a stored tuple through `recover_set_tuple_bytes` and checks that
/// the new values are returned by `full_tuple`.
///
/// NOTE(review): the replacement tuple uses the same schema as the original
/// row; whether its encoding actually differs in size (as the test name
/// implies) depends on `TupleCodec` -- confirm.
#[test]
pub fn test_recover_repack_on_size_mismatch() {
    let dir = TempDir::new().unwrap();
    let db_file = dir.path().join("test.db");

    let schema = Arc::new(Schema::new(vec![
        Column::new("a", DataType::Int8, false),
        Column::new("b", DataType::Int16, false),
    ]));
    let disk = Arc::new(DiskManager::try_new(db_file).unwrap());
    let scheduler = Arc::new(DiskScheduler::new(disk));
    let pool = Arc::new(BufferManager::new(128, scheduler));
    let heap = TableHeap::try_new(schema.clone(), pool.clone()).unwrap();

    // Seed the heap with a single row.
    let seed = Tuple::new(schema.clone(), vec![1i8.into(), 10i16.into()]);
    let rid = heap.insert_tuple(&EMPTY_TUPLE_META, &seed).unwrap();

    // Overwrite the slot with the encoded replacement tuple.
    let replacement = Tuple::new(schema.clone(), vec![99i8.into(), 300i16.into()]);
    let encoded = TupleCodec::encode(&replacement);
    heap.recover_set_tuple_bytes(rid, &encoded)
        .expect("recover larger bytes");

    let (_meta, stored) = heap.full_tuple(rid).unwrap();
    assert_eq!(stored.data, vec![99i8.into(), 300i16.into()]);
}
1234
/// Inserts one tuple, vacuums its slot with an always-true predicate, and
/// expects `vacuum_slot_if` to return `true`, `full_tuple` to fail afterwards,
/// and `get_first_rid` to report no record.
#[test]
fn vacuum_slot_if_reclaims_tuple() {
    let dir = TempDir::new().unwrap();
    let db_file = dir.path().join("vacuum.db");
    let schema = Arc::new(Schema::new(vec![Column::new("v", DataType::Int32, false)]));
    let disk = Arc::new(DiskManager::try_new(db_file).unwrap());
    let scheduler = Arc::new(DiskScheduler::new(disk));
    let pool = Arc::new(BufferManager::new(32, scheduler));
    let heap = TableHeap::try_new(schema.clone(), pool).unwrap();

    let meta = super::TupleMeta::new(1, 0);
    let row = Tuple::new(schema.clone(), vec![ScalarValue::Int32(Some(5))]);
    let rid = heap.insert_tuple(&meta, &row).unwrap();

    // The tuple is readable before vacuuming.
    let readable_before = heap.full_tuple(rid).is_ok();
    assert!(readable_before);

    // An unconditional predicate makes the vacuum report success.
    let reclaimed = heap.vacuum_slot_if(rid, |_| true).unwrap();
    assert!(reclaimed);

    // Afterwards the slot can no longer be read and the heap has no first rid.
    let unreadable_after = heap.full_tuple(rid).is_err();
    assert!(unreadable_after);
    let first_rid = heap.get_first_rid().unwrap();
    assert!(first_rid.is_none());
}
1254
/// Inserts a tuple with `insert_txn_id` 42 and vacuums with a predicate that
/// only accepts `insert_txn_id == 0`; `vacuum_slot_if` must return `false`
/// and both the metadata and the tuple must read back unchanged.
#[test]
fn vacuum_slot_if_respects_predicate() {
    let dir = TempDir::new().unwrap();
    let db_file = dir.path().join("vacuum_predicate.db");
    let schema = Arc::new(Schema::new(vec![Column::new("v", DataType::Int32, false)]));
    let disk = Arc::new(DiskManager::try_new(db_file).unwrap());
    let scheduler = Arc::new(DiskScheduler::new(disk));
    let pool = Arc::new(BufferManager::new(32, scheduler));
    let heap = TableHeap::try_new(schema.clone(), pool).unwrap();

    let meta = super::TupleMeta::new(42, 0);
    let tuple = Tuple::new(schema.clone(), vec![ScalarValue::Int32(Some(9))]);
    let rid = heap.insert_tuple(&meta, &tuple).unwrap();

    // The predicate does not match the stored metadata, so nothing is reclaimed.
    let reclaimed = heap
        .vacuum_slot_if(rid, |stored| stored.insert_txn_id == 0)
        .unwrap();
    assert!(!reclaimed);

    let (meta_after, tuple_after) = heap.full_tuple(rid).unwrap();
    assert_eq!(meta_after.insert_txn_id, 42);
    assert_eq!(tuple_after, tuple);
}
1276}