1use crate::error::QuillSQLResult;
2use crate::recovery::Lsn;
3use crate::storage::codec::TupleCodec;
4use crate::storage::heap::wal_codec::{
5 HeapDeletePayload, HeapRecordPayload, HeapUpdatePayload, TupleMetaRepr,
6};
7use crate::storage::index::btree_index::BPlusTreeIndex;
8use crate::storage::page::{RecordId, TupleMeta};
9use crate::storage::table_heap::TableHeap;
10use crate::storage::tuple::Tuple;
11use sqlparser::ast::TransactionAccessMode;
12use std::str::FromStr;
13use std::sync::Arc;
14
15use super::TransactionSnapshot;
16
/// Numeric identifier of a transaction.
pub type TransactionId = u64;
/// Numeric identifier of a command (statement) issued inside a transaction.
pub type CommandId = u32;

/// Sentinel command id; `Transaction::new` stores it as the current command id
/// until `Transaction::begin_command` is called for the first time.
pub const INVALID_COMMAND_ID: CommandId = CommandId::MAX;
21
/// Transaction isolation levels selectable for a [`Transaction`].
///
/// Each level has a canonical textual name (see [`IsolationLevel::as_str`])
/// and can be parsed back from text through its [`FromStr`] implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    ReadUncommitted,
    ReadCommitted,
    RepeatableRead,
    Serializable,
}

impl IsolationLevel {
    /// Returns the canonical lowercase, hyphen-separated name of the level.
    ///
    /// The returned string always parses back to the same level.
    pub fn as_str(&self) -> &'static str {
        match self {
            IsolationLevel::ReadUncommitted => "read-uncommitted",
            IsolationLevel::ReadCommitted => "read-committed",
            IsolationLevel::RepeatableRead => "repeatable-read",
            IsolationLevel::Serializable => "serializable",
        }
    }
}

impl FromStr for IsolationLevel {
    type Err = String;

    /// Parses an isolation level name.
    ///
    /// Matching is ASCII case-insensitive and ignores surrounding whitespace.
    /// Besides the canonical hyphenated names and the short aliases
    /// (`ru`, `rc`, `rr`, `sr`, `serial`), words may be separated by
    /// whitespace or underscores, so `READ COMMITTED` and `read_committed`
    /// are accepted as well.
    ///
    /// # Errors
    ///
    /// Returns `unknown isolation level '<input>'`, where `<input>` is the
    /// trimmed, lowercased text that was supplied.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let lowered = s.trim().to_ascii_lowercase();
        // Fold whitespace runs and underscores into the canonical '-'
        // separator so the SQL and snake_case spellings match too.
        let canonical = lowered
            .split_whitespace()
            .collect::<Vec<_>>()
            .join("-")
            .replace('_', "-");

        match canonical.as_str() {
            "read-uncommitted" | "ru" => Ok(IsolationLevel::ReadUncommitted),
            "read-committed" | "rc" => Ok(IsolationLevel::ReadCommitted),
            "repeatable-read" | "rr" => Ok(IsolationLevel::RepeatableRead),
            "serializable" | "sr" | "serial" => Ok(IsolationLevel::Serializable),
            // Report what the caller typed (trimmed + lowercased), not the
            // normalised form, so the message stays the same as before.
            _ => Err(format!("unknown isolation level '{}'", lowered)),
        }
    }
}
54
/// Lifecycle state stored on a `Transaction`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionState {
    /// State assigned by `Transaction::new`.
    Running,
    /// State assigned by `Transaction::mark_tainted`.
    Tainted,
    // NOTE(review): nothing in this file assigns `Committed` or `Aborted`
    // directly; presumably the transaction manager sets them through
    // `Transaction::set_state` - confirm against the caller.
    Committed,
    Aborted,
}
62
63#[derive(Debug, Clone)]
64pub enum UndoAction {
65 Insert {
66 table: Arc<TableHeap>,
67 rid: RecordId,
68 indexes: Vec<(Arc<BPlusTreeIndex>, Tuple)>,
69 },
70 Update {
71 table: Arc<TableHeap>,
72 rid: RecordId,
73 prev_meta: TupleMeta,
74 prev_tuple: Tuple,
75 },
76 Delete {
77 table: Arc<TableHeap>,
78 rid: RecordId,
79 prev_meta: TupleMeta,
80 prev_tuple: Tuple,
81 },
82}
83
84impl UndoAction {
85 pub fn undo(self, txn_id: TransactionId) -> QuillSQLResult<()> {
86 match self {
87 UndoAction::Insert {
88 table,
89 rid,
90 indexes,
91 } => {
92 for (index, key) in indexes.into_iter() {
93 index.delete(&key)?;
94 }
95 table.recover_delete_tuple(rid, txn_id, 0)?;
96 Ok(())
97 }
98 UndoAction::Update {
99 table,
100 rid,
101 prev_meta,
102 prev_tuple,
103 } => {
104 table.recover_restore_tuple(rid, prev_meta, &prev_tuple)?;
105 Ok(())
106 }
107 UndoAction::Delete {
108 table,
109 rid,
110 prev_meta,
111 prev_tuple,
112 } => {
113 table.recover_restore_tuple(rid, prev_meta, &prev_tuple)?;
114 Ok(())
115 }
116 }
117 }
118
119 pub fn to_heap_payload(&self) -> QuillSQLResult<HeapRecordPayload> {
120 match self {
121 UndoAction::Insert { table, rid, .. } => {
122 let (meta, tuple) = table.full_tuple(*rid)?;
123 Ok(HeapRecordPayload::Delete(HeapDeletePayload {
124 relation: table.relation_ident(),
125 page_id: rid.page_id,
126 slot_id: rid.slot_num as u16,
127 op_txn_id: meta.insert_txn_id,
128 old_tuple_meta: TupleMetaRepr::from(meta),
129 old_tuple_data: Some(TupleCodec::encode(&tuple)),
130 }))
131 }
132 UndoAction::Update {
133 table,
134 rid,
135 prev_meta,
136 prev_tuple,
137 ..
138 } => Ok(HeapRecordPayload::Update(HeapUpdatePayload {
139 relation: table.relation_ident(),
140 page_id: rid.page_id,
141 slot_id: rid.slot_num as u16,
142 op_txn_id: prev_meta.insert_txn_id,
143 new_tuple_meta: TupleMetaRepr::from(*prev_meta),
144 new_tuple_data: TupleCodec::encode(prev_tuple),
145 old_tuple_meta: None,
146 old_tuple_data: None,
147 })),
148 UndoAction::Delete {
149 table,
150 rid,
151 prev_meta,
152 prev_tuple,
153 ..
154 } => Ok(HeapRecordPayload::Update(HeapUpdatePayload {
155 relation: table.relation_ident(),
156 page_id: rid.page_id,
157 slot_id: rid.slot_num as u16,
158 op_txn_id: prev_meta.insert_txn_id,
159 new_tuple_meta: TupleMetaRepr::from(*prev_meta),
160 new_tuple_data: TupleCodec::encode(prev_tuple),
161 old_tuple_meta: None,
162 old_tuple_data: None,
163 })),
164 }
165 }
166}
167
168pub struct Transaction {
169 id: TransactionId,
170 isolation_level: IsolationLevel,
171 access_mode: TransactionAccessMode,
172 state: TransactionState,
173 synchronous_commit: bool,
174 begin_lsn: Option<Lsn>,
175 last_lsn: Option<Lsn>,
176 undo_actions: Vec<UndoAction>,
177 current_command_id: CommandId,
178 next_command_id: CommandId,
179 snapshot: Option<TransactionSnapshot>,
180}
181
182impl Transaction {
183 pub fn new(
184 id: TransactionId,
185 isolation_level: IsolationLevel,
186 access_mode: TransactionAccessMode,
187 synchronous_commit: bool,
188 ) -> Self {
189 Self {
190 id,
191 isolation_level,
192 access_mode,
193 state: TransactionState::Running,
194 synchronous_commit,
195 begin_lsn: None,
196 last_lsn: None,
197 undo_actions: Vec::new(),
198 current_command_id: INVALID_COMMAND_ID,
199 next_command_id: 0,
200 snapshot: None,
201 }
202 }
203
204 pub fn id(&self) -> TransactionId {
205 self.id
206 }
207
208 pub fn isolation_level(&self) -> IsolationLevel {
209 self.isolation_level
210 }
211
212 pub fn access_mode(&self) -> TransactionAccessMode {
213 self.access_mode
214 }
215
216 pub fn set_isolation_level(&mut self, isolation_level: IsolationLevel) {
217 self.isolation_level = isolation_level;
218 if matches!(
219 isolation_level,
220 IsolationLevel::ReadCommitted | IsolationLevel::ReadUncommitted
221 ) {
222 self.clear_snapshot();
223 }
224 }
225
226 pub fn state(&self) -> TransactionState {
227 self.state
228 }
229
230 pub fn synchronous_commit(&self) -> bool {
231 self.synchronous_commit
232 }
233
234 pub fn begin_lsn(&self) -> Option<Lsn> {
235 self.begin_lsn
236 }
237
238 pub fn last_lsn(&self) -> Option<Lsn> {
239 self.last_lsn
240 }
241
242 pub fn begin_command(&mut self) -> CommandId {
243 let cid = self.next_command_id;
244 self.current_command_id = cid;
245 self.next_command_id = self.next_command_id.wrapping_add(1);
246 cid
247 }
248
249 pub fn current_command_id(&self) -> CommandId {
250 self.current_command_id
251 }
252
253 pub(crate) fn set_begin_lsn(&mut self, lsn: Lsn) {
254 self.begin_lsn = Some(lsn);
255 self.last_lsn = Some(lsn);
256 }
257
258 pub(crate) fn record_lsn(&mut self, lsn: Lsn) {
259 self.last_lsn = Some(lsn);
260 }
261
262 pub(crate) fn set_state(&mut self, state: TransactionState) {
263 self.state = state;
264 }
265
266 #[allow(dead_code)]
267 pub(crate) fn set_access_mode(&mut self, access_mode: TransactionAccessMode) {
268 self.access_mode = access_mode;
269 }
270
271 pub fn update_access_mode(&mut self, access_mode: TransactionAccessMode) {
272 self.access_mode = access_mode;
273 }
274
275 #[allow(dead_code)]
276 pub(crate) fn mark_tainted(&mut self) {
277 self.state = TransactionState::Tainted;
278 }
279
280 pub fn push_insert_undo(
281 &mut self,
282 table: Arc<TableHeap>,
283 rid: RecordId,
284 indexes: Vec<(Arc<BPlusTreeIndex>, Tuple)>,
285 ) {
286 self.undo_actions.push(UndoAction::Insert {
287 table,
288 rid,
289 indexes,
290 });
291 }
292
293 pub fn push_update_undo(
294 &mut self,
295 table: Arc<TableHeap>,
296 old_rid: RecordId,
297 new_rid: RecordId,
298 prev_meta: TupleMeta,
299 prev_tuple: Tuple,
300 new_keys: Vec<(Arc<BPlusTreeIndex>, Tuple)>,
301 ) {
302 self.undo_actions.push(UndoAction::Update {
303 table: table.clone(),
304 rid: old_rid,
305 prev_meta,
306 prev_tuple,
307 });
308 self.undo_actions.push(UndoAction::Insert {
309 table,
310 rid: new_rid,
311 indexes: new_keys,
312 });
313 }
314
315 pub fn push_delete_undo(
316 &mut self,
317 table: Arc<TableHeap>,
318 rid: RecordId,
319 prev_meta: TupleMeta,
320 prev_tuple: Tuple,
321 ) {
322 self.undo_actions.push(UndoAction::Delete {
323 table,
324 rid,
325 prev_meta,
326 prev_tuple,
327 });
328 }
329
330 pub fn pop_undo_action(&mut self) -> Option<UndoAction> {
331 self.undo_actions.pop()
332 }
333
334 pub fn clear_undo(&mut self) {
335 self.undo_actions.clear();
336 }
337
338 pub fn snapshot(&self) -> Option<&TransactionSnapshot> {
339 self.snapshot.as_ref()
340 }
341
342 pub fn set_snapshot(&mut self, snapshot: TransactionSnapshot) {
343 self.snapshot = Some(snapshot);
344 }
345
346 pub fn clear_snapshot(&mut self) {
347 self.snapshot = None;
348 }
349}