quill_sql/transaction/
transaction.rs

1use crate::error::QuillSQLResult;
2use crate::recovery::Lsn;
3use crate::storage::codec::TupleCodec;
4use crate::storage::heap::wal_codec::{
5    HeapDeletePayload, HeapInsertPayload, HeapRecordPayload, TupleMetaRepr,
6};
7use crate::storage::index::btree_index::BPlusTreeIndex;
8use crate::storage::page::{RecordId, TupleMeta};
9use crate::storage::table_heap::TableHeap;
10use crate::storage::tuple::Tuple;
11use sqlparser::ast::TransactionAccessMode;
12use std::str::FromStr;
13use std::sync::Arc;
14
15use super::TransactionSnapshot;
16
17pub type TransactionId = u64;
18pub type CommandId = u32;
19
20pub const INVALID_COMMAND_ID: CommandId = CommandId::MAX;
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23pub enum IsolationLevel {
24    ReadUncommitted,
25    ReadCommitted,
26    RepeatableRead,
27    Serializable,
28}
29
30impl IsolationLevel {
31    pub fn as_str(&self) -> &'static str {
32        match self {
33            IsolationLevel::ReadUncommitted => "read-uncommitted",
34            IsolationLevel::ReadCommitted => "read-committed",
35            IsolationLevel::RepeatableRead => "repeatable-read",
36            IsolationLevel::Serializable => "serializable",
37        }
38    }
39}
40
41impl FromStr for IsolationLevel {
42    type Err = String;
43
44    fn from_str(s: &str) -> Result<Self, Self::Err> {
45        match s.trim().to_ascii_lowercase().as_str() {
46            "read-uncommitted" | "ru" => Ok(IsolationLevel::ReadUncommitted),
47            "read-committed" | "rc" => Ok(IsolationLevel::ReadCommitted),
48            "repeatable-read" | "rr" => Ok(IsolationLevel::RepeatableRead),
49            "serializable" | "sr" | "serial" => Ok(IsolationLevel::Serializable),
50            other => Err(format!("unknown isolation level '{}'", other)),
51        }
52    }
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub enum TransactionState {
57    Running,
58    Tainted,
59    Committed,
60    Aborted,
61}
62
63#[derive(Debug, Clone)]
64pub enum UndoAction {
65    Insert {
66        table: Arc<TableHeap>,
67        rid: RecordId,
68        indexes: Vec<(Arc<BPlusTreeIndex>, Tuple)>,
69    },
70    Delete {
71        table: Arc<TableHeap>,
72        rid: RecordId,
73        prev_meta: TupleMeta,
74        prev_tuple: Tuple,
75        indexes: Vec<(Arc<BPlusTreeIndex>, Tuple)>,
76    },
77}
78
79impl UndoAction {
80    pub fn undo(self, txn_id: TransactionId) -> QuillSQLResult<()> {
81        match self {
82            UndoAction::Insert {
83                table,
84                rid,
85                indexes,
86            } => {
87                for (index, key) in indexes.into_iter() {
88                    index.delete_with_txn(&key, txn_id)?;
89                }
90                table.recover_delete_tuple(rid, txn_id, 0)?;
91                Ok(())
92            }
93            UndoAction::Delete {
94                table,
95                rid,
96                prev_meta,
97                prev_tuple,
98                indexes,
99            } => {
100                for (index, key) in indexes {
101                    index.insert_with_txn(&key, rid, txn_id)?;
102                }
103                table.recover_restore_tuple(rid, prev_meta, &prev_tuple)?;
104                Ok(())
105            }
106        }
107    }
108
109    pub fn to_heap_payload(&self, txn_id: TransactionId) -> QuillSQLResult<HeapRecordPayload> {
110        match self {
111            UndoAction::Insert { table, rid, .. } => {
112                let (meta, tuple) = table.full_tuple(*rid)?;
113                let mut deleted_meta = meta;
114                deleted_meta.mark_deleted(txn_id, 0);
115                Ok(HeapRecordPayload::Delete(HeapDeletePayload {
116                    relation: table.relation_ident(),
117                    page_id: rid.page_id,
118                    slot_id: rid.slot_num as u16,
119                    op_txn_id: meta.insert_txn_id,
120                    new_tuple_meta: TupleMetaRepr::from(deleted_meta),
121                    old_tuple_meta: TupleMetaRepr::from(meta),
122                    old_tuple_data: TupleCodec::encode(&tuple),
123                }))
124            }
125            UndoAction::Delete {
126                table,
127                rid,
128                prev_meta,
129                prev_tuple,
130                ..
131            } => Ok(HeapRecordPayload::Insert(HeapInsertPayload {
132                relation: table.relation_ident(),
133                page_id: rid.page_id,
134                slot_id: rid.slot_num as u16,
135                op_txn_id: txn_id,
136                tuple_meta: TupleMetaRepr::from(*prev_meta),
137                tuple_data: TupleCodec::encode(prev_tuple),
138            })),
139        }
140    }
141}
142
143pub struct Transaction {
144    id: TransactionId,
145    isolation_level: IsolationLevel,
146    access_mode: TransactionAccessMode,
147    state: TransactionState,
148    synchronous_commit: bool,
149    begin_lsn: Option<Lsn>,
150    last_lsn: Option<Lsn>,
151    undo_actions: Vec<UndoAction>,
152    current_command_id: CommandId,
153    next_command_id: CommandId,
154    snapshot: Option<TransactionSnapshot>,
155}
156
157impl Transaction {
158    pub fn new(
159        id: TransactionId,
160        isolation_level: IsolationLevel,
161        access_mode: TransactionAccessMode,
162        synchronous_commit: bool,
163    ) -> Self {
164        Self {
165            id,
166            isolation_level,
167            access_mode,
168            state: TransactionState::Running,
169            synchronous_commit,
170            begin_lsn: None,
171            last_lsn: None,
172            undo_actions: Vec::new(),
173            current_command_id: INVALID_COMMAND_ID,
174            next_command_id: 0,
175            snapshot: None,
176        }
177    }
178
179    pub fn id(&self) -> TransactionId {
180        self.id
181    }
182
183    pub fn isolation_level(&self) -> IsolationLevel {
184        self.isolation_level
185    }
186
187    pub fn access_mode(&self) -> TransactionAccessMode {
188        self.access_mode
189    }
190
191    pub fn set_isolation_level(&mut self, isolation_level: IsolationLevel) {
192        self.isolation_level = isolation_level;
193        if matches!(
194            isolation_level,
195            IsolationLevel::ReadCommitted | IsolationLevel::ReadUncommitted
196        ) {
197            self.clear_snapshot();
198        }
199    }
200
201    pub fn state(&self) -> TransactionState {
202        self.state
203    }
204
205    pub fn synchronous_commit(&self) -> bool {
206        self.synchronous_commit
207    }
208
209    pub fn begin_lsn(&self) -> Option<Lsn> {
210        self.begin_lsn
211    }
212
213    pub fn last_lsn(&self) -> Option<Lsn> {
214        self.last_lsn
215    }
216
217    pub fn begin_command(&mut self) -> CommandId {
218        let cid = self.next_command_id;
219        self.current_command_id = cid;
220        self.next_command_id = self.next_command_id.wrapping_add(1);
221        cid
222    }
223
224    pub fn current_command_id(&self) -> CommandId {
225        self.current_command_id
226    }
227
228    pub(crate) fn set_begin_lsn(&mut self, lsn: Lsn) {
229        self.begin_lsn = Some(lsn);
230        self.last_lsn = Some(lsn);
231    }
232
233    pub(crate) fn record_lsn(&mut self, lsn: Lsn) {
234        self.last_lsn = Some(lsn);
235    }
236
237    pub(crate) fn set_state(&mut self, state: TransactionState) {
238        self.state = state;
239    }
240
241    #[allow(dead_code)]
242    pub(crate) fn set_access_mode(&mut self, access_mode: TransactionAccessMode) {
243        self.access_mode = access_mode;
244    }
245
246    pub fn update_access_mode(&mut self, access_mode: TransactionAccessMode) {
247        self.access_mode = access_mode;
248    }
249
250    #[allow(dead_code)]
251    pub(crate) fn mark_tainted(&mut self) {
252        self.state = TransactionState::Tainted;
253    }
254
255    pub fn push_insert_undo(
256        &mut self,
257        table: Arc<TableHeap>,
258        rid: RecordId,
259        indexes: Vec<(Arc<BPlusTreeIndex>, Tuple)>,
260    ) {
261        self.undo_actions.push(UndoAction::Insert {
262            table,
263            rid,
264            indexes,
265        });
266    }
267
268    pub fn push_update_undo(
269        &mut self,
270        table: Arc<TableHeap>,
271        old_rid: RecordId,
272        new_rid: RecordId,
273        prev_meta: TupleMeta,
274        prev_tuple: Tuple,
275        new_keys: Vec<(Arc<BPlusTreeIndex>, Tuple)>,
276        old_keys: Vec<(Arc<BPlusTreeIndex>, Tuple)>,
277    ) {
278        self.undo_actions.push(UndoAction::Insert {
279            table: table.clone(),
280            rid: new_rid,
281            indexes: new_keys,
282        });
283        self.undo_actions.push(UndoAction::Delete {
284            table: table.clone(),
285            rid: old_rid,
286            prev_meta,
287            prev_tuple,
288            indexes: old_keys,
289        });
290    }
291
292    pub fn push_delete_undo(
293        &mut self,
294        table: Arc<TableHeap>,
295        rid: RecordId,
296        prev_meta: TupleMeta,
297        prev_tuple: Tuple,
298        indexes: Vec<(Arc<BPlusTreeIndex>, Tuple)>,
299    ) {
300        self.undo_actions.push(UndoAction::Delete {
301            table,
302            rid,
303            prev_meta,
304            prev_tuple,
305            indexes,
306        });
307    }
308
309    pub fn pop_undo_action(&mut self) -> Option<UndoAction> {
310        self.undo_actions.pop()
311    }
312
313    pub fn clear_undo(&mut self) {
314        self.undo_actions.clear();
315    }
316
317    pub fn snapshot(&self) -> Option<&TransactionSnapshot> {
318        self.snapshot.as_ref()
319    }
320
321    pub fn set_snapshot(&mut self, snapshot: TransactionSnapshot) {
322        self.snapshot = Some(snapshot);
323    }
324
325    pub fn clear_snapshot(&mut self) {
326        self.snapshot = None;
327    }
328}