1use std::fmt;
2use std::ops::Bound;
3use std::sync::Arc;
4
5use crate::catalog::{Catalog, SchemaRef};
6use crate::error::{QuillSQLError, QuillSQLResult};
7use crate::storage::holt::{HoltIndexHandle, HoltStore, HoltTableHandle};
8use crate::storage::record::{RecordId, TupleMeta};
9use crate::storage::tuple::Tuple;
10use crate::transaction::{TransactionId, TxnContext};
11use crate::utils::table_ref::TableReference;
12
13#[derive(Debug, Clone)]
14pub struct IndexScanRequest {
15 pub start: Bound<Tuple>,
16 pub end: Bound<Tuple>,
17}
18
19impl IndexScanRequest {
20 pub fn new(start: Bound<Tuple>, end: Bound<Tuple>) -> Self {
21 Self { start, end }
22 }
23}
24
25pub trait TupleStream {
26 fn next(&mut self) -> QuillSQLResult<Option<(RecordId, TupleMeta, Tuple)>>;
27}
28
29pub trait TableHandle: Send + Sync {
30 fn table_ref(&self) -> &TableReference;
31 fn schema(&self) -> SchemaRef;
32 fn full_scan(&self) -> QuillSQLResult<Box<dyn TupleStream>>;
33 fn full_tuple(&self, rid: RecordId) -> QuillSQLResult<(TupleMeta, Tuple)>;
34
35 fn insert(
36 &self,
37 txn: &mut TxnContext<'_>,
38 tuple: &Tuple,
39 indexes: &[Arc<dyn IndexHandle>],
40 ) -> QuillSQLResult<()>;
41
42 fn delete(
43 &self,
44 txn: &mut TxnContext<'_>,
45 rid: RecordId,
46 prev_meta: TupleMeta,
47 prev_tuple: Tuple,
48 indexes: &[Arc<dyn IndexHandle>],
49 ) -> QuillSQLResult<()>;
50
51 fn update(
52 &self,
53 txn: &mut TxnContext<'_>,
54 rid: RecordId,
55 new_tuple: Tuple,
56 prev_meta: TupleMeta,
57 prev_tuple: Tuple,
58 indexes: &[Arc<dyn IndexHandle>],
59 ) -> QuillSQLResult<RecordId>;
60
61 fn prepare_row_for_write(
62 &self,
63 txn: &mut TxnContext<'_>,
64 rid: RecordId,
65 observed_meta: &TupleMeta,
66 ) -> QuillSQLResult<Option<(TupleMeta, Tuple)>>;
67
68 fn undo_insert(&self, rid: RecordId, txn_id: TransactionId) -> QuillSQLResult<()>;
69
70 fn undo_delete(
71 &self,
72 rid: RecordId,
73 prev_meta: TupleMeta,
74 prev_tuple: Tuple,
75 ) -> QuillSQLResult<()>;
76}
77
78pub trait IndexHandle: Send + Sync {
79 fn name(&self) -> &str;
80 fn key_schema(&self) -> SchemaRef;
81 fn index_id(&self) -> u64;
82 fn insert(&self, key: &Tuple, rid: RecordId, txn_id: TransactionId) -> QuillSQLResult<()>;
83 fn delete(&self, key: &Tuple, rid: RecordId, txn_id: TransactionId) -> QuillSQLResult<()>;
84 fn range_scan(
85 &self,
86 table: Arc<dyn TableHandle>,
87 request: IndexScanRequest,
88 ) -> QuillSQLResult<Box<dyn TupleStream>>;
89}
90
91pub struct HoltStorage {
92 holt_store: Arc<HoltStore>,
93}
94
95impl HoltStorage {
96 pub fn new(holt_store: Arc<HoltStore>) -> Self {
97 Self { holt_store }
98 }
99
100 pub fn table(&self, catalog: &Catalog, table: &TableReference) -> QuillSQLResult<TableBinding> {
101 let table_id = catalog.table_id(table)?;
102 let schema = catalog.table_schema(table)?;
103 let handle: Arc<dyn TableHandle> = Arc::new(HoltTableHandle::new(
104 table.clone(),
105 schema,
106 table_id,
107 self.holt_store.clone(),
108 ));
109 let indexes = catalog.table_indexes(table)?;
110 let index_handles = indexes
111 .into_iter()
112 .map(|index| {
113 Ok(Arc::new(HoltIndexHandle::new(
114 index.name,
115 index.key_schema,
116 index.index_id,
117 self.holt_store.clone(),
118 )) as Arc<dyn IndexHandle>)
119 })
120 .collect::<QuillSQLResult<Vec<_>>>()?;
121 Ok(TableBinding::new(handle, index_handles))
122 }
123}
124
125#[derive(Clone)]
126pub struct TableBinding {
127 table: Arc<dyn TableHandle>,
128 indexes: Arc<Vec<Arc<dyn IndexHandle>>>,
129}
130
131impl TableBinding {
132 fn new(table: Arc<dyn TableHandle>, indexes: Vec<Arc<dyn IndexHandle>>) -> Self {
133 Self {
134 table,
135 indexes: Arc::new(indexes),
136 }
137 }
138
139 pub fn indexes(&self) -> &[Arc<dyn IndexHandle>] {
140 self.indexes.as_ref()
141 }
142
143 pub fn scan(&self) -> QuillSQLResult<Box<dyn TupleStream>> {
144 self.table.full_scan()
145 }
146
147 pub fn insert(&self, txn: &mut TxnContext<'_>, tuple: &Tuple) -> QuillSQLResult<()> {
148 self.table.insert(txn, tuple, self.indexes())
149 }
150
151 pub fn delete(
152 &self,
153 txn: &mut TxnContext<'_>,
154 rid: RecordId,
155 prev_meta: TupleMeta,
156 prev_tuple: Tuple,
157 ) -> QuillSQLResult<()> {
158 self.table
159 .delete(txn, rid, prev_meta, prev_tuple, self.indexes())
160 }
161
162 pub fn update(
163 &self,
164 txn: &mut TxnContext<'_>,
165 rid: RecordId,
166 new_tuple: Tuple,
167 prev_meta: TupleMeta,
168 prev_tuple: Tuple,
169 ) -> QuillSQLResult<RecordId> {
170 self.table
171 .update(txn, rid, new_tuple, prev_meta, prev_tuple, self.indexes())
172 }
173
174 pub fn prepare_row_for_write(
175 &self,
176 txn: &mut TxnContext<'_>,
177 rid: RecordId,
178 observed_meta: &TupleMeta,
179 ) -> QuillSQLResult<Option<(TupleMeta, Tuple)>> {
180 self.table.prepare_row_for_write(txn, rid, observed_meta)
181 }
182
183 pub fn index_scan(
184 &self,
185 name: &str,
186 request: IndexScanRequest,
187 ) -> QuillSQLResult<Box<dyn TupleStream>> {
188 let handle = self
189 .indexes()
190 .iter()
191 .find(|idx| idx.name() == name)
192 .ok_or_else(|| QuillSQLError::Execution(format!("index {} not found", name)))?;
193 handle.range_scan(self.table.clone(), request)
194 }
195}
196
197impl fmt::Debug for TableBinding {
198 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199 f.debug_struct("TableBinding")
200 .field("table", &self.table.table_ref())
201 .field("index_count", &self.indexes.len())
202 .finish()
203 }
204}