quill_sql/execution/physical_plan/
seq_scan.rs1use std::cell::RefCell;
4use std::sync::OnceLock;
5
6use super::scan::ScanPrefetch;
7use crate::catalog::SchemaRef;
8use crate::execution::physical_plan::{resolve_table_binding, stream_not_ready};
9use crate::storage::{
10 engine::{TableBinding, TupleStream},
11 record::{RecordId, TupleMeta},
12};
13use crate::transaction::LockMode;
14use crate::utils::table_ref::TableReference;
15use crate::{
16 error::QuillSQLResult,
17 execution::{ExecutionContext, VolcanoExecutor},
18 storage::tuple::Tuple,
19};
20
/// Batch size handed to `ScanPrefetch::new` when a sequential scan is built.
// NOTE(review): presumably the maximum number of rows buffered per refill —
// confirm against `ScanPrefetch`.
const PREFETCH_BATCH: usize = 64;
22
/// Physical operator that reads a table front to back.
///
/// The scan stream, the prefetch buffer and the table binding are all held
/// behind interior-mutability wrappers because the executor methods on this
/// type take `&self`.
pub struct PhysicalSeqScan {
    /// Table being scanned.
    pub table: TableReference,
    /// Schema returned by `output_schema`.
    pub table_schema: SchemaRef,

    /// Open scan stream; `None` until `init` installs one.
    iterator: RefCell<Option<Box<dyn TupleStream>>>,
    /// Buffer of rows pulled from the stream ahead of being returned.
    prefetch: ScanPrefetch,
    /// Slot passed to `resolve_table_binding` during `init`.
    // NOTE(review): presumably caches the resolved binding across re-inits — confirm.
    table_binding: OnceLock<TableBinding>,
}
31
32impl PhysicalSeqScan {
33 pub fn new(table: TableReference, table_schema: SchemaRef) -> Self {
34 PhysicalSeqScan {
35 table,
36 table_schema,
37 iterator: RefCell::new(None),
38 prefetch: ScanPrefetch::new(PREFETCH_BATCH),
39 table_binding: OnceLock::new(),
40 }
41 }
42
43 fn consume_row(
44 &self,
45 context: &mut ExecutionContext,
46 rid: RecordId,
47 meta: TupleMeta,
48 tuple: Tuple,
49 ) -> QuillSQLResult<Option<Tuple>> {
50 context
51 .txn_ctx_mut()
52 .read_visible_tuple(&self.table, rid, &meta, tuple)
53 }
54}
55
56impl VolcanoExecutor for PhysicalSeqScan {
57 fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
58 context
59 .txn_ctx_mut()
60 .lock_table(self.table.clone(), LockMode::IntentionShared)?;
61 let binding = resolve_table_binding(&self.table_binding, context, &self.table)?;
62 let stream = binding.scan()?;
63 self.iterator.replace(Some(stream));
64 self.prefetch.clear();
65 Ok(())
66 }
67
68 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
69 loop {
70 if let Some((rid, meta, tuple)) = self.prefetch.pop_front() {
71 if let Some(result) = self.consume_row(context, rid, meta, tuple)? {
72 return Ok(Some(result));
73 }
74 continue;
75 }
76
77 if !self.prefetch.refill(|limit, out| {
78 let mut guard = self.iterator.borrow_mut();
79 let stream = guard.as_mut().ok_or_else(|| stream_not_ready("SeqScan"))?;
80 for _ in 0..limit {
81 match stream.next()? {
82 Some(entry) => out.push_back(entry),
83 None => break,
84 }
85 }
86 Ok(())
87 })? {
88 return Ok(None);
89 }
90 }
91 }
92
93 fn output_schema(&self) -> SchemaRef {
94 self.table_schema.clone()
95 }
96}
97
98impl std::fmt::Display for PhysicalSeqScan {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 write!(f, "SeqScan")
101 }
102}
103
104impl std::fmt::Debug for PhysicalSeqScan {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 f.debug_struct("PhysicalSeqScan")
107 .field("table", &self.table)
108 .field("table_schema", &self.table_schema)
109 .finish()
110 }
111}