quill_sql/transaction/
transaction.rs

1use crate::error::QuillSQLResult;
2use crate::recovery::Lsn;
3use crate::storage::codec::TupleCodec;
4use crate::storage::heap::wal_codec::{
5    HeapDeletePayload, HeapRecordPayload, HeapUpdatePayload, TupleMetaRepr,
6};
7use crate::storage::index::btree_index::BPlusTreeIndex;
8use crate::storage::page::{RecordId, TupleMeta};
9use crate::storage::table_heap::TableHeap;
10use crate::storage::tuple::Tuple;
11use sqlparser::ast::TransactionAccessMode;
12use std::str::FromStr;
13use std::sync::Arc;
14
15use super::TransactionSnapshot;
16
17pub type TransactionId = u64;
18pub type CommandId = u32;
19
20pub const INVALID_COMMAND_ID: CommandId = CommandId::MAX;
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23pub enum IsolationLevel {
24    ReadUncommitted,
25    ReadCommitted,
26    RepeatableRead,
27    Serializable,
28}
29
30impl IsolationLevel {
31    pub fn as_str(&self) -> &'static str {
32        match self {
33            IsolationLevel::ReadUncommitted => "read-uncommitted",
34            IsolationLevel::ReadCommitted => "read-committed",
35            IsolationLevel::RepeatableRead => "repeatable-read",
36            IsolationLevel::Serializable => "serializable",
37        }
38    }
39}
40
41impl FromStr for IsolationLevel {
42    type Err = String;
43
44    fn from_str(s: &str) -> Result<Self, Self::Err> {
45        match s.trim().to_ascii_lowercase().as_str() {
46            "read-uncommitted" | "ru" => Ok(IsolationLevel::ReadUncommitted),
47            "read-committed" | "rc" => Ok(IsolationLevel::ReadCommitted),
48            "repeatable-read" | "rr" => Ok(IsolationLevel::RepeatableRead),
49            "serializable" | "sr" | "serial" => Ok(IsolationLevel::Serializable),
50            other => Err(format!("unknown isolation level '{}'", other)),
51        }
52    }
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub enum TransactionState {
57    Running,
58    Tainted,
59    Committed,
60    Aborted,
61}
62
63#[derive(Debug, Clone)]
64pub enum UndoAction {
65    Insert {
66        table: Arc<TableHeap>,
67        rid: RecordId,
68        indexes: Vec<(Arc<BPlusTreeIndex>, Tuple)>,
69    },
70    Update {
71        table: Arc<TableHeap>,
72        rid: RecordId,
73        prev_meta: TupleMeta,
74        prev_tuple: Tuple,
75    },
76    Delete {
77        table: Arc<TableHeap>,
78        rid: RecordId,
79        prev_meta: TupleMeta,
80        prev_tuple: Tuple,
81    },
82}
83
84impl UndoAction {
85    pub fn undo(self, txn_id: TransactionId) -> QuillSQLResult<()> {
86        match self {
87            UndoAction::Insert {
88                table,
89                rid,
90                indexes,
91            } => {
92                for (index, key) in indexes.into_iter() {
93                    index.delete(&key)?;
94                }
95                table.recover_delete_tuple(rid, txn_id, 0)?;
96                Ok(())
97            }
98            UndoAction::Update {
99                table,
100                rid,
101                prev_meta,
102                prev_tuple,
103            } => {
104                table.recover_restore_tuple(rid, prev_meta, &prev_tuple)?;
105                Ok(())
106            }
107            UndoAction::Delete {
108                table,
109                rid,
110                prev_meta,
111                prev_tuple,
112            } => {
113                table.recover_restore_tuple(rid, prev_meta, &prev_tuple)?;
114                Ok(())
115            }
116        }
117    }
118
119    pub fn to_heap_payload(&self) -> QuillSQLResult<HeapRecordPayload> {
120        match self {
121            UndoAction::Insert { table, rid, .. } => {
122                let (meta, tuple) = table.full_tuple(*rid)?;
123                Ok(HeapRecordPayload::Delete(HeapDeletePayload {
124                    relation: table.relation_ident(),
125                    page_id: rid.page_id,
126                    slot_id: rid.slot_num as u16,
127                    op_txn_id: meta.insert_txn_id,
128                    old_tuple_meta: TupleMetaRepr::from(meta),
129                    old_tuple_data: Some(TupleCodec::encode(&tuple)),
130                }))
131            }
132            UndoAction::Update {
133                table,
134                rid,
135                prev_meta,
136                prev_tuple,
137                ..
138            } => Ok(HeapRecordPayload::Update(HeapUpdatePayload {
139                relation: table.relation_ident(),
140                page_id: rid.page_id,
141                slot_id: rid.slot_num as u16,
142                op_txn_id: prev_meta.insert_txn_id,
143                new_tuple_meta: TupleMetaRepr::from(*prev_meta),
144                new_tuple_data: TupleCodec::encode(prev_tuple),
145                old_tuple_meta: None,
146                old_tuple_data: None,
147            })),
148            UndoAction::Delete {
149                table,
150                rid,
151                prev_meta,
152                prev_tuple,
153                ..
154            } => Ok(HeapRecordPayload::Update(HeapUpdatePayload {
155                relation: table.relation_ident(),
156                page_id: rid.page_id,
157                slot_id: rid.slot_num as u16,
158                op_txn_id: prev_meta.insert_txn_id,
159                new_tuple_meta: TupleMetaRepr::from(*prev_meta),
160                new_tuple_data: TupleCodec::encode(prev_tuple),
161                old_tuple_meta: None,
162                old_tuple_data: None,
163            })),
164        }
165    }
166}
167
168pub struct Transaction {
169    id: TransactionId,
170    isolation_level: IsolationLevel,
171    access_mode: TransactionAccessMode,
172    state: TransactionState,
173    synchronous_commit: bool,
174    begin_lsn: Option<Lsn>,
175    last_lsn: Option<Lsn>,
176    undo_actions: Vec<UndoAction>,
177    current_command_id: CommandId,
178    next_command_id: CommandId,
179    snapshot: Option<TransactionSnapshot>,
180}
181
182impl Transaction {
183    pub fn new(
184        id: TransactionId,
185        isolation_level: IsolationLevel,
186        access_mode: TransactionAccessMode,
187        synchronous_commit: bool,
188    ) -> Self {
189        Self {
190            id,
191            isolation_level,
192            access_mode,
193            state: TransactionState::Running,
194            synchronous_commit,
195            begin_lsn: None,
196            last_lsn: None,
197            undo_actions: Vec::new(),
198            current_command_id: INVALID_COMMAND_ID,
199            next_command_id: 0,
200            snapshot: None,
201        }
202    }
203
204    pub fn id(&self) -> TransactionId {
205        self.id
206    }
207
208    pub fn isolation_level(&self) -> IsolationLevel {
209        self.isolation_level
210    }
211
212    pub fn access_mode(&self) -> TransactionAccessMode {
213        self.access_mode
214    }
215
216    pub fn set_isolation_level(&mut self, isolation_level: IsolationLevel) {
217        self.isolation_level = isolation_level;
218        if matches!(
219            isolation_level,
220            IsolationLevel::ReadCommitted | IsolationLevel::ReadUncommitted
221        ) {
222            self.clear_snapshot();
223        }
224    }
225
226    pub fn state(&self) -> TransactionState {
227        self.state
228    }
229
230    pub fn synchronous_commit(&self) -> bool {
231        self.synchronous_commit
232    }
233
234    pub fn begin_lsn(&self) -> Option<Lsn> {
235        self.begin_lsn
236    }
237
238    pub fn last_lsn(&self) -> Option<Lsn> {
239        self.last_lsn
240    }
241
242    pub fn begin_command(&mut self) -> CommandId {
243        let cid = self.next_command_id;
244        self.current_command_id = cid;
245        self.next_command_id = self.next_command_id.wrapping_add(1);
246        cid
247    }
248
249    pub fn current_command_id(&self) -> CommandId {
250        self.current_command_id
251    }
252
253    pub(crate) fn set_begin_lsn(&mut self, lsn: Lsn) {
254        self.begin_lsn = Some(lsn);
255        self.last_lsn = Some(lsn);
256    }
257
258    pub(crate) fn record_lsn(&mut self, lsn: Lsn) {
259        self.last_lsn = Some(lsn);
260    }
261
262    pub(crate) fn set_state(&mut self, state: TransactionState) {
263        self.state = state;
264    }
265
266    #[allow(dead_code)]
267    pub(crate) fn set_access_mode(&mut self, access_mode: TransactionAccessMode) {
268        self.access_mode = access_mode;
269    }
270
271    pub fn update_access_mode(&mut self, access_mode: TransactionAccessMode) {
272        self.access_mode = access_mode;
273    }
274
275    #[allow(dead_code)]
276    pub(crate) fn mark_tainted(&mut self) {
277        self.state = TransactionState::Tainted;
278    }
279
280    pub fn push_insert_undo(
281        &mut self,
282        table: Arc<TableHeap>,
283        rid: RecordId,
284        indexes: Vec<(Arc<BPlusTreeIndex>, Tuple)>,
285    ) {
286        self.undo_actions.push(UndoAction::Insert {
287            table,
288            rid,
289            indexes,
290        });
291    }
292
293    pub fn push_update_undo(
294        &mut self,
295        table: Arc<TableHeap>,
296        old_rid: RecordId,
297        new_rid: RecordId,
298        prev_meta: TupleMeta,
299        prev_tuple: Tuple,
300        new_keys: Vec<(Arc<BPlusTreeIndex>, Tuple)>,
301    ) {
302        self.undo_actions.push(UndoAction::Update {
303            table: table.clone(),
304            rid: old_rid,
305            prev_meta,
306            prev_tuple,
307        });
308        self.undo_actions.push(UndoAction::Insert {
309            table,
310            rid: new_rid,
311            indexes: new_keys,
312        });
313    }
314
315    pub fn push_delete_undo(
316        &mut self,
317        table: Arc<TableHeap>,
318        rid: RecordId,
319        prev_meta: TupleMeta,
320        prev_tuple: Tuple,
321    ) {
322        self.undo_actions.push(UndoAction::Delete {
323            table,
324            rid,
325            prev_meta,
326            prev_tuple,
327        });
328    }
329
330    pub fn pop_undo_action(&mut self) -> Option<UndoAction> {
331        self.undo_actions.pop()
332    }
333
334    pub fn clear_undo(&mut self) {
335        self.undo_actions.clear();
336    }
337
338    pub fn snapshot(&self) -> Option<&TransactionSnapshot> {
339        self.snapshot.as_ref()
340    }
341
342    pub fn set_snapshot(&mut self, snapshot: TransactionSnapshot) {
343        self.snapshot = Some(snapshot);
344    }
345
346    pub fn clear_snapshot(&mut self) {
347        self.snapshot = None;
348    }
349}