quill_sql/execution/physical_plan/
seq_scan.rs

1use std::collections::VecDeque;
2
3use parking_lot::Mutex;
4
5use crate::catalog::SchemaRef;
6use crate::error::QuillSQLError;
7use crate::storage::page::{RecordId, TupleMeta};
8use crate::storage::table_heap::TableIterator;
9use crate::transaction::LockMode;
10use crate::utils::table_ref::TableReference;
11use crate::{
12    error::QuillSQLResult,
13    execution::{ExecutionContext, VolcanoExecutor},
14    storage::tuple::Tuple,
15};
16
17const PREFETCH_BATCH: usize = 64;
18
19#[derive(Debug)]
20pub struct PhysicalSeqScan {
21    pub table: TableReference,
22    pub table_schema: SchemaRef,
23    pub streaming_hint: Option<bool>,
24
25    iterator: Mutex<Option<TableIterator>>,
26    prefetch: Mutex<VecDeque<(RecordId, TupleMeta, Tuple)>>,
27}
28
29impl PhysicalSeqScan {
30    pub fn new(table: TableReference, table_schema: SchemaRef) -> Self {
31        PhysicalSeqScan {
32            table,
33            table_schema,
34            streaming_hint: None,
35            iterator: Mutex::new(None),
36            prefetch: Mutex::new(VecDeque::new()),
37        }
38    }
39
40    fn refill_buffer(&self) -> QuillSQLResult<bool> {
41        let mut fetched = VecDeque::with_capacity(PREFETCH_BATCH);
42        {
43            let mut guard = self.iterator.lock();
44            let iterator = guard.as_mut().ok_or_else(|| {
45                QuillSQLError::Execution("table iterator not created".to_string())
46            })?;
47            for _ in 0..PREFETCH_BATCH {
48                match iterator.next()? {
49                    Some(entry) => fetched.push_back(entry),
50                    None => break,
51                }
52            }
53        }
54        if fetched.is_empty() {
55            return Ok(false);
56        }
57        let mut buffer = self.prefetch.lock();
58        buffer.extend(fetched);
59        Ok(true)
60    }
61
62    fn consume_row(
63        &self,
64        context: &mut ExecutionContext,
65        rid: RecordId,
66        meta: TupleMeta,
67        tuple: Tuple,
68    ) -> QuillSQLResult<Option<Tuple>> {
69        context.read_visible_tuple(&self.table, rid, &meta, tuple)
70    }
71}
72
73impl VolcanoExecutor for PhysicalSeqScan {
74    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
75        context.lock_table(self.table.clone(), LockMode::IntentionShared)?;
76        let table_heap = context.table_heap(&self.table)?;
77        let iter = if let Some(h) = self.streaming_hint {
78            TableIterator::new_with_hint(table_heap, .., Some(h))
79        } else {
80            TableIterator::new(table_heap, ..)
81        };
82        {
83            let mut guard = self.iterator.lock();
84            *guard = Some(iter);
85        }
86        self.prefetch.lock().clear();
87        Ok(())
88    }
89
90    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
91        loop {
92            if let Some((rid, meta, tuple)) = {
93                let mut buffer = self.prefetch.lock();
94                buffer.pop_front()
95            } {
96                if let Some(result) = self.consume_row(context, rid, meta, tuple)? {
97                    return Ok(Some(result));
98                }
99                continue;
100            }
101
102            if !self.refill_buffer()? {
103                return Ok(None);
104            }
105        }
106    }
107
108    fn output_schema(&self) -> SchemaRef {
109        self.table_schema.clone()
110    }
111}
112
113impl std::fmt::Display for PhysicalSeqScan {
114    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115        write!(f, "SeqScan")
116    }
117}