// quill_sql/storage/engine.rs

1use std::fmt;
2use std::ops::Bound;
3use std::sync::Arc;
4
5use crate::catalog::{Catalog, SchemaRef};
6use crate::error::{QuillSQLError, QuillSQLResult};
7use crate::storage::holt::{HoltIndexHandle, HoltStore, HoltTableHandle};
8use crate::storage::record::{RecordId, TupleMeta};
9use crate::storage::tuple::Tuple;
10use crate::transaction::{TransactionId, TxnContext};
11use crate::utils::table_ref::TableReference;
12
13#[derive(Debug, Clone)]
14pub struct IndexScanRequest {
15    pub start: Bound<Tuple>,
16    pub end: Bound<Tuple>,
17}
18
19impl IndexScanRequest {
20    pub fn new(start: Bound<Tuple>, end: Bound<Tuple>) -> Self {
21        Self { start, end }
22    }
23}
24
25pub trait TupleStream {
26    fn next(&mut self) -> QuillSQLResult<Option<(RecordId, TupleMeta, Tuple)>>;
27}
28
29pub trait TableHandle: Send + Sync {
30    fn table_ref(&self) -> &TableReference;
31    fn schema(&self) -> SchemaRef;
32    fn full_scan(&self) -> QuillSQLResult<Box<dyn TupleStream>>;
33    fn full_tuple(&self, rid: RecordId) -> QuillSQLResult<(TupleMeta, Tuple)>;
34
35    fn insert(
36        &self,
37        txn: &mut TxnContext<'_>,
38        tuple: &Tuple,
39        indexes: &[Arc<dyn IndexHandle>],
40    ) -> QuillSQLResult<()>;
41
42    fn delete(
43        &self,
44        txn: &mut TxnContext<'_>,
45        rid: RecordId,
46        prev_meta: TupleMeta,
47        prev_tuple: Tuple,
48        indexes: &[Arc<dyn IndexHandle>],
49    ) -> QuillSQLResult<()>;
50
51    fn update(
52        &self,
53        txn: &mut TxnContext<'_>,
54        rid: RecordId,
55        new_tuple: Tuple,
56        prev_meta: TupleMeta,
57        prev_tuple: Tuple,
58        indexes: &[Arc<dyn IndexHandle>],
59    ) -> QuillSQLResult<RecordId>;
60
61    fn prepare_row_for_write(
62        &self,
63        txn: &mut TxnContext<'_>,
64        rid: RecordId,
65        observed_meta: &TupleMeta,
66    ) -> QuillSQLResult<Option<(TupleMeta, Tuple)>>;
67
68    fn undo_insert(&self, rid: RecordId, txn_id: TransactionId) -> QuillSQLResult<()>;
69
70    fn undo_delete(
71        &self,
72        rid: RecordId,
73        prev_meta: TupleMeta,
74        prev_tuple: Tuple,
75    ) -> QuillSQLResult<()>;
76}
77
78pub trait IndexHandle: Send + Sync {
79    fn name(&self) -> &str;
80    fn key_schema(&self) -> SchemaRef;
81    fn index_id(&self) -> u64;
82    fn insert(&self, key: &Tuple, rid: RecordId, txn_id: TransactionId) -> QuillSQLResult<()>;
83    fn delete(&self, key: &Tuple, rid: RecordId, txn_id: TransactionId) -> QuillSQLResult<()>;
84    fn range_scan(
85        &self,
86        table: Arc<dyn TableHandle>,
87        request: IndexScanRequest,
88    ) -> QuillSQLResult<Box<dyn TupleStream>>;
89}
90
91pub struct HoltStorage {
92    holt_store: Arc<HoltStore>,
93}
94
95impl HoltStorage {
96    pub fn new(holt_store: Arc<HoltStore>) -> Self {
97        Self { holt_store }
98    }
99
100    pub fn table(&self, catalog: &Catalog, table: &TableReference) -> QuillSQLResult<TableBinding> {
101        let table_id = catalog.table_id(table)?;
102        let schema = catalog.table_schema(table)?;
103        let handle: Arc<dyn TableHandle> = Arc::new(HoltTableHandle::new(
104            table.clone(),
105            schema,
106            table_id,
107            self.holt_store.clone(),
108        ));
109        let indexes = catalog.table_indexes(table)?;
110        let index_handles = indexes
111            .into_iter()
112            .map(|index| {
113                Ok(Arc::new(HoltIndexHandle::new(
114                    index.name,
115                    index.key_schema,
116                    index.index_id,
117                    self.holt_store.clone(),
118                )) as Arc<dyn IndexHandle>)
119            })
120            .collect::<QuillSQLResult<Vec<_>>>()?;
121        Ok(TableBinding::new(handle, index_handles))
122    }
123}
124
125#[derive(Clone)]
126pub struct TableBinding {
127    table: Arc<dyn TableHandle>,
128    indexes: Arc<Vec<Arc<dyn IndexHandle>>>,
129}
130
131impl TableBinding {
132    fn new(table: Arc<dyn TableHandle>, indexes: Vec<Arc<dyn IndexHandle>>) -> Self {
133        Self {
134            table,
135            indexes: Arc::new(indexes),
136        }
137    }
138
139    pub fn indexes(&self) -> &[Arc<dyn IndexHandle>] {
140        self.indexes.as_ref()
141    }
142
143    pub fn scan(&self) -> QuillSQLResult<Box<dyn TupleStream>> {
144        self.table.full_scan()
145    }
146
147    pub fn insert(&self, txn: &mut TxnContext<'_>, tuple: &Tuple) -> QuillSQLResult<()> {
148        self.table.insert(txn, tuple, self.indexes())
149    }
150
151    pub fn delete(
152        &self,
153        txn: &mut TxnContext<'_>,
154        rid: RecordId,
155        prev_meta: TupleMeta,
156        prev_tuple: Tuple,
157    ) -> QuillSQLResult<()> {
158        self.table
159            .delete(txn, rid, prev_meta, prev_tuple, self.indexes())
160    }
161
162    pub fn update(
163        &self,
164        txn: &mut TxnContext<'_>,
165        rid: RecordId,
166        new_tuple: Tuple,
167        prev_meta: TupleMeta,
168        prev_tuple: Tuple,
169    ) -> QuillSQLResult<RecordId> {
170        self.table
171            .update(txn, rid, new_tuple, prev_meta, prev_tuple, self.indexes())
172    }
173
174    pub fn prepare_row_for_write(
175        &self,
176        txn: &mut TxnContext<'_>,
177        rid: RecordId,
178        observed_meta: &TupleMeta,
179    ) -> QuillSQLResult<Option<(TupleMeta, Tuple)>> {
180        self.table.prepare_row_for_write(txn, rid, observed_meta)
181    }
182
183    pub fn index_scan(
184        &self,
185        name: &str,
186        request: IndexScanRequest,
187    ) -> QuillSQLResult<Box<dyn TupleStream>> {
188        let handle = self
189            .indexes()
190            .iter()
191            .find(|idx| idx.name() == name)
192            .ok_or_else(|| QuillSQLError::Execution(format!("index {} not found", name)))?;
193        handle.range_scan(self.table.clone(), request)
194    }
195}
196
197impl fmt::Debug for TableBinding {
198    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199        f.debug_struct("TableBinding")
200            .field("table", &self.table.table_ref())
201            .field("index_count", &self.indexes.len())
202            .finish()
203    }
204}