quill_sql/transaction/
transaction_manager.rs

1use std::collections::HashSet;
2use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
3use std::sync::Arc;
4
5use crate::error::{QuillSQLError, QuillSQLResult};
6use crate::recovery::wal::codec::{ClrPayload, TransactionPayload, TransactionRecordKind};
7use crate::recovery::wal_record::WalRecordPayload;
8use crate::recovery::{Lsn, WalManager};
9use crate::storage::page::RecordId;
10use crate::transaction::{
11    IsolationLevel, LockManager, LockMode, Transaction, TransactionId, TransactionSnapshot,
12    TransactionState, TransactionStatus,
13};
14use crate::utils::table_ref::TableReference;
15use dashmap::{DashMap, DashSet};
16use serde::Serialize;
17use sqlparser::ast::TransactionAccessMode;
18
/// Per-transaction record of the locks that were granted through the
/// `TransactionManager`, kept so they can be released in bulk when the
/// transaction finishes.
#[derive(Debug, Default)]
struct HeldLocks {
    /// Table-level locks in acquisition order. One entry is pushed per
    /// successful table-lock call, so the same table may appear repeatedly.
    tables: Vec<(TableReference, LockMode)>,
    /// Row-level locks in acquisition order, at most one entry per row.
    rows: Vec<(TableReference, RecordId, LockMode)>,
    /// The `(table, rid)` pairs currently present in `rows`; used for fast
    /// "already held" checks and to keep `rows` free of duplicates.
    row_keys: HashSet<(TableReference, RecordId)>,
}
25
/// Coordinates transaction lifecycle: id allocation, WAL begin/commit/abort
/// records, lock bookkeeping and the status table consulted for visibility.
pub struct TransactionManager {
    /// Write-ahead log that receives transaction, CLR and heap records.
    wal: Arc<WalManager>,
    /// Next transaction id to hand out; starts at 1.
    next_txn_id: AtomicU64,
    /// Default durability mode copied into each new transaction at `begin`.
    synchronous_commit: AtomicBool,
    /// Ids of transactions that have begun and not yet committed or aborted.
    active_txns: DashSet<TransactionId>,
    /// Lock manager used for table and row locks.
    lock_manager: Arc<LockManager>,
    /// Locks granted to each transaction, released when it finishes.
    held_locks: DashMap<TransactionId, HeldLocks>,
    /// Last known status of every transaction this manager has seen.
    /// Entries are never removed in this file.
    txn_statuses: DashMap<TransactionId, TransactionStatus>,
}
35
/// Serializable diagnostic view of one active transaction.
#[derive(Debug, Clone, Serialize)]
pub struct TxnDebugEntry {
    /// Id of the transaction being described.
    pub txn_id: TransactionId,
    /// Status recorded for the transaction when the snapshot was taken.
    pub status: TransactionStatus,
    /// Number of table-lock entries tracked for the transaction.
    pub held_tables: usize,
    /// Number of row-lock entries tracked for the transaction.
    pub held_rows: usize,
}
43
/// Serializable diagnostic view of the transaction manager as a whole.
#[derive(Debug, Clone, Serialize)]
pub struct TxnDebugSnapshot {
    /// One entry per transaction that was active when the snapshot was taken.
    pub active: Vec<TxnDebugEntry>,
    /// Smallest active transaction id, or `None` when nothing is active.
    pub oldest_active: Option<TransactionId>,
    /// The id the manager would hand out next.
    pub next_txn_id: TransactionId,
}
50
51impl TransactionManager {
52    pub fn new(wal: Arc<WalManager>, synchronous_commit: bool) -> Self {
53        Self {
54            wal,
55            next_txn_id: AtomicU64::new(1),
56            synchronous_commit: AtomicBool::new(synchronous_commit),
57            active_txns: DashSet::new(),
58            lock_manager: Arc::new(LockManager::new()),
59            held_locks: DashMap::new(),
60            txn_statuses: DashMap::new(),
61        }
62    }
63
64    pub fn with_lock_manager(
65        wal: Arc<WalManager>,
66        synchronous_commit: bool,
67        lock_manager: Arc<LockManager>,
68    ) -> Self {
69        Self {
70            wal,
71            next_txn_id: AtomicU64::new(1),
72            synchronous_commit: AtomicBool::new(synchronous_commit),
73            active_txns: DashSet::new(),
74            lock_manager,
75            held_locks: DashMap::new(),
76            txn_statuses: DashMap::new(),
77        }
78    }
79
80    pub fn lock_manager_arc(&self) -> Arc<LockManager> {
81        self.lock_manager.clone()
82    }
83
84    pub fn begin(
85        &self,
86        isolation_level: IsolationLevel,
87        access_mode: TransactionAccessMode,
88    ) -> QuillSQLResult<Transaction> {
89        let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
90        if txn_id == 0 {
91            return Err(QuillSQLError::Internal(
92                "Transaction ID wrapped around".to_string(),
93            ));
94        }
95        let sync_commit = self.synchronous_commit.load(Ordering::Relaxed);
96        let mut txn = Transaction::new(txn_id, isolation_level, access_mode, sync_commit);
97        let append = self.wal.append_record_with(|_| {
98            WalRecordPayload::Transaction(TransactionPayload {
99                marker: TransactionRecordKind::Begin,
100                txn_id,
101            })
102        })?;
103        txn.set_begin_lsn(append.end_lsn);
104        self.active_txns.insert(txn_id);
105        self.txn_statuses
106            .insert(txn_id, TransactionStatus::InProgress);
107        self.held_locks.insert(txn_id, HeldLocks::default());
108        Ok(txn)
109    }
110
111    pub fn acquire_table_lock(
112        &self,
113        txn: &Transaction,
114        table: TableReference,
115        mode: LockMode,
116    ) -> QuillSQLResult<()> {
117        if self.lock_manager.lock_table(txn, mode, table.clone()) {
118            if let Some(mut entry) = self.held_locks.get_mut(&txn.id()) {
119                entry.tables.push((table, mode));
120            } else {
121                let mut new_entry = HeldLocks::default();
122                new_entry.tables.push((table, mode));
123                self.held_locks.insert(txn.id(), new_entry);
124            }
125            Ok(())
126        } else {
127            Err(QuillSQLError::Internal(format!(
128                "Failed to acquire table lock for txn {}",
129                txn.id()
130            )))
131        }
132    }
133
134    pub fn try_acquire_row_lock(
135        &self,
136        txn: &Transaction,
137        table: TableReference,
138        rid: RecordId,
139        mode: LockMode,
140    ) -> QuillSQLResult<bool> {
141        let key = (table.clone(), rid);
142        if let Some(entry) = self.held_locks.get(&txn.id()) {
143            if entry.row_keys.contains(&key) {
144                return Ok(true);
145            }
146        }
147        if self.lock_manager.lock_row(txn, mode, table.clone(), rid) {
148            self.record_row_lock(txn.id(), table, rid, mode);
149            Ok(true)
150        } else {
151            Ok(false)
152        }
153    }
154
155    pub fn acquire_row_lock(
156        &self,
157        txn: &Transaction,
158        table: TableReference,
159        rid: RecordId,
160        mode: LockMode,
161    ) -> QuillSQLResult<()> {
162        if !self.try_acquire_row_lock(txn, table.clone(), rid, mode)? {
163            return Err(QuillSQLError::Internal(format!(
164                "Failed to acquire row lock for txn {}",
165                txn.id()
166            )));
167        }
168        Ok(())
169    }
170
171    pub fn commit(&self, txn: &mut Transaction) -> QuillSQLResult<()> {
172        match txn.state() {
173            TransactionState::Running | TransactionState::Tainted => {}
174            TransactionState::Committed => {
175                return Err(QuillSQLError::Internal(format!(
176                    "Transaction {} already committed",
177                    txn.id()
178                )))
179            }
180            TransactionState::Aborted => {
181                return Err(QuillSQLError::Internal(format!(
182                    "Transaction {} already aborted",
183                    txn.id()
184                )))
185            }
186        }
187
188        let txn_id = txn.id();
189        let append = self.wal.append_record_with(|_| {
190            WalRecordPayload::Transaction(TransactionPayload {
191                marker: TransactionRecordKind::Commit,
192                txn_id,
193            })
194        })?;
195        txn.record_lsn(append.end_lsn);
196        txn.set_state(TransactionState::Committed);
197
198        self.active_txns.remove(&txn_id);
199        self.txn_statuses
200            .insert(txn_id, TransactionStatus::Committed);
201        self.release_all_locks(txn_id);
202        txn.clear_undo();
203        txn.clear_snapshot();
204        self.finish_commit(txn, append.end_lsn)
205    }
206
207    pub fn abort(&self, txn: &mut Transaction) -> QuillSQLResult<()> {
208        match txn.state() {
209            TransactionState::Committed => {
210                return Err(QuillSQLError::Internal(format!(
211                    "Transaction {} already committed",
212                    txn.id()
213                )))
214            }
215            TransactionState::Aborted => return Ok(()),
216            TransactionState::Running | TransactionState::Tainted => {}
217        }
218
219        let txn_id = txn.id();
220        let mut undo_next: Option<Lsn> = None;
221        while let Some(action) = txn.pop_undo_action() {
222            let payload = action.to_heap_payload(txn_id)?;
223            let clr_result = self.wal.append_record_with(|ctx| {
224                txn.record_lsn(ctx.end_lsn);
225                WalRecordPayload::Clr(ClrPayload {
226                    txn_id,
227                    undone_lsn: ctx.start_lsn,
228                    undo_next_lsn: undo_next.unwrap_or(0),
229                })
230            })?;
231            txn.record_lsn(clr_result.end_lsn);
232            let heap_result = self.wal.append_record_with(|ctx| {
233                txn.record_lsn(ctx.end_lsn);
234                WalRecordPayload::Heap(payload.clone())
235            })?;
236            txn.record_lsn(heap_result.end_lsn);
237            action.undo(txn_id)?;
238            undo_next = Some(heap_result.start_lsn);
239        }
240
241        let append = self.wal.append_record_with(|_| {
242            WalRecordPayload::Transaction(TransactionPayload {
243                marker: TransactionRecordKind::Abort,
244                txn_id,
245            })
246        })?;
247        txn.record_lsn(append.end_lsn);
248        txn.set_state(TransactionState::Aborted);
249
250        self.active_txns.remove(&txn_id);
251        self.txn_statuses.insert(txn_id, TransactionStatus::Aborted);
252        self.release_all_locks(txn_id);
253        txn.clear_undo();
254        txn.clear_snapshot();
255        self.finish_commit(txn, append.end_lsn)
256    }
257
258    pub fn synchronous_commit(&self) -> bool {
259        self.synchronous_commit.load(Ordering::Relaxed)
260    }
261
262    pub fn set_synchronous_commit(&self, value: bool) {
263        self.synchronous_commit.store(value, Ordering::Relaxed);
264    }
265
266    pub fn active_transactions(&self) -> Vec<TransactionId> {
267        self.active_txns.iter().map(|txn| *txn).collect()
268    }
269
270    pub fn snapshot(&self, txn_id: TransactionId) -> TransactionSnapshot {
271        let active: Vec<TransactionId> = self
272            .active_txns
273            .iter()
274            .map(|id| *id)
275            .filter(|id| *id != txn_id)
276            .collect();
277        let xmax = self.next_txn_id.load(Ordering::SeqCst);
278        let xmin = active.iter().copied().min().unwrap_or(xmax);
279        TransactionSnapshot::new(txn_id, xmin, xmax, active)
280    }
281
282    pub fn transaction_status(&self, txn_id: TransactionId) -> TransactionStatus {
283        if txn_id == 0 {
284            return TransactionStatus::Committed;
285        }
286        self.txn_statuses
287            .get(&txn_id)
288            .map(|entry| *entry.value())
289            .unwrap_or(TransactionStatus::Unknown)
290    }
291
292    pub fn oldest_active_txn(&self) -> Option<TransactionId> {
293        self.active_txns.iter().map(|txn| *txn).min()
294    }
295
296    pub fn next_txn_id_hint(&self) -> TransactionId {
297        self.next_txn_id.load(Ordering::SeqCst)
298    }
299
300    pub fn debug_snapshot(&self) -> TxnDebugSnapshot {
301        let active_ids = self.active_transactions();
302        let mut active = Vec::with_capacity(active_ids.len());
303        for txn_id in active_ids {
304            let status = self.transaction_status(txn_id);
305            let held = self.held_locks.get(&txn_id);
306            let (held_tables, held_rows) = held
307                .map(|locks| (locks.tables.len(), locks.rows.len()))
308                .unwrap_or((0, 0));
309            active.push(TxnDebugEntry {
310                txn_id,
311                status,
312                held_tables,
313                held_rows,
314            });
315        }
316        TxnDebugSnapshot {
317            active,
318            oldest_active: self.oldest_active_txn(),
319            next_txn_id: self.next_txn_id_hint(),
320        }
321    }
322
323    fn finish_commit(&self, txn: &Transaction, lsn: Lsn) -> QuillSQLResult<()> {
324        if txn.synchronous_commit() {
325            self.wal.wait_for_durable(lsn)?;
326        } else if !self.wal.has_background_writer() {
327            let _ = self.wal.flush_until(lsn)?;
328        }
329        Ok(())
330    }
331
332    pub fn record_row_lock(
333        &self,
334        txn_id: TransactionId,
335        table: TableReference,
336        rid: RecordId,
337        mode: LockMode,
338    ) {
339        let mut entry = self.held_locks.entry(txn_id).or_default();
340        if entry.row_keys.insert((table.clone(), rid)) {
341            entry.rows.push((table, rid, mode));
342        }
343    }
344
345    pub fn unlock_row(&self, txn_id: TransactionId, table: &TableReference, rid: RecordId) {
346        if self.lock_manager.unlock_row_raw(txn_id, table.clone(), rid) {
347            if let Some(mut entry) = self.held_locks.get_mut(&txn_id) {
348                entry.row_keys.remove(&(table.clone(), rid));
349                entry.rows.retain(|(t, r, _)| !(t == table && *r == rid));
350            }
351        }
352    }
353
354    fn release_all_locks(&self, txn_id: TransactionId) {
355        if let Some((_, mut held)) = self.held_locks.remove(&txn_id) {
356            for (table, rid, _) in held.rows.drain(..).rev() {
357                let _ = self.lock_manager.unlock_row_raw(txn_id, table, rid);
358            }
359            for (table, _) in held.tables.drain(..).rev() {
360                let _ = self.lock_manager.unlock_table_raw(txn_id, table);
361            }
362        }
363    }
364}
365
366#[cfg(test)]
367mod tests {
368    use super::*;
369    use crate::config::WalConfig;
370    use crate::recovery::WalManager;
371    use crate::storage::page::TupleMeta;
372    use tempfile::TempDir;
373
374    fn build_wal(temp_dir: &TempDir) -> Arc<WalManager> {
375        let wal_path = temp_dir.path().join("wal");
376        let config = WalConfig {
377            directory: wal_path,
378            sync_on_flush: false,
379            ..WalConfig::default()
380        };
381        Arc::new(WalManager::new(config, None, None).expect("wal manager"))
382    }
383
384    #[test]
385    fn commit_waits_for_durable_when_sync() {
386        let temp = TempDir::new().expect("tempdir");
387        let wal = build_wal(&temp);
388        let manager = TransactionManager::new(wal.clone(), true);
389
390        let mut txn = manager
391            .begin(
392                IsolationLevel::ReadUncommitted,
393                TransactionAccessMode::ReadWrite,
394            )
395            .expect("begin txn");
396        manager.commit(&mut txn).expect("commit");
397
398        assert_eq!(txn.state(), TransactionState::Committed);
399        let lsn = txn.last_lsn().expect("commit lsn");
400        assert!(wal.durable_lsn() >= lsn);
401    }
402
403    #[test]
404    fn abort_records_wal_and_marks_state() {
405        let temp = TempDir::new().expect("tempdir");
406        let wal = build_wal(&temp);
407        let manager = TransactionManager::new(wal.clone(), false);
408
409        let mut txn = manager
410            .begin(
411                IsolationLevel::ReadUncommitted,
412                TransactionAccessMode::ReadWrite,
413            )
414            .expect("begin txn");
415        manager.abort(&mut txn).expect("abort");
416
417        assert_eq!(txn.state(), TransactionState::Aborted);
418        let lsn = txn.last_lsn().expect("abort lsn");
419        // Async commit still triggers flush_until, so durable LSN should advance.
420        assert!(wal.durable_lsn() >= lsn);
421    }
422
423    #[test]
424    fn snapshot_excludes_running_insert_until_commit() {
425        let temp = TempDir::new().expect("tempdir");
426        let wal = build_wal(&temp);
427        let manager = TransactionManager::new(wal, true);
428
429        let mut writer = manager
430            .begin(
431                IsolationLevel::ReadCommitted,
432                TransactionAccessMode::ReadWrite,
433            )
434            .expect("writer");
435        let meta = TupleMeta::new(writer.id(), 0);
436
437        let mut reader = manager
438            .begin(
439                IsolationLevel::ReadCommitted,
440                TransactionAccessMode::ReadWrite,
441            )
442            .expect("reader");
443        let snapshot = manager.snapshot(reader.id());
444        assert!(
445            !snapshot.is_visible(&meta, 0, |tid| manager.transaction_status(tid)),
446            "running writer should not be visible",
447        );
448
449        manager.commit(&mut writer).expect("commit writer");
450        let snapshot_after_commit = manager.snapshot(reader.id());
451        assert!(snapshot_after_commit.is_visible(&meta, 0, |tid| manager.transaction_status(tid)));
452
453        manager.abort(&mut reader).expect("abort reader");
454    }
455
456    #[test]
457    fn snapshot_treats_committed_delete_as_invisible() {
458        let temp = TempDir::new().expect("tempdir");
459        let wal = build_wal(&temp);
460        let manager = TransactionManager::new(wal, true);
461
462        let mut inserter = manager
463            .begin(
464                IsolationLevel::ReadCommitted,
465                TransactionAccessMode::ReadWrite,
466            )
467            .expect("insert txn");
468        let mut meta = TupleMeta::new(inserter.id(), 0);
469        manager.commit(&mut inserter).expect("commit insert");
470
471        let mut deleter = manager
472            .begin(
473                IsolationLevel::ReadCommitted,
474                TransactionAccessMode::ReadWrite,
475            )
476            .expect("delete txn");
477        meta.mark_deleted(deleter.id(), 0);
478
479        let mut reader = manager
480            .begin(
481                IsolationLevel::ReadCommitted,
482                TransactionAccessMode::ReadWrite,
483            )
484            .expect("reader txn");
485
486        let before_commit = manager.snapshot(reader.id());
487        assert!(before_commit.is_visible(&meta, 0, |tid| manager.transaction_status(tid)));
488
489        manager.commit(&mut deleter).expect("commit delete");
490        let after_commit = manager.snapshot(reader.id());
491        assert!(
492            !after_commit.is_visible(&meta, 0, |tid| manager.transaction_status(tid)),
493            "committed delete should hide tuple",
494        );
495
496        manager.abort(&mut reader).expect("abort reader");
497    }
498}