quill_sql/storage/
engine.rs

1use std::fmt;
2use std::ops::Bound;
3use std::sync::Arc;
4
5use crate::{
6    catalog::{Catalog, SchemaRef},
7    error::QuillSQLResult,
8    storage::{
9        index::btree_index::{BPlusTreeIndex, TreeIndexIterator},
10        mvcc_heap::MvccHeap,
11        page::{RecordId, TupleMeta},
12        table_heap::{TableHeap, TableIterator},
13        tuple::Tuple,
14    },
15    transaction::TxnContext,
16    utils::table_ref::TableReference,
17};
18
19#[derive(Debug, Clone)]
20pub struct IndexScanRequest {
21    pub start: Bound<Tuple>,
22    pub end: Bound<Tuple>,
23}
24
25impl IndexScanRequest {
26    pub fn new(start: Bound<Tuple>, end: Bound<Tuple>) -> Self {
27        Self { start, end }
28    }
29}
30
/// Pull-style stream of rows produced by a table or index scan.
pub trait TupleStream {
    /// Fetches the next row as a `(record id, tuple metadata, tuple)` triple.
    ///
    /// `Ok(None)` signals that the stream has no further rows to offer; an
    /// `Err` reports a failure raised while fetching.
    fn next(&mut self) -> QuillSQLResult<Option<(RecordId, TupleMeta, Tuple)>>;
}
34
/// Storage-level view of a single table: metadata accessors, a full scan, and
/// the row-level write operations executed on behalf of a transaction.
///
/// Implementors must be `Send + Sync`; handles are shared as
/// `Arc<dyn TableHandle>` throughout this module.
pub trait TableHandle: Send + Sync {
    /// Reference (name) of the table this handle is bound to.
    fn table_ref(&self) -> &TableReference;
    /// Schema of the table's rows.
    fn schema(&self) -> SchemaRef;
    /// Shared handle to the heap that stores the table's tuples.
    fn table_heap(&self) -> Arc<TableHeap>;
    /// Opens a stream over the rows of the table.
    fn full_scan(&self) -> QuillSQLResult<Box<dyn TupleStream>>;

    /// Inserts `tuple` on behalf of `txn`.
    ///
    /// `indexes` are the index handles the caller wants kept in step with the
    /// table; `TableBinding` passes its own index list here.
    fn insert(
        &self,
        txn: &mut TxnContext<'_>,
        tuple: &Tuple,
        indexes: &[Arc<dyn IndexHandle>],
    ) -> QuillSQLResult<()>;

    /// Deletes the row at `rid` on behalf of `txn`.
    ///
    /// `prev_meta` and `prev_tuple` carry the row's state as seen by the
    /// caller before the delete; `indexes` are the indexes to maintain.
    fn delete(
        &self,
        txn: &mut TxnContext<'_>,
        rid: RecordId,
        prev_meta: TupleMeta,
        prev_tuple: Tuple,
        indexes: &[Arc<dyn IndexHandle>],
    ) -> QuillSQLResult<()>;

    /// Replaces the row at `rid` with `new_tuple` on behalf of `txn` and
    /// returns the record id reported for the updated row.
    ///
    /// `prev_meta` and `prev_tuple` carry the row's state before the update;
    /// `indexes` are the indexes to maintain.
    fn update(
        &self,
        txn: &mut TxnContext<'_>,
        rid: RecordId,
        new_tuple: Tuple,
        prev_meta: TupleMeta,
        prev_tuple: Tuple,
        indexes: &[Arc<dyn IndexHandle>],
    ) -> QuillSQLResult<RecordId>;

    /// Readies the row at `rid` for modification by `txn`.
    ///
    /// `observed_meta` is the metadata the caller saw when it read the row.
    /// `Ok(None)` means the row should not be written by this transaction;
    /// `Ok(Some(..))` yields the row's metadata and tuple to operate on.
    fn prepare_row_for_write(
        &self,
        txn: &mut TxnContext<'_>,
        rid: RecordId,
        observed_meta: &TupleMeta,
    ) -> QuillSQLResult<Option<(TupleMeta, Tuple)>>;
}
74
/// Storage-level view of a single index attached to a table.
///
/// Implementors must be `Send + Sync`; handles are shared as
/// `Arc<dyn IndexHandle>` throughout this module.
pub trait IndexHandle: Send + Sync {
    /// Name of the index; `TableBinding::index_scan` looks indexes up by it.
    fn name(&self) -> &str;
    /// Schema of the index key; used to project key tuples out of table rows.
    fn key_schema(&self) -> SchemaRef;
    /// Shared handle to the underlying B+ tree index.
    fn index(&self) -> Arc<BPlusTreeIndex>;
    /// Opens a stream over the rows of `table` reached through this index for
    /// the key range described by `request`.
    fn range_scan(
        &self,
        table: Arc<dyn TableHandle>,
        request: IndexScanRequest,
    ) -> QuillSQLResult<Box<dyn TupleStream>>;
}
85
/// Entry point that resolves a table reference into a usable [`TableBinding`].
pub trait StorageEngine: Send + Sync {
    /// Looks `table` up through `catalog` and returns a binding for it.
    fn table(&self, catalog: &Catalog, table: &TableReference) -> QuillSQLResult<TableBinding>;
}
89
/// Stateless [`StorageEngine`] implementation; its `table` method builds
/// bindings from the catalog's table heap and B+ tree indexes.
#[derive(Default)]
pub struct DefaultStorageEngine;
92
93#[derive(Clone)]
94pub struct TableBinding {
95    table: Arc<dyn TableHandle>,
96    indexes: Arc<Vec<Arc<dyn IndexHandle>>>,
97}
98
99impl TableBinding {
100    fn new(table: Arc<dyn TableHandle>, indexes: Vec<Arc<dyn IndexHandle>>) -> Self {
101        Self {
102            table,
103            indexes: Arc::new(indexes),
104        }
105    }
106
107    pub fn table(&self) -> Arc<dyn TableHandle> {
108        self.table.clone()
109    }
110
111    pub fn table_heap(&self) -> Arc<TableHeap> {
112        self.table.table_heap()
113    }
114
115    pub fn indexes(&self) -> &[Arc<dyn IndexHandle>] {
116        self.indexes.as_ref()
117    }
118
119    pub fn scan(&self) -> QuillSQLResult<Box<dyn TupleStream>> {
120        self.table.full_scan()
121    }
122
123    pub fn insert(&self, txn: &mut TxnContext<'_>, tuple: &Tuple) -> QuillSQLResult<()> {
124        self.table.insert(txn, tuple, self.indexes())
125    }
126
127    pub fn delete(
128        &self,
129        txn: &mut TxnContext<'_>,
130        rid: RecordId,
131        prev_meta: TupleMeta,
132        prev_tuple: Tuple,
133    ) -> QuillSQLResult<()> {
134        self.table
135            .delete(txn, rid, prev_meta, prev_tuple, self.indexes())
136    }
137
138    pub fn update(
139        &self,
140        txn: &mut TxnContext<'_>,
141        rid: RecordId,
142        new_tuple: Tuple,
143        prev_meta: TupleMeta,
144        prev_tuple: Tuple,
145    ) -> QuillSQLResult<RecordId> {
146        self.table
147            .update(txn, rid, new_tuple, prev_meta, prev_tuple, self.indexes())
148    }
149
150    pub fn prepare_row_for_write(
151        &self,
152        txn: &mut TxnContext<'_>,
153        rid: RecordId,
154        observed_meta: &TupleMeta,
155    ) -> QuillSQLResult<Option<(TupleMeta, Tuple)>> {
156        self.table.prepare_row_for_write(txn, rid, observed_meta)
157    }
158
159    pub fn index_scan(
160        &self,
161        name: &str,
162        request: IndexScanRequest,
163    ) -> QuillSQLResult<Box<dyn TupleStream>> {
164        let handle = self
165            .indexes()
166            .iter()
167            .find(|idx| idx.name() == name)
168            .ok_or_else(|| {
169                crate::error::QuillSQLError::Execution(format!("index {} not found", name))
170            })?;
171        handle.range_scan(self.table(), request)
172    }
173}
174
175impl fmt::Debug for TableBinding {
176    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177        f.debug_struct("TableBinding")
178            .field("table", &self.table.table_ref())
179            .field("index_count", &self.indexes.len())
180            .finish()
181    }
182}
183
184struct HeapTableHandle {
185    table_ref: TableReference,
186    heap: Arc<TableHeap>,
187}
188
189impl HeapTableHandle {
190    fn new(table_ref: TableReference, heap: Arc<TableHeap>) -> Self {
191        Self { table_ref, heap }
192    }
193}
194
195impl TableHandle for HeapTableHandle {
196    fn table_ref(&self) -> &TableReference {
197        &self.table_ref
198    }
199
200    fn schema(&self) -> SchemaRef {
201        self.heap.schema.clone()
202    }
203
204    fn table_heap(&self) -> Arc<TableHeap> {
205        self.heap.clone()
206    }
207
208    fn full_scan(&self) -> QuillSQLResult<Box<dyn TupleStream>> {
209        let iterator = TableIterator::new(self.heap.clone(), ..);
210        Ok(Box::new(HeapTableStream { iterator }))
211    }
212
213    fn insert(
214        &self,
215        txn: &mut TxnContext<'_>,
216        tuple: &Tuple,
217        indexes: &[Arc<dyn IndexHandle>],
218    ) -> QuillSQLResult<()> {
219        txn.ensure_writable(self.table_ref(), "INSERT")?;
220        let mvcc = MvccHeap::new(self.heap.clone());
221        let (rid, _) = mvcc.insert(tuple, txn.txn_id(), txn.command_id())?;
222
223        let mut index_links = Vec::new();
224        for handle in indexes {
225            if let Ok(key_tuple) = tuple.project_with_schema(handle.key_schema()) {
226                let index = handle.index();
227                index.insert_with_txn(&key_tuple, rid, txn.txn_id())?;
228                index_links.push((index, key_tuple));
229            }
230        }
231
232        txn.transaction_mut()
233            .push_insert_undo(self.heap.clone(), rid, index_links);
234        Ok(())
235    }
236
237    fn delete(
238        &self,
239        txn: &mut TxnContext<'_>,
240        rid: RecordId,
241        _prev_meta: TupleMeta,
242        prev_tuple: Tuple,
243        indexes: &[Arc<dyn IndexHandle>],
244    ) -> QuillSQLResult<()> {
245        txn.ensure_writable(self.table_ref(), "DELETE")?;
246        let mvcc = MvccHeap::new(self.heap.clone());
247        let prev_meta = mvcc.mark_deleted(rid, txn.txn_id(), txn.command_id())?;
248
249        let mut index_links = Vec::new();
250        for handle in indexes {
251            if let Ok(key_tuple) = prev_tuple.project_with_schema(handle.key_schema()) {
252                let index = handle.index();
253                index.delete_with_txn(&key_tuple, txn.txn_id())?;
254                index_links.push((index, key_tuple));
255            }
256        }
257
258        txn.transaction_mut().push_delete_undo(
259            self.heap.clone(),
260            rid,
261            prev_meta,
262            prev_tuple,
263            index_links,
264        );
265        Ok(())
266    }
267
268    fn update(
269        &self,
270        txn: &mut TxnContext<'_>,
271        rid: RecordId,
272        new_tuple: Tuple,
273        prev_meta: TupleMeta,
274        prev_tuple: Tuple,
275        indexes: &[Arc<dyn IndexHandle>],
276    ) -> QuillSQLResult<RecordId> {
277        txn.ensure_writable(self.table_ref(), "UPDATE")?;
278        let mvcc = MvccHeap::new(self.heap.clone());
279        let (new_rid, _) = mvcc.update(rid, new_tuple.clone(), txn.txn_id(), txn.command_id())?;
280
281        let mut old_keys = Vec::new();
282        for handle in indexes {
283            if let Ok(old_key_tuple) = prev_tuple.project_with_schema(handle.key_schema()) {
284                let index = handle.index();
285                index.delete_with_txn(&old_key_tuple, txn.txn_id())?;
286                old_keys.push((index, old_key_tuple));
287            }
288        }
289
290        let mut new_keys = Vec::new();
291        for handle in indexes {
292            if let Ok(new_key_tuple) = new_tuple.project_with_schema(handle.key_schema()) {
293                let index = handle.index();
294                index.insert_with_txn(&new_key_tuple, new_rid, txn.txn_id())?;
295                new_keys.push((index, new_key_tuple));
296            }
297        }
298
299        txn.transaction_mut().push_update_undo(
300            self.heap.clone(),
301            rid,
302            new_rid,
303            prev_meta,
304            prev_tuple,
305            new_keys,
306            old_keys,
307        );
308        Ok(new_rid)
309    }
310
311    fn prepare_row_for_write(
312        &self,
313        txn: &mut TxnContext<'_>,
314        rid: RecordId,
315        observed_meta: &TupleMeta,
316    ) -> QuillSQLResult<Option<(TupleMeta, Tuple)>> {
317        if !txn.is_visible(observed_meta) {
318            return Ok(None);
319        }
320        txn.lock_row_exclusive(self.table_ref(), rid)?;
321        let mvcc = MvccHeap::new(self.heap.clone());
322        let (current_meta, current_tuple) = mvcc.full_tuple(rid)?;
323        if !txn.is_visible(&current_meta) {
324            txn.unlock_row(self.table_ref(), rid);
325            return Ok(None);
326        }
327        Ok(Some((current_meta, current_tuple)))
328    }
329}
330
331struct HeapTableStream {
332    iterator: TableIterator,
333}
334
335impl TupleStream for HeapTableStream {
336    fn next(&mut self) -> QuillSQLResult<Option<(RecordId, TupleMeta, Tuple)>> {
337        self.iterator.next()
338    }
339}
340
341struct BTreeIndexHandle {
342    name: String,
343    index: Arc<BPlusTreeIndex>,
344}
345
346impl BTreeIndexHandle {
347    fn new(name: String, index: Arc<BPlusTreeIndex>) -> Self {
348        Self { name, index }
349    }
350}
351
352impl IndexHandle for BTreeIndexHandle {
353    fn name(&self) -> &str {
354        &self.name
355    }
356
357    fn key_schema(&self) -> SchemaRef {
358        self.index.key_schema.clone()
359    }
360
361    fn index(&self) -> Arc<BPlusTreeIndex> {
362        self.index.clone()
363    }
364
365    fn range_scan(
366        &self,
367        table: Arc<dyn TableHandle>,
368        request: IndexScanRequest,
369    ) -> QuillSQLResult<Box<dyn TupleStream>> {
370        let iterator = TreeIndexIterator::new(self.index.clone(), (request.start, request.end));
371        Ok(Box::new(BTreeIndexStream { iterator, table }))
372    }
373}
374
375struct BTreeIndexStream {
376    iterator: TreeIndexIterator,
377    table: Arc<dyn TableHandle>,
378}
379
380impl TupleStream for BTreeIndexStream {
381    fn next(&mut self) -> QuillSQLResult<Option<(RecordId, TupleMeta, Tuple)>> {
382        loop {
383            let Some(rid) = self.iterator.next()? else {
384                return Ok(None);
385            };
386            if let Ok((meta, tuple)) = self.table.table_heap().full_tuple(rid) {
387                return Ok(Some((rid, meta, tuple)));
388            }
389        }
390    }
391}
392
393impl StorageEngine for DefaultStorageEngine {
394    fn table(&self, catalog: &Catalog, table: &TableReference) -> QuillSQLResult<TableBinding> {
395        let heap = catalog.table_heap(table)?;
396        let handle: Arc<dyn TableHandle> = Arc::new(HeapTableHandle::new(table.clone(), heap));
397        let indexes = catalog.table_indexes(table)?;
398        let index_handles = indexes
399            .into_iter()
400            .map(|(name, index)| {
401                Arc::new(BTreeIndexHandle::new(name, index)) as Arc<dyn IndexHandle>
402            })
403            .collect();
404        Ok(TableBinding::new(handle, index_handles))
405    }
406}