quill_sql/transaction/transaction_manager.rs

1use std::collections::HashSet;
2use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
3use std::sync::Arc;
4
5use crate::error::{QuillSQLError, QuillSQLResult};
6use crate::recovery::wal::codec::{ClrPayload, TransactionPayload, TransactionRecordKind};
7use crate::recovery::wal_record::WalRecordPayload;
8use crate::recovery::{Lsn, WalManager};
9use crate::storage::page::RecordId;
10use crate::transaction::{
11    IsolationLevel, LockManager, LockMode, Transaction, TransactionId, TransactionSnapshot,
12    TransactionState, TransactionStatus,
13};
14use crate::utils::table_ref::TableReference;
15use dashmap::{DashMap, DashSet};
16use sqlparser::ast::TransactionAccessMode;
17
18#[derive(Debug, Default)]
19struct HeldLocks {
20    tables: Vec<(TableReference, LockMode)>,
21    rows: Vec<(TableReference, RecordId, LockMode)>,
22    row_keys: HashSet<(TableReference, RecordId)>,
23    shared_rows: HashSet<(TableReference, RecordId)>,
24}
25
26pub struct TransactionManager {
27    wal: Arc<WalManager>,
28    next_txn_id: AtomicU64,
29    synchronous_commit: AtomicBool,
30    active_txns: DashSet<TransactionId>,
31    lock_manager: Arc<LockManager>,
32    held_locks: DashMap<TransactionId, HeldLocks>,
33    txn_statuses: DashMap<TransactionId, TransactionStatus>,
34}
35
36impl TransactionManager {
37    pub fn new(wal: Arc<WalManager>, synchronous_commit: bool) -> Self {
38        Self {
39            wal,
40            next_txn_id: AtomicU64::new(1),
41            synchronous_commit: AtomicBool::new(synchronous_commit),
42            active_txns: DashSet::new(),
43            lock_manager: Arc::new(LockManager::new()),
44            held_locks: DashMap::new(),
45            txn_statuses: DashMap::new(),
46        }
47    }
48
49    pub fn with_lock_manager(
50        wal: Arc<WalManager>,
51        synchronous_commit: bool,
52        lock_manager: Arc<LockManager>,
53    ) -> Self {
54        Self {
55            wal,
56            next_txn_id: AtomicU64::new(1),
57            synchronous_commit: AtomicBool::new(synchronous_commit),
58            active_txns: DashSet::new(),
59            lock_manager,
60            held_locks: DashMap::new(),
61            txn_statuses: DashMap::new(),
62        }
63    }
64
65    pub fn begin(
66        &self,
67        isolation_level: IsolationLevel,
68        access_mode: TransactionAccessMode,
69    ) -> QuillSQLResult<Transaction> {
70        let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
71        if txn_id == 0 {
72            return Err(QuillSQLError::Internal(
73                "Transaction ID wrapped around".to_string(),
74            ));
75        }
76        let sync_commit = self.synchronous_commit.load(Ordering::Relaxed);
77        let mut txn = Transaction::new(txn_id, isolation_level, access_mode, sync_commit);
78        let append = self.wal.append_record_with(|_| {
79            WalRecordPayload::Transaction(TransactionPayload {
80                marker: TransactionRecordKind::Begin,
81                txn_id,
82            })
83        })?;
84        txn.set_begin_lsn(append.end_lsn);
85        self.active_txns.insert(txn_id);
86        self.txn_statuses
87            .insert(txn_id, TransactionStatus::InProgress);
88        self.held_locks.insert(txn_id, HeldLocks::default());
89        Ok(txn)
90    }
91
92    pub fn acquire_table_lock(
93        &self,
94        txn: &Transaction,
95        table: TableReference,
96        mode: LockMode,
97    ) -> QuillSQLResult<()> {
98        if self.lock_manager.lock_table(txn, mode, table.clone()) {
99            if let Some(mut entry) = self.held_locks.get_mut(&txn.id()) {
100                entry.tables.push((table, mode));
101            } else {
102                let mut new_entry = HeldLocks::default();
103                new_entry.tables.push((table, mode));
104                self.held_locks.insert(txn.id(), new_entry);
105            }
106            Ok(())
107        } else {
108            Err(QuillSQLError::Internal(format!(
109                "Failed to acquire table lock for txn {}",
110                txn.id()
111            )))
112        }
113    }
114
115    pub fn try_acquire_row_lock(
116        &self,
117        txn: &Transaction,
118        table: TableReference,
119        rid: RecordId,
120        mode: LockMode,
121    ) -> QuillSQLResult<bool> {
122        let key = (table.clone(), rid);
123        if let Some(entry) = self.held_locks.get(&txn.id()) {
124            if entry.row_keys.contains(&key) {
125                return Ok(true);
126            }
127        }
128        if self.lock_manager.lock_row(txn, mode, table.clone(), rid) {
129            self.record_row_lock(txn.id(), table, rid, mode);
130            Ok(true)
131        } else {
132            Ok(false)
133        }
134    }
135
136    pub fn acquire_row_lock(
137        &self,
138        txn: &Transaction,
139        table: TableReference,
140        rid: RecordId,
141        mode: LockMode,
142    ) -> QuillSQLResult<()> {
143        if !self.try_acquire_row_lock(txn, table.clone(), rid, mode)? {
144            return Err(QuillSQLError::Internal(format!(
145                "Failed to acquire row lock for txn {}",
146                txn.id()
147            )));
148        }
149        Ok(())
150    }
151
152    pub fn commit(&self, txn: &mut Transaction) -> QuillSQLResult<()> {
153        match txn.state() {
154            TransactionState::Running | TransactionState::Tainted => {}
155            TransactionState::Committed => {
156                return Err(QuillSQLError::Internal(format!(
157                    "Transaction {} already committed",
158                    txn.id()
159                )))
160            }
161            TransactionState::Aborted => {
162                return Err(QuillSQLError::Internal(format!(
163                    "Transaction {} already aborted",
164                    txn.id()
165                )))
166            }
167        }
168
169        let txn_id = txn.id();
170        let append = self.wal.append_record_with(|_| {
171            WalRecordPayload::Transaction(TransactionPayload {
172                marker: TransactionRecordKind::Commit,
173                txn_id,
174            })
175        })?;
176        txn.record_lsn(append.end_lsn);
177        txn.set_state(TransactionState::Committed);
178
179        self.active_txns.remove(&txn_id);
180        self.txn_statuses
181            .insert(txn_id, TransactionStatus::Committed);
182        self.release_all_locks(txn_id);
183        txn.clear_undo();
184        txn.clear_snapshot();
185        self.finish_commit(txn, append.end_lsn)
186    }
187
188    pub fn abort(&self, txn: &mut Transaction) -> QuillSQLResult<()> {
189        match txn.state() {
190            TransactionState::Committed => {
191                return Err(QuillSQLError::Internal(format!(
192                    "Transaction {} already committed",
193                    txn.id()
194                )))
195            }
196            TransactionState::Aborted => return Ok(()),
197            TransactionState::Running | TransactionState::Tainted => {}
198        }
199
200        let txn_id = txn.id();
201        let mut undo_next: Option<Lsn> = None;
202        while let Some(action) = txn.pop_undo_action() {
203            let payload = action.to_heap_payload()?;
204            let clr_result = self.wal.append_record_with(|ctx| {
205                txn.record_lsn(ctx.end_lsn);
206                WalRecordPayload::Clr(ClrPayload {
207                    txn_id,
208                    undone_lsn: ctx.start_lsn,
209                    undo_next_lsn: undo_next.unwrap_or(0),
210                })
211            })?;
212            txn.record_lsn(clr_result.end_lsn);
213            let heap_result = self.wal.append_record_with(|ctx| {
214                txn.record_lsn(ctx.end_lsn);
215                WalRecordPayload::Heap(payload.clone())
216            })?;
217            txn.record_lsn(heap_result.end_lsn);
218            action.undo(txn_id)?;
219            undo_next = Some(heap_result.start_lsn);
220        }
221
222        let append = self.wal.append_record_with(|_| {
223            WalRecordPayload::Transaction(TransactionPayload {
224                marker: TransactionRecordKind::Abort,
225                txn_id,
226            })
227        })?;
228        txn.record_lsn(append.end_lsn);
229        txn.set_state(TransactionState::Aborted);
230
231        self.active_txns.remove(&txn_id);
232        self.txn_statuses.insert(txn_id, TransactionStatus::Aborted);
233        self.release_all_locks(txn_id);
234        txn.clear_undo();
235        txn.clear_snapshot();
236        self.finish_commit(txn, append.end_lsn)
237    }
238
239    pub fn synchronous_commit(&self) -> bool {
240        self.synchronous_commit.load(Ordering::Relaxed)
241    }
242
243    pub fn set_synchronous_commit(&self, value: bool) {
244        self.synchronous_commit.store(value, Ordering::Relaxed);
245    }
246
247    pub fn active_transactions(&self) -> Vec<TransactionId> {
248        self.active_txns.iter().map(|txn| *txn).collect()
249    }
250
251    pub fn snapshot(&self, txn_id: TransactionId) -> TransactionSnapshot {
252        let active: Vec<TransactionId> = self
253            .active_txns
254            .iter()
255            .map(|id| *id)
256            .filter(|id| *id != txn_id)
257            .collect();
258        let xmax = self.next_txn_id.load(Ordering::SeqCst);
259        let xmin = active.iter().copied().min().unwrap_or(xmax);
260        TransactionSnapshot::new(txn_id, xmin, xmax, active)
261    }
262
263    pub fn transaction_status(&self, txn_id: TransactionId) -> TransactionStatus {
264        if txn_id == 0 {
265            return TransactionStatus::Committed;
266        }
267        self.txn_statuses
268            .get(&txn_id)
269            .map(|entry| *entry.value())
270            .unwrap_or(TransactionStatus::Unknown)
271    }
272
273    pub fn oldest_active_txn(&self) -> Option<TransactionId> {
274        self.active_txns.iter().map(|txn| *txn).min()
275    }
276
277    pub fn next_txn_id_hint(&self) -> TransactionId {
278        self.next_txn_id.load(Ordering::SeqCst)
279    }
280
281    fn finish_commit(&self, txn: &Transaction, lsn: Lsn) -> QuillSQLResult<()> {
282        if txn.synchronous_commit() {
283            self.wal.wait_for_durable(lsn)?;
284        } else {
285            let _ = self.wal.flush_until(lsn)?;
286        }
287        Ok(())
288    }
289
290    pub fn record_row_lock(
291        &self,
292        txn_id: TransactionId,
293        table: TableReference,
294        rid: RecordId,
295        mode: LockMode,
296    ) {
297        let mut entry = self.held_locks.entry(txn_id).or_default();
298        if entry.row_keys.insert((table.clone(), rid)) {
299            entry.rows.push((table, rid, mode));
300        }
301    }
302
303    pub fn remove_row_key_marker(
304        &self,
305        txn_id: TransactionId,
306        table: &TableReference,
307        rid: RecordId,
308    ) {
309        if let Some(mut entry) = self.held_locks.get_mut(&txn_id) {
310            entry.row_keys.remove(&(table.clone(), rid));
311        }
312    }
313
314    pub fn record_shared_row_lock(
315        &self,
316        txn_id: TransactionId,
317        table: TableReference,
318        rid: RecordId,
319    ) {
320        let mut entry = self.held_locks.entry(txn_id).or_default();
321        entry.shared_rows.insert((table, rid));
322    }
323
324    pub fn remove_shared_row_lock(
325        &self,
326        txn_id: TransactionId,
327        table: &TableReference,
328        rid: RecordId,
329    ) {
330        if let Some(mut entry) = self.held_locks.get_mut(&txn_id) {
331            entry.shared_rows.remove(&(table.clone(), rid));
332        }
333    }
334
335    pub fn try_unlock_shared_row(
336        &self,
337        txn_id: TransactionId,
338        table: &TableReference,
339        rid: RecordId,
340    ) -> QuillSQLResult<()> {
341        let _ = self.lock_manager.unlock_row_raw(txn_id, table.clone(), rid);
342        self.remove_shared_row_lock(txn_id, table, rid);
343        Ok(())
344    }
345
346    pub fn unlock_row(&self, txn_id: TransactionId, table: &TableReference, rid: RecordId) {
347        if self.lock_manager.unlock_row_raw(txn_id, table.clone(), rid) {
348            if let Some(mut entry) = self.held_locks.get_mut(&txn_id) {
349                entry.row_keys.remove(&(table.clone(), rid));
350                entry.rows.retain(|(t, r, _)| !(t == table && *r == rid));
351                entry.shared_rows.remove(&(table.clone(), rid));
352            }
353        }
354    }
355
356    fn release_all_locks(&self, txn_id: TransactionId) {
357        if let Some((_, mut held)) = self.held_locks.remove(&txn_id) {
358            for (table, rid, _) in held.rows.drain(..).rev() {
359                let _ = self.lock_manager.unlock_row_raw(txn_id, table, rid);
360            }
361            for (table, rid) in held.shared_rows.drain() {
362                let _ = self.lock_manager.unlock_row_raw(txn_id, table, rid);
363            }
364            for (table, _) in held.tables.drain(..).rev() {
365                let _ = self.lock_manager.unlock_table_raw(txn_id, table);
366            }
367        }
368    }
369}
370
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::WalConfig;
    use crate::recovery::WalManager;
    use crate::storage::page::TupleMeta;
    use tempfile::TempDir;

    /// Opens a WAL under `<temp_dir>/wal` with fsync-on-flush turned off.
    fn build_wal(temp_dir: &TempDir) -> Arc<WalManager> {
        let config = WalConfig {
            directory: temp_dir.path().join("wal"),
            sync_on_flush: false,
            ..WalConfig::default()
        };
        Arc::new(WalManager::new(config, None, None).expect("wal manager"))
    }

    /// Creates a temp directory, a WAL inside it, and a manager over that WAL.
    ///
    /// The returned `TempDir` must stay alive for the whole test.
    fn setup(synchronous_commit: bool) -> (TempDir, Arc<WalManager>, TransactionManager) {
        let temp = TempDir::new().expect("tempdir");
        let wal = build_wal(&temp);
        let manager = TransactionManager::new(Arc::clone(&wal), synchronous_commit);
        (temp, wal, manager)
    }

    /// Begins a read-write transaction at `level`, panicking with `label` on failure.
    fn start(manager: &TransactionManager, level: IsolationLevel, label: &str) -> Transaction {
        manager
            .begin(level, TransactionAccessMode::ReadWrite)
            .expect(label)
    }

    #[test]
    fn commit_waits_for_durable_when_sync() {
        let (_temp, wal, manager) = setup(true);

        let mut txn = start(&manager, IsolationLevel::ReadUncommitted, "begin txn");
        manager.commit(&mut txn).expect("commit");

        assert_eq!(txn.state(), TransactionState::Committed);
        let commit_lsn = txn.last_lsn().expect("commit lsn");
        assert!(wal.durable_lsn() >= commit_lsn);
    }

    #[test]
    fn abort_records_wal_and_marks_state() {
        let (_temp, wal, manager) = setup(false);

        let mut txn = start(&manager, IsolationLevel::ReadUncommitted, "begin txn");
        manager.abort(&mut txn).expect("abort");

        assert_eq!(txn.state(), TransactionState::Aborted);
        let abort_lsn = txn.last_lsn().expect("abort lsn");
        // Without synchronous commit, abort still goes through `flush_until`, so
        // the durable LSN is expected to reach the abort record.
        assert!(wal.durable_lsn() >= abort_lsn);
    }

    #[test]
    fn snapshot_excludes_running_insert_until_commit() {
        let (_temp, _wal, manager) = setup(true);

        let mut writer = start(&manager, IsolationLevel::ReadCommitted, "writer");
        let meta = TupleMeta::new(writer.id(), 0);
        let mut reader = start(&manager, IsolationLevel::ReadCommitted, "reader");

        let visible = |snapshot: TransactionSnapshot| {
            snapshot.is_visible(&meta, 0, |tid| manager.transaction_status(tid))
        };

        assert!(
            !visible(manager.snapshot(reader.id())),
            "running writer should not be visible",
        );

        manager.commit(&mut writer).expect("commit writer");
        assert!(visible(manager.snapshot(reader.id())));

        manager.abort(&mut reader).expect("abort reader");
    }

    #[test]
    fn snapshot_treats_committed_delete_as_invisible() {
        let (_temp, _wal, manager) = setup(true);

        let mut inserter = start(&manager, IsolationLevel::ReadCommitted, "insert txn");
        let mut meta = TupleMeta::new(inserter.id(), 0);
        manager.commit(&mut inserter).expect("commit insert");

        let mut deleter = start(&manager, IsolationLevel::ReadCommitted, "delete txn");
        meta.mark_deleted(deleter.id(), 0);

        let mut reader = start(&manager, IsolationLevel::ReadCommitted, "reader txn");

        let visible = |snapshot: TransactionSnapshot| {
            snapshot.is_visible(&meta, 0, |tid| manager.transaction_status(tid))
        };

        assert!(visible(manager.snapshot(reader.id())));

        manager.commit(&mut deleter).expect("commit delete");
        assert!(
            !visible(manager.snapshot(reader.id())),
            "committed delete should hide tuple",
        );

        manager.abort(&mut reader).expect("abort reader");
    }
}