quill_sql/execution/physical_plan/
seq_scan.rs1use std::collections::VecDeque;
2
3use parking_lot::Mutex;
4
5use crate::catalog::SchemaRef;
6use crate::error::QuillSQLError;
7use crate::storage::page::{RecordId, TupleMeta};
8use crate::storage::table_heap::TableIterator;
9use crate::transaction::LockMode;
10use crate::utils::table_ref::TableReference;
11use crate::{
12 error::QuillSQLResult,
13 execution::{ExecutionContext, VolcanoExecutor},
14 storage::tuple::Tuple,
15};
16
/// Upper bound on how many rows a single buffer refill pulls from the table
/// iterator before handing them to the prefetch queue.
const PREFETCH_BATCH: usize = 64;
18
19#[derive(Debug)]
20pub struct PhysicalSeqScan {
21 pub table: TableReference,
22 pub table_schema: SchemaRef,
23 pub streaming_hint: Option<bool>,
24
25 iterator: Mutex<Option<TableIterator>>,
26 prefetch: Mutex<VecDeque<(RecordId, TupleMeta, Tuple)>>,
27}
28
29impl PhysicalSeqScan {
30 pub fn new(table: TableReference, table_schema: SchemaRef) -> Self {
31 PhysicalSeqScan {
32 table,
33 table_schema,
34 streaming_hint: None,
35 iterator: Mutex::new(None),
36 prefetch: Mutex::new(VecDeque::new()),
37 }
38 }
39
40 fn refill_buffer(&self) -> QuillSQLResult<bool> {
41 let mut fetched = VecDeque::with_capacity(PREFETCH_BATCH);
42 {
43 let mut guard = self.iterator.lock();
44 let iterator = guard.as_mut().ok_or_else(|| {
45 QuillSQLError::Execution("table iterator not created".to_string())
46 })?;
47 for _ in 0..PREFETCH_BATCH {
48 match iterator.next()? {
49 Some(entry) => fetched.push_back(entry),
50 None => break,
51 }
52 }
53 }
54 if fetched.is_empty() {
55 return Ok(false);
56 }
57 let mut buffer = self.prefetch.lock();
58 buffer.extend(fetched);
59 Ok(true)
60 }
61
62 fn consume_row(
63 &self,
64 context: &mut ExecutionContext,
65 rid: RecordId,
66 meta: TupleMeta,
67 tuple: Tuple,
68 ) -> QuillSQLResult<Option<Tuple>> {
69 context.read_visible_tuple(&self.table, rid, &meta, tuple)
70 }
71}
72
73impl VolcanoExecutor for PhysicalSeqScan {
74 fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
75 context.lock_table(self.table.clone(), LockMode::IntentionShared)?;
76 let table_heap = context.table_heap(&self.table)?;
77 let iter = if let Some(h) = self.streaming_hint {
78 TableIterator::new_with_hint(table_heap, .., Some(h))
79 } else {
80 TableIterator::new(table_heap, ..)
81 };
82 {
83 let mut guard = self.iterator.lock();
84 *guard = Some(iter);
85 }
86 self.prefetch.lock().clear();
87 Ok(())
88 }
89
90 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
91 loop {
92 if let Some((rid, meta, tuple)) = {
93 let mut buffer = self.prefetch.lock();
94 buffer.pop_front()
95 } {
96 if let Some(result) = self.consume_row(context, rid, meta, tuple)? {
97 return Ok(Some(result));
98 }
99 continue;
100 }
101
102 if !self.refill_buffer()? {
103 return Ok(None);
104 }
105 }
106 }
107
108 fn output_schema(&self) -> SchemaRef {
109 self.table_schema.clone()
110 }
111}
112
113impl std::fmt::Display for PhysicalSeqScan {
114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115 write!(f, "SeqScan")
116 }
117}