quill_sql/execution/physical_plan/
seq_scan.rs

1//! Table sequential scan operator (full-table read with MVCC filtering).
2
3use std::cell::RefCell;
4use std::sync::OnceLock;
5
6use super::scan::ScanPrefetch;
7use crate::catalog::SchemaRef;
8use crate::execution::physical_plan::{resolve_table_binding, stream_not_ready};
9use crate::storage::{
10    engine::{TableBinding, TupleStream},
11    page::{RecordId, TupleMeta},
12};
13use crate::transaction::LockMode;
14use crate::utils::table_ref::TableReference;
15use crate::{
16    error::QuillSQLResult,
17    execution::{ExecutionContext, VolcanoExecutor},
18    storage::tuple::Tuple,
19};
20
/// Batch size handed to `ScanPrefetch::new` when a [`PhysicalSeqScan`] is built.
///
/// NOTE(review): presumably the upper bound on how many tuples one refill pulls
/// from the storage stream — confirm against `ScanPrefetch`.
const PREFETCH_BATCH: usize = 64;
22
23pub struct PhysicalSeqScan {
24    pub table: TableReference,
25    pub table_schema: SchemaRef,
26
27    iterator: RefCell<Option<Box<dyn TupleStream>>>,
28    prefetch: ScanPrefetch,
29    table_binding: OnceLock<TableBinding>,
30}
31
32impl PhysicalSeqScan {
33    pub fn new(table: TableReference, table_schema: SchemaRef) -> Self {
34        PhysicalSeqScan {
35            table,
36            table_schema,
37            iterator: RefCell::new(None),
38            prefetch: ScanPrefetch::new(PREFETCH_BATCH),
39            table_binding: OnceLock::new(),
40        }
41    }
42
43    fn consume_row(
44        &self,
45        context: &mut ExecutionContext,
46        rid: RecordId,
47        meta: TupleMeta,
48        tuple: Tuple,
49    ) -> QuillSQLResult<Option<Tuple>> {
50        context
51            .txn_ctx_mut()
52            .read_visible_tuple(&self.table, rid, &meta, tuple)
53    }
54}
55
56impl VolcanoExecutor for PhysicalSeqScan {
57    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
58        context
59            .txn_ctx_mut()
60            .lock_table(self.table.clone(), LockMode::IntentionShared)?;
61        let binding = resolve_table_binding(&self.table_binding, context, &self.table)?;
62        let stream = binding.scan()?;
63        self.iterator.replace(Some(stream));
64        self.prefetch.clear();
65        Ok(())
66    }
67
68    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
69        loop {
70            if let Some((rid, meta, tuple)) = self.prefetch.pop_front() {
71                if let Some(result) = self.consume_row(context, rid, meta, tuple)? {
72                    return Ok(Some(result));
73                }
74                continue;
75            }
76
77            if !self.prefetch.refill(|limit, out| {
78                let mut guard = self.iterator.borrow_mut();
79                let stream = guard.as_mut().ok_or_else(|| stream_not_ready("SeqScan"))?;
80                for _ in 0..limit {
81                    match stream.next()? {
82                        Some(entry) => out.push_back(entry),
83                        None => break,
84                    }
85                }
86                Ok(())
87            })? {
88                return Ok(None);
89            }
90        }
91    }
92
93    fn output_schema(&self) -> SchemaRef {
94        self.table_schema.clone()
95    }
96}
97
98impl std::fmt::Display for PhysicalSeqScan {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        write!(f, "SeqScan")
101    }
102}
103
104impl std::fmt::Debug for PhysicalSeqScan {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct("PhysicalSeqScan")
107            .field("table", &self.table)
108            .field("table_schema", &self.table_schema)
109            .finish()
110    }
111}