1use std::fmt;
2use std::ops::Bound;
3use std::sync::Arc;
4
5use crate::{
6 catalog::{Catalog, SchemaRef},
7 error::QuillSQLResult,
8 storage::{
9 index::btree_index::{BPlusTreeIndex, TreeIndexIterator},
10 mvcc_heap::MvccHeap,
11 page::{RecordId, TupleMeta},
12 table_heap::{TableHeap, TableIterator},
13 tuple::Tuple,
14 },
15 transaction::TxnContext,
16 utils::table_ref::TableReference,
17};
18
/// Key bounds for an index range scan (see [`IndexHandle::range_scan`]).
///
/// NOTE(review): the bound tuples are presumably shaped by the index's key schema
/// (`IndexHandle::key_schema`); confirm with callers.
#[derive(Debug, Clone)]
pub struct IndexScanRequest {
    /// Lower bound of the scanned key range.
    pub start: Bound<Tuple>,
    /// Upper bound of the scanned key range.
    pub end: Bound<Tuple>,
}
24
25impl IndexScanRequest {
26 pub fn new(start: Bound<Tuple>, end: Bound<Tuple>) -> Self {
27 Self { start, end }
28 }
29}
30
/// A pull-style stream of rows produced by a scan.
///
/// Each item carries the row's [`RecordId`], its [`TupleMeta`], and the [`Tuple`] itself.
pub trait TupleStream {
    /// Returns the next row, `Ok(None)` once the stream has no more rows, or an error.
    fn next(&mut self) -> QuillSQLResult<Option<(RecordId, TupleMeta, Tuple)>>;
}
34
/// A table that rows can be scanned from and written to, with index maintenance on writes.
pub trait TableHandle: Send + Sync {
    /// Returns the table reference this handle was bound to.
    fn table_ref(&self) -> &TableReference;
    /// Returns the table's schema.
    fn schema(&self) -> SchemaRef;
    /// Returns the heap that stores the table's tuples.
    fn table_heap(&self) -> Arc<TableHeap>;
    /// Opens a stream over every row of the table.
    fn full_scan(&self) -> QuillSQLResult<Box<dyn TupleStream>>;

    /// Inserts `tuple` on behalf of `txn`, adding its keys to each handle in `indexes`.
    fn insert(
        &self,
        txn: &mut TxnContext<'_>,
        tuple: &Tuple,
        indexes: &[Arc<dyn IndexHandle>],
    ) -> QuillSQLResult<()>;

    /// Deletes the row at `rid` on behalf of `txn`, removing its keys from `indexes`.
    ///
    /// `prev_meta` / `prev_tuple` describe the row as it was read before the delete;
    /// `prev_tuple` is what the index keys are derived from. (The heap implementation ignores
    /// `prev_meta` and uses the metadata returned by the heap delete instead.)
    fn delete(
        &self,
        txn: &mut TxnContext<'_>,
        rid: RecordId,
        prev_meta: TupleMeta,
        prev_tuple: Tuple,
        indexes: &[Arc<dyn IndexHandle>],
    ) -> QuillSQLResult<()>;

    /// Replaces the row at `rid` with `new_tuple` on behalf of `txn`.
    ///
    /// Index entries for `prev_tuple`'s keys are replaced with entries for `new_tuple`'s keys.
    /// Returns the `RecordId` the updated row now lives at (in the heap implementation, the
    /// id returned by `MvccHeap::update`).
    fn update(
        &self,
        txn: &mut TxnContext<'_>,
        rid: RecordId,
        new_tuple: Tuple,
        prev_meta: TupleMeta,
        prev_tuple: Tuple,
        indexes: &[Arc<dyn IndexHandle>],
    ) -> QuillSQLResult<RecordId>;

    /// Prepares the row at `rid`, previously observed with `observed_meta`, for modification
    /// by `txn`.
    ///
    /// Returns the row's current metadata and tuple, or `Ok(None)` if the row should be
    /// skipped. In the heap implementation, skipping means the row is not visible to `txn`.
    fn prepare_row_for_write(
        &self,
        txn: &mut TxnContext<'_>,
        rid: RecordId,
        observed_meta: &TupleMeta,
    ) -> QuillSQLResult<Option<(TupleMeta, Tuple)>>;
}
74
/// An index on a table that can drive range scans.
pub trait IndexHandle: Send + Sync {
    /// Returns the index name; `TableBinding::index_scan` looks indexes up by this name.
    fn name(&self) -> &str;
    /// Returns the key schema; table tuples are projected onto it to form index keys.
    fn key_schema(&self) -> SchemaRef;
    /// Returns the underlying B+ tree.
    fn index(&self) -> Arc<BPlusTreeIndex>;
    /// Streams the rows of `table` whose index keys fall within `request`'s bounds.
    fn range_scan(
        &self,
        table: Arc<dyn TableHandle>,
        request: IndexScanRequest,
    ) -> QuillSQLResult<Box<dyn TupleStream>>;
}
85
/// Resolves catalog tables into [`TableBinding`]s.
pub trait StorageEngine: Send + Sync {
    /// Builds a binding for `table`, looking up its heap and indexes in `catalog`.
    fn table(&self, catalog: &Catalog, table: &TableReference) -> QuillSQLResult<TableBinding>;
}
89
/// Storage engine that binds catalog tables to heap-backed table handles and their indexes
/// to B+ tree index handles.
#[derive(Default)]
pub struct DefaultStorageEngine;
92
/// A table handle together with the handles of the indexes on that table.
///
/// Both parts are reference-counted, so cloning a binding does not copy the index list.
#[derive(Clone)]
pub struct TableBinding {
    table: Arc<dyn TableHandle>,
    indexes: Arc<Vec<Arc<dyn IndexHandle>>>,
}
98
99impl TableBinding {
100 fn new(table: Arc<dyn TableHandle>, indexes: Vec<Arc<dyn IndexHandle>>) -> Self {
101 Self {
102 table,
103 indexes: Arc::new(indexes),
104 }
105 }
106
107 pub fn table(&self) -> Arc<dyn TableHandle> {
108 self.table.clone()
109 }
110
111 pub fn table_heap(&self) -> Arc<TableHeap> {
112 self.table.table_heap()
113 }
114
115 pub fn indexes(&self) -> &[Arc<dyn IndexHandle>] {
116 self.indexes.as_ref()
117 }
118
119 pub fn scan(&self) -> QuillSQLResult<Box<dyn TupleStream>> {
120 self.table.full_scan()
121 }
122
123 pub fn insert(&self, txn: &mut TxnContext<'_>, tuple: &Tuple) -> QuillSQLResult<()> {
124 self.table.insert(txn, tuple, self.indexes())
125 }
126
127 pub fn delete(
128 &self,
129 txn: &mut TxnContext<'_>,
130 rid: RecordId,
131 prev_meta: TupleMeta,
132 prev_tuple: Tuple,
133 ) -> QuillSQLResult<()> {
134 self.table
135 .delete(txn, rid, prev_meta, prev_tuple, self.indexes())
136 }
137
138 pub fn update(
139 &self,
140 txn: &mut TxnContext<'_>,
141 rid: RecordId,
142 new_tuple: Tuple,
143 prev_meta: TupleMeta,
144 prev_tuple: Tuple,
145 ) -> QuillSQLResult<RecordId> {
146 self.table
147 .update(txn, rid, new_tuple, prev_meta, prev_tuple, self.indexes())
148 }
149
150 pub fn prepare_row_for_write(
151 &self,
152 txn: &mut TxnContext<'_>,
153 rid: RecordId,
154 observed_meta: &TupleMeta,
155 ) -> QuillSQLResult<Option<(TupleMeta, Tuple)>> {
156 self.table.prepare_row_for_write(txn, rid, observed_meta)
157 }
158
159 pub fn index_scan(
160 &self,
161 name: &str,
162 request: IndexScanRequest,
163 ) -> QuillSQLResult<Box<dyn TupleStream>> {
164 let handle = self
165 .indexes()
166 .iter()
167 .find(|idx| idx.name() == name)
168 .ok_or_else(|| {
169 crate::error::QuillSQLError::Execution(format!("index {} not found", name))
170 })?;
171 handle.range_scan(self.table(), request)
172 }
173}
174
175impl fmt::Debug for TableBinding {
176 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177 f.debug_struct("TableBinding")
178 .field("table", &self.table.table_ref())
179 .field("index_count", &self.indexes.len())
180 .finish()
181 }
182}
183
/// [`TableHandle`] backed by a [`TableHeap`]; writes go through [`MvccHeap`].
struct HeapTableHandle {
    /// Reference the table was resolved from.
    table_ref: TableReference,
    /// Heap storing the table's tuples.
    heap: Arc<TableHeap>,
}
188
189impl HeapTableHandle {
190 fn new(table_ref: TableReference, heap: Arc<TableHeap>) -> Self {
191 Self { table_ref, heap }
192 }
193}
194
195impl TableHandle for HeapTableHandle {
196 fn table_ref(&self) -> &TableReference {
197 &self.table_ref
198 }
199
200 fn schema(&self) -> SchemaRef {
201 self.heap.schema.clone()
202 }
203
204 fn table_heap(&self) -> Arc<TableHeap> {
205 self.heap.clone()
206 }
207
208 fn full_scan(&self) -> QuillSQLResult<Box<dyn TupleStream>> {
209 let iterator = TableIterator::new(self.heap.clone(), ..);
210 Ok(Box::new(HeapTableStream { iterator }))
211 }
212
213 fn insert(
214 &self,
215 txn: &mut TxnContext<'_>,
216 tuple: &Tuple,
217 indexes: &[Arc<dyn IndexHandle>],
218 ) -> QuillSQLResult<()> {
219 txn.ensure_writable(self.table_ref(), "INSERT")?;
220 let mvcc = MvccHeap::new(self.heap.clone());
221 let (rid, _) = mvcc.insert(tuple, txn.txn_id(), txn.command_id())?;
222
223 let mut index_links = Vec::new();
224 for handle in indexes {
225 if let Ok(key_tuple) = tuple.project_with_schema(handle.key_schema()) {
226 let index = handle.index();
227 index.insert_with_txn(&key_tuple, rid, txn.txn_id())?;
228 index_links.push((index, key_tuple));
229 }
230 }
231
232 txn.transaction_mut()
233 .push_insert_undo(self.heap.clone(), rid, index_links);
234 Ok(())
235 }
236
237 fn delete(
238 &self,
239 txn: &mut TxnContext<'_>,
240 rid: RecordId,
241 _prev_meta: TupleMeta,
242 prev_tuple: Tuple,
243 indexes: &[Arc<dyn IndexHandle>],
244 ) -> QuillSQLResult<()> {
245 txn.ensure_writable(self.table_ref(), "DELETE")?;
246 let mvcc = MvccHeap::new(self.heap.clone());
247 let prev_meta = mvcc.mark_deleted(rid, txn.txn_id(), txn.command_id())?;
248
249 let mut index_links = Vec::new();
250 for handle in indexes {
251 if let Ok(key_tuple) = prev_tuple.project_with_schema(handle.key_schema()) {
252 let index = handle.index();
253 index.delete_with_txn(&key_tuple, txn.txn_id())?;
254 index_links.push((index, key_tuple));
255 }
256 }
257
258 txn.transaction_mut().push_delete_undo(
259 self.heap.clone(),
260 rid,
261 prev_meta,
262 prev_tuple,
263 index_links,
264 );
265 Ok(())
266 }
267
268 fn update(
269 &self,
270 txn: &mut TxnContext<'_>,
271 rid: RecordId,
272 new_tuple: Tuple,
273 prev_meta: TupleMeta,
274 prev_tuple: Tuple,
275 indexes: &[Arc<dyn IndexHandle>],
276 ) -> QuillSQLResult<RecordId> {
277 txn.ensure_writable(self.table_ref(), "UPDATE")?;
278 let mvcc = MvccHeap::new(self.heap.clone());
279 let (new_rid, _) = mvcc.update(rid, new_tuple.clone(), txn.txn_id(), txn.command_id())?;
280
281 let mut old_keys = Vec::new();
282 for handle in indexes {
283 if let Ok(old_key_tuple) = prev_tuple.project_with_schema(handle.key_schema()) {
284 let index = handle.index();
285 index.delete_with_txn(&old_key_tuple, txn.txn_id())?;
286 old_keys.push((index, old_key_tuple));
287 }
288 }
289
290 let mut new_keys = Vec::new();
291 for handle in indexes {
292 if let Ok(new_key_tuple) = new_tuple.project_with_schema(handle.key_schema()) {
293 let index = handle.index();
294 index.insert_with_txn(&new_key_tuple, new_rid, txn.txn_id())?;
295 new_keys.push((index, new_key_tuple));
296 }
297 }
298
299 txn.transaction_mut().push_update_undo(
300 self.heap.clone(),
301 rid,
302 new_rid,
303 prev_meta,
304 prev_tuple,
305 new_keys,
306 old_keys,
307 );
308 Ok(new_rid)
309 }
310
311 fn prepare_row_for_write(
312 &self,
313 txn: &mut TxnContext<'_>,
314 rid: RecordId,
315 observed_meta: &TupleMeta,
316 ) -> QuillSQLResult<Option<(TupleMeta, Tuple)>> {
317 if !txn.is_visible(observed_meta) {
318 return Ok(None);
319 }
320 txn.lock_row_exclusive(self.table_ref(), rid)?;
321 let mvcc = MvccHeap::new(self.heap.clone());
322 let (current_meta, current_tuple) = mvcc.full_tuple(rid)?;
323 if !txn.is_visible(¤t_meta) {
324 txn.unlock_row(self.table_ref(), rid);
325 return Ok(None);
326 }
327 Ok(Some((current_meta, current_tuple)))
328 }
329}
330
/// [`TupleStream`] that yields the rows produced by a [`TableIterator`].
struct HeapTableStream {
    iterator: TableIterator,
}
334
335impl TupleStream for HeapTableStream {
336 fn next(&mut self) -> QuillSQLResult<Option<(RecordId, TupleMeta, Tuple)>> {
337 self.iterator.next()
338 }
339}
340
/// [`IndexHandle`] over a named B+ tree index.
struct BTreeIndexHandle {
    name: String,
    index: Arc<BPlusTreeIndex>,
}
345
346impl BTreeIndexHandle {
347 fn new(name: String, index: Arc<BPlusTreeIndex>) -> Self {
348 Self { name, index }
349 }
350}
351
352impl IndexHandle for BTreeIndexHandle {
353 fn name(&self) -> &str {
354 &self.name
355 }
356
357 fn key_schema(&self) -> SchemaRef {
358 self.index.key_schema.clone()
359 }
360
361 fn index(&self) -> Arc<BPlusTreeIndex> {
362 self.index.clone()
363 }
364
365 fn range_scan(
366 &self,
367 table: Arc<dyn TableHandle>,
368 request: IndexScanRequest,
369 ) -> QuillSQLResult<Box<dyn TupleStream>> {
370 let iterator = TreeIndexIterator::new(self.index.clone(), (request.start, request.end));
371 Ok(Box::new(BTreeIndexStream { iterator, table }))
372 }
373}
374
/// [`TupleStream`] that walks a B+ tree range and fetches each matching row from the table heap.
struct BTreeIndexStream {
    /// Yields the record ids of the index entries in range.
    iterator: TreeIndexIterator,
    /// Table whose heap those record ids are read from.
    table: Arc<dyn TableHandle>,
}
379
380impl TupleStream for BTreeIndexStream {
381 fn next(&mut self) -> QuillSQLResult<Option<(RecordId, TupleMeta, Tuple)>> {
382 loop {
383 let Some(rid) = self.iterator.next()? else {
384 return Ok(None);
385 };
386 if let Ok((meta, tuple)) = self.table.table_heap().full_tuple(rid) {
387 return Ok(Some((rid, meta, tuple)));
388 }
389 }
390 }
391}
392
393impl StorageEngine for DefaultStorageEngine {
394 fn table(&self, catalog: &Catalog, table: &TableReference) -> QuillSQLResult<TableBinding> {
395 let heap = catalog.table_heap(table)?;
396 let handle: Arc<dyn TableHandle> = Arc::new(HeapTableHandle::new(table.clone(), heap));
397 let indexes = catalog.table_indexes(table)?;
398 let index_handles = indexes
399 .into_iter()
400 .map(|(name, index)| {
401 Arc::new(BTreeIndexHandle::new(name, index)) as Arc<dyn IndexHandle>
402 })
403 .collect();
404 Ok(TableBinding::new(handle, index_handles))
405 }
406}