1use crate::error::QuillSQLResult;
2use crate::recovery::Lsn;
3use crate::storage::codec::TupleCodec;
4use crate::storage::heap::wal_codec::{
5 HeapDeletePayload, HeapInsertPayload, HeapRecordPayload, TupleMetaRepr,
6};
7use crate::storage::index::btree_index::BPlusTreeIndex;
8use crate::storage::page::{RecordId, TupleMeta};
9use crate::storage::table_heap::TableHeap;
10use crate::storage::tuple::Tuple;
11use sqlparser::ast::TransactionAccessMode;
12use std::str::FromStr;
13use std::sync::Arc;
14
15use super::TransactionSnapshot;
16
17pub type TransactionId = u64;
18pub type CommandId = u32;
19
20pub const INVALID_COMMAND_ID: CommandId = CommandId::MAX;
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23pub enum IsolationLevel {
24 ReadUncommitted,
25 ReadCommitted,
26 RepeatableRead,
27 Serializable,
28}
29
30impl IsolationLevel {
31 pub fn as_str(&self) -> &'static str {
32 match self {
33 IsolationLevel::ReadUncommitted => "read-uncommitted",
34 IsolationLevel::ReadCommitted => "read-committed",
35 IsolationLevel::RepeatableRead => "repeatable-read",
36 IsolationLevel::Serializable => "serializable",
37 }
38 }
39}
40
41impl FromStr for IsolationLevel {
42 type Err = String;
43
44 fn from_str(s: &str) -> Result<Self, Self::Err> {
45 match s.trim().to_ascii_lowercase().as_str() {
46 "read-uncommitted" | "ru" => Ok(IsolationLevel::ReadUncommitted),
47 "read-committed" | "rc" => Ok(IsolationLevel::ReadCommitted),
48 "repeatable-read" | "rr" => Ok(IsolationLevel::RepeatableRead),
49 "serializable" | "sr" | "serial" => Ok(IsolationLevel::Serializable),
50 other => Err(format!("unknown isolation level '{}'", other)),
51 }
52 }
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub enum TransactionState {
57 Running,
58 Tainted,
59 Committed,
60 Aborted,
61}
62
63#[derive(Debug, Clone)]
64pub enum UndoAction {
65 Insert {
66 table: Arc<TableHeap>,
67 rid: RecordId,
68 indexes: Vec<(Arc<BPlusTreeIndex>, Tuple)>,
69 },
70 Delete {
71 table: Arc<TableHeap>,
72 rid: RecordId,
73 prev_meta: TupleMeta,
74 prev_tuple: Tuple,
75 indexes: Vec<(Arc<BPlusTreeIndex>, Tuple)>,
76 },
77}
78
79impl UndoAction {
80 pub fn undo(self, txn_id: TransactionId) -> QuillSQLResult<()> {
81 match self {
82 UndoAction::Insert {
83 table,
84 rid,
85 indexes,
86 } => {
87 for (index, key) in indexes.into_iter() {
88 index.delete_with_txn(&key, txn_id)?;
89 }
90 table.recover_delete_tuple(rid, txn_id, 0)?;
91 Ok(())
92 }
93 UndoAction::Delete {
94 table,
95 rid,
96 prev_meta,
97 prev_tuple,
98 indexes,
99 } => {
100 for (index, key) in indexes {
101 index.insert_with_txn(&key, rid, txn_id)?;
102 }
103 table.recover_restore_tuple(rid, prev_meta, &prev_tuple)?;
104 Ok(())
105 }
106 }
107 }
108
109 pub fn to_heap_payload(&self, txn_id: TransactionId) -> QuillSQLResult<HeapRecordPayload> {
110 match self {
111 UndoAction::Insert { table, rid, .. } => {
112 let (meta, tuple) = table.full_tuple(*rid)?;
113 let mut deleted_meta = meta;
114 deleted_meta.mark_deleted(txn_id, 0);
115 Ok(HeapRecordPayload::Delete(HeapDeletePayload {
116 relation: table.relation_ident(),
117 page_id: rid.page_id,
118 slot_id: rid.slot_num as u16,
119 op_txn_id: meta.insert_txn_id,
120 new_tuple_meta: TupleMetaRepr::from(deleted_meta),
121 old_tuple_meta: TupleMetaRepr::from(meta),
122 old_tuple_data: TupleCodec::encode(&tuple),
123 }))
124 }
125 UndoAction::Delete {
126 table,
127 rid,
128 prev_meta,
129 prev_tuple,
130 ..
131 } => Ok(HeapRecordPayload::Insert(HeapInsertPayload {
132 relation: table.relation_ident(),
133 page_id: rid.page_id,
134 slot_id: rid.slot_num as u16,
135 op_txn_id: txn_id,
136 tuple_meta: TupleMetaRepr::from(*prev_meta),
137 tuple_data: TupleCodec::encode(prev_tuple),
138 })),
139 }
140 }
141}
142
143pub struct Transaction {
144 id: TransactionId,
145 isolation_level: IsolationLevel,
146 access_mode: TransactionAccessMode,
147 state: TransactionState,
148 synchronous_commit: bool,
149 begin_lsn: Option<Lsn>,
150 last_lsn: Option<Lsn>,
151 undo_actions: Vec<UndoAction>,
152 current_command_id: CommandId,
153 next_command_id: CommandId,
154 snapshot: Option<TransactionSnapshot>,
155}
156
157impl Transaction {
158 pub fn new(
159 id: TransactionId,
160 isolation_level: IsolationLevel,
161 access_mode: TransactionAccessMode,
162 synchronous_commit: bool,
163 ) -> Self {
164 Self {
165 id,
166 isolation_level,
167 access_mode,
168 state: TransactionState::Running,
169 synchronous_commit,
170 begin_lsn: None,
171 last_lsn: None,
172 undo_actions: Vec::new(),
173 current_command_id: INVALID_COMMAND_ID,
174 next_command_id: 0,
175 snapshot: None,
176 }
177 }
178
179 pub fn id(&self) -> TransactionId {
180 self.id
181 }
182
183 pub fn isolation_level(&self) -> IsolationLevel {
184 self.isolation_level
185 }
186
187 pub fn access_mode(&self) -> TransactionAccessMode {
188 self.access_mode
189 }
190
191 pub fn set_isolation_level(&mut self, isolation_level: IsolationLevel) {
192 self.isolation_level = isolation_level;
193 if matches!(
194 isolation_level,
195 IsolationLevel::ReadCommitted | IsolationLevel::ReadUncommitted
196 ) {
197 self.clear_snapshot();
198 }
199 }
200
201 pub fn state(&self) -> TransactionState {
202 self.state
203 }
204
205 pub fn synchronous_commit(&self) -> bool {
206 self.synchronous_commit
207 }
208
209 pub fn begin_lsn(&self) -> Option<Lsn> {
210 self.begin_lsn
211 }
212
213 pub fn last_lsn(&self) -> Option<Lsn> {
214 self.last_lsn
215 }
216
217 pub fn begin_command(&mut self) -> CommandId {
218 let cid = self.next_command_id;
219 self.current_command_id = cid;
220 self.next_command_id = self.next_command_id.wrapping_add(1);
221 cid
222 }
223
224 pub fn current_command_id(&self) -> CommandId {
225 self.current_command_id
226 }
227
228 pub(crate) fn set_begin_lsn(&mut self, lsn: Lsn) {
229 self.begin_lsn = Some(lsn);
230 self.last_lsn = Some(lsn);
231 }
232
233 pub(crate) fn record_lsn(&mut self, lsn: Lsn) {
234 self.last_lsn = Some(lsn);
235 }
236
237 pub(crate) fn set_state(&mut self, state: TransactionState) {
238 self.state = state;
239 }
240
241 #[allow(dead_code)]
242 pub(crate) fn set_access_mode(&mut self, access_mode: TransactionAccessMode) {
243 self.access_mode = access_mode;
244 }
245
246 pub fn update_access_mode(&mut self, access_mode: TransactionAccessMode) {
247 self.access_mode = access_mode;
248 }
249
250 #[allow(dead_code)]
251 pub(crate) fn mark_tainted(&mut self) {
252 self.state = TransactionState::Tainted;
253 }
254
255 pub fn push_insert_undo(
256 &mut self,
257 table: Arc<TableHeap>,
258 rid: RecordId,
259 indexes: Vec<(Arc<BPlusTreeIndex>, Tuple)>,
260 ) {
261 self.undo_actions.push(UndoAction::Insert {
262 table,
263 rid,
264 indexes,
265 });
266 }
267
268 pub fn push_update_undo(
269 &mut self,
270 table: Arc<TableHeap>,
271 old_rid: RecordId,
272 new_rid: RecordId,
273 prev_meta: TupleMeta,
274 prev_tuple: Tuple,
275 new_keys: Vec<(Arc<BPlusTreeIndex>, Tuple)>,
276 old_keys: Vec<(Arc<BPlusTreeIndex>, Tuple)>,
277 ) {
278 self.undo_actions.push(UndoAction::Insert {
279 table: table.clone(),
280 rid: new_rid,
281 indexes: new_keys,
282 });
283 self.undo_actions.push(UndoAction::Delete {
284 table: table.clone(),
285 rid: old_rid,
286 prev_meta,
287 prev_tuple,
288 indexes: old_keys,
289 });
290 }
291
292 pub fn push_delete_undo(
293 &mut self,
294 table: Arc<TableHeap>,
295 rid: RecordId,
296 prev_meta: TupleMeta,
297 prev_tuple: Tuple,
298 indexes: Vec<(Arc<BPlusTreeIndex>, Tuple)>,
299 ) {
300 self.undo_actions.push(UndoAction::Delete {
301 table,
302 rid,
303 prev_meta,
304 prev_tuple,
305 indexes,
306 });
307 }
308
309 pub fn pop_undo_action(&mut self) -> Option<UndoAction> {
310 self.undo_actions.pop()
311 }
312
313 pub fn clear_undo(&mut self) {
314 self.undo_actions.clear();
315 }
316
317 pub fn snapshot(&self) -> Option<&TransactionSnapshot> {
318 self.snapshot.as_ref()
319 }
320
321 pub fn set_snapshot(&mut self, snapshot: TransactionSnapshot) {
322 self.snapshot = Some(snapshot);
323 }
324
325 pub fn clear_snapshot(&mut self) {
326 self.snapshot = None;
327 }
328}