1use std::collections::HashSet;
2use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
3use std::sync::Arc;
4
5use crate::error::{QuillSQLError, QuillSQLResult};
6use crate::recovery::wal::codec::{ClrPayload, TransactionPayload, TransactionRecordKind};
7use crate::recovery::wal_record::WalRecordPayload;
8use crate::recovery::{Lsn, WalManager};
9use crate::storage::page::RecordId;
10use crate::transaction::{
11 IsolationLevel, LockManager, LockMode, Transaction, TransactionId, TransactionSnapshot,
12 TransactionState, TransactionStatus,
13};
14use crate::utils::table_ref::TableReference;
15use dashmap::{DashMap, DashSet};
16use sqlparser::ast::TransactionAccessMode;
17
/// Per-transaction record of the locks acquired (or reported) on behalf of a
/// transaction, so that `TransactionManager::release_all_locks` can unlock
/// them all when the transaction commits or aborts.
#[derive(Debug, Default)]
struct HeldLocks {
    /// Table locks in acquisition order; released in reverse order.
    tables: Vec<(TableReference, LockMode)>,
    /// Row locks recorded via `record_row_lock`, in acquisition order;
    /// released in reverse order.
    rows: Vec<(TableReference, RecordId, LockMode)>,
    /// Membership set consulted by `try_acquire_row_lock` to skip rows already
    /// recorded for this transaction. It can be cleared independently of `rows`
    /// (see `remove_row_key_marker`).
    row_keys: HashSet<(TableReference, RecordId)>,
    /// Shared row locks tracked separately via `record_shared_row_lock`; they
    /// may be released individually through `try_unlock_shared_row`.
    shared_rows: HashSet<(TableReference, RecordId)>,
}
25
/// Coordinates the transaction lifecycle.
///
/// It allocates transaction ids, writes Begin/Commit/Abort records (and CLRs
/// during rollback) to the WAL, and tracks which transactions are running and
/// how they finished, for MVCC snapshots. It also records the locks each
/// transaction holds so they can be released when it finishes.
pub struct TransactionManager {
    /// Write-ahead log receiving transaction, CLR and compensating heap records.
    wal: Arc<WalManager>,
    /// Next id to hand out. Ids start at 1; id 0 is reported as committed by
    /// `transaction_status`.
    next_txn_id: AtomicU64,
    /// Synchronous-commit setting copied into each newly begun transaction.
    synchronous_commit: AtomicBool,
    /// Ids of transactions that have begun and not yet committed or aborted.
    active_txns: DashSet<TransactionId>,
    /// Lock manager performing the actual table/row locking.
    lock_manager: Arc<LockManager>,
    /// Locks held per transaction, released on commit/abort.
    held_locks: DashMap<TransactionId, HeldLocks>,
    /// Last known status of every transaction id this manager has begun.
    /// NOTE(review): entries are inserted but never removed by this type, so the
    /// map grows with the total number of transactions — confirm pruning exists
    /// elsewhere.
    txn_statuses: DashMap<TransactionId, TransactionStatus>,
}
35
36impl TransactionManager {
37 pub fn new(wal: Arc<WalManager>, synchronous_commit: bool) -> Self {
38 Self {
39 wal,
40 next_txn_id: AtomicU64::new(1),
41 synchronous_commit: AtomicBool::new(synchronous_commit),
42 active_txns: DashSet::new(),
43 lock_manager: Arc::new(LockManager::new()),
44 held_locks: DashMap::new(),
45 txn_statuses: DashMap::new(),
46 }
47 }
48
49 pub fn with_lock_manager(
50 wal: Arc<WalManager>,
51 synchronous_commit: bool,
52 lock_manager: Arc<LockManager>,
53 ) -> Self {
54 Self {
55 wal,
56 next_txn_id: AtomicU64::new(1),
57 synchronous_commit: AtomicBool::new(synchronous_commit),
58 active_txns: DashSet::new(),
59 lock_manager,
60 held_locks: DashMap::new(),
61 txn_statuses: DashMap::new(),
62 }
63 }
64
65 pub fn begin(
66 &self,
67 isolation_level: IsolationLevel,
68 access_mode: TransactionAccessMode,
69 ) -> QuillSQLResult<Transaction> {
70 let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
71 if txn_id == 0 {
72 return Err(QuillSQLError::Internal(
73 "Transaction ID wrapped around".to_string(),
74 ));
75 }
76 let sync_commit = self.synchronous_commit.load(Ordering::Relaxed);
77 let mut txn = Transaction::new(txn_id, isolation_level, access_mode, sync_commit);
78 let append = self.wal.append_record_with(|_| {
79 WalRecordPayload::Transaction(TransactionPayload {
80 marker: TransactionRecordKind::Begin,
81 txn_id,
82 })
83 })?;
84 txn.set_begin_lsn(append.end_lsn);
85 self.active_txns.insert(txn_id);
86 self.txn_statuses
87 .insert(txn_id, TransactionStatus::InProgress);
88 self.held_locks.insert(txn_id, HeldLocks::default());
89 Ok(txn)
90 }
91
92 pub fn acquire_table_lock(
93 &self,
94 txn: &Transaction,
95 table: TableReference,
96 mode: LockMode,
97 ) -> QuillSQLResult<()> {
98 if self.lock_manager.lock_table(txn, mode, table.clone()) {
99 if let Some(mut entry) = self.held_locks.get_mut(&txn.id()) {
100 entry.tables.push((table, mode));
101 } else {
102 let mut new_entry = HeldLocks::default();
103 new_entry.tables.push((table, mode));
104 self.held_locks.insert(txn.id(), new_entry);
105 }
106 Ok(())
107 } else {
108 Err(QuillSQLError::Internal(format!(
109 "Failed to acquire table lock for txn {}",
110 txn.id()
111 )))
112 }
113 }
114
115 pub fn try_acquire_row_lock(
116 &self,
117 txn: &Transaction,
118 table: TableReference,
119 rid: RecordId,
120 mode: LockMode,
121 ) -> QuillSQLResult<bool> {
122 let key = (table.clone(), rid);
123 if let Some(entry) = self.held_locks.get(&txn.id()) {
124 if entry.row_keys.contains(&key) {
125 return Ok(true);
126 }
127 }
128 if self.lock_manager.lock_row(txn, mode, table.clone(), rid) {
129 self.record_row_lock(txn.id(), table, rid, mode);
130 Ok(true)
131 } else {
132 Ok(false)
133 }
134 }
135
136 pub fn acquire_row_lock(
137 &self,
138 txn: &Transaction,
139 table: TableReference,
140 rid: RecordId,
141 mode: LockMode,
142 ) -> QuillSQLResult<()> {
143 if !self.try_acquire_row_lock(txn, table.clone(), rid, mode)? {
144 return Err(QuillSQLError::Internal(format!(
145 "Failed to acquire row lock for txn {}",
146 txn.id()
147 )));
148 }
149 Ok(())
150 }
151
152 pub fn commit(&self, txn: &mut Transaction) -> QuillSQLResult<()> {
153 match txn.state() {
154 TransactionState::Running | TransactionState::Tainted => {}
155 TransactionState::Committed => {
156 return Err(QuillSQLError::Internal(format!(
157 "Transaction {} already committed",
158 txn.id()
159 )))
160 }
161 TransactionState::Aborted => {
162 return Err(QuillSQLError::Internal(format!(
163 "Transaction {} already aborted",
164 txn.id()
165 )))
166 }
167 }
168
169 let txn_id = txn.id();
170 let append = self.wal.append_record_with(|_| {
171 WalRecordPayload::Transaction(TransactionPayload {
172 marker: TransactionRecordKind::Commit,
173 txn_id,
174 })
175 })?;
176 txn.record_lsn(append.end_lsn);
177 txn.set_state(TransactionState::Committed);
178
179 self.active_txns.remove(&txn_id);
180 self.txn_statuses
181 .insert(txn_id, TransactionStatus::Committed);
182 self.release_all_locks(txn_id);
183 txn.clear_undo();
184 txn.clear_snapshot();
185 self.finish_commit(txn, append.end_lsn)
186 }
187
188 pub fn abort(&self, txn: &mut Transaction) -> QuillSQLResult<()> {
189 match txn.state() {
190 TransactionState::Committed => {
191 return Err(QuillSQLError::Internal(format!(
192 "Transaction {} already committed",
193 txn.id()
194 )))
195 }
196 TransactionState::Aborted => return Ok(()),
197 TransactionState::Running | TransactionState::Tainted => {}
198 }
199
200 let txn_id = txn.id();
201 let mut undo_next: Option<Lsn> = None;
202 while let Some(action) = txn.pop_undo_action() {
203 let payload = action.to_heap_payload()?;
204 let clr_result = self.wal.append_record_with(|ctx| {
205 txn.record_lsn(ctx.end_lsn);
206 WalRecordPayload::Clr(ClrPayload {
207 txn_id,
208 undone_lsn: ctx.start_lsn,
209 undo_next_lsn: undo_next.unwrap_or(0),
210 })
211 })?;
212 txn.record_lsn(clr_result.end_lsn);
213 let heap_result = self.wal.append_record_with(|ctx| {
214 txn.record_lsn(ctx.end_lsn);
215 WalRecordPayload::Heap(payload.clone())
216 })?;
217 txn.record_lsn(heap_result.end_lsn);
218 action.undo(txn_id)?;
219 undo_next = Some(heap_result.start_lsn);
220 }
221
222 let append = self.wal.append_record_with(|_| {
223 WalRecordPayload::Transaction(TransactionPayload {
224 marker: TransactionRecordKind::Abort,
225 txn_id,
226 })
227 })?;
228 txn.record_lsn(append.end_lsn);
229 txn.set_state(TransactionState::Aborted);
230
231 self.active_txns.remove(&txn_id);
232 self.txn_statuses.insert(txn_id, TransactionStatus::Aborted);
233 self.release_all_locks(txn_id);
234 txn.clear_undo();
235 txn.clear_snapshot();
236 self.finish_commit(txn, append.end_lsn)
237 }
238
239 pub fn synchronous_commit(&self) -> bool {
240 self.synchronous_commit.load(Ordering::Relaxed)
241 }
242
243 pub fn set_synchronous_commit(&self, value: bool) {
244 self.synchronous_commit.store(value, Ordering::Relaxed);
245 }
246
247 pub fn active_transactions(&self) -> Vec<TransactionId> {
248 self.active_txns.iter().map(|txn| *txn).collect()
249 }
250
251 pub fn snapshot(&self, txn_id: TransactionId) -> TransactionSnapshot {
252 let active: Vec<TransactionId> = self
253 .active_txns
254 .iter()
255 .map(|id| *id)
256 .filter(|id| *id != txn_id)
257 .collect();
258 let xmax = self.next_txn_id.load(Ordering::SeqCst);
259 let xmin = active.iter().copied().min().unwrap_or(xmax);
260 TransactionSnapshot::new(txn_id, xmin, xmax, active)
261 }
262
263 pub fn transaction_status(&self, txn_id: TransactionId) -> TransactionStatus {
264 if txn_id == 0 {
265 return TransactionStatus::Committed;
266 }
267 self.txn_statuses
268 .get(&txn_id)
269 .map(|entry| *entry.value())
270 .unwrap_or(TransactionStatus::Unknown)
271 }
272
273 pub fn oldest_active_txn(&self) -> Option<TransactionId> {
274 self.active_txns.iter().map(|txn| *txn).min()
275 }
276
277 pub fn next_txn_id_hint(&self) -> TransactionId {
278 self.next_txn_id.load(Ordering::SeqCst)
279 }
280
281 fn finish_commit(&self, txn: &Transaction, lsn: Lsn) -> QuillSQLResult<()> {
282 if txn.synchronous_commit() {
283 self.wal.wait_for_durable(lsn)?;
284 } else {
285 let _ = self.wal.flush_until(lsn)?;
286 }
287 Ok(())
288 }
289
290 pub fn record_row_lock(
291 &self,
292 txn_id: TransactionId,
293 table: TableReference,
294 rid: RecordId,
295 mode: LockMode,
296 ) {
297 let mut entry = self.held_locks.entry(txn_id).or_default();
298 if entry.row_keys.insert((table.clone(), rid)) {
299 entry.rows.push((table, rid, mode));
300 }
301 }
302
303 pub fn remove_row_key_marker(
304 &self,
305 txn_id: TransactionId,
306 table: &TableReference,
307 rid: RecordId,
308 ) {
309 if let Some(mut entry) = self.held_locks.get_mut(&txn_id) {
310 entry.row_keys.remove(&(table.clone(), rid));
311 }
312 }
313
314 pub fn record_shared_row_lock(
315 &self,
316 txn_id: TransactionId,
317 table: TableReference,
318 rid: RecordId,
319 ) {
320 let mut entry = self.held_locks.entry(txn_id).or_default();
321 entry.shared_rows.insert((table, rid));
322 }
323
324 pub fn remove_shared_row_lock(
325 &self,
326 txn_id: TransactionId,
327 table: &TableReference,
328 rid: RecordId,
329 ) {
330 if let Some(mut entry) = self.held_locks.get_mut(&txn_id) {
331 entry.shared_rows.remove(&(table.clone(), rid));
332 }
333 }
334
335 pub fn try_unlock_shared_row(
336 &self,
337 txn_id: TransactionId,
338 table: &TableReference,
339 rid: RecordId,
340 ) -> QuillSQLResult<()> {
341 let _ = self.lock_manager.unlock_row_raw(txn_id, table.clone(), rid);
342 self.remove_shared_row_lock(txn_id, table, rid);
343 Ok(())
344 }
345
346 pub fn unlock_row(&self, txn_id: TransactionId, table: &TableReference, rid: RecordId) {
347 if self.lock_manager.unlock_row_raw(txn_id, table.clone(), rid) {
348 if let Some(mut entry) = self.held_locks.get_mut(&txn_id) {
349 entry.row_keys.remove(&(table.clone(), rid));
350 entry.rows.retain(|(t, r, _)| !(t == table && *r == rid));
351 entry.shared_rows.remove(&(table.clone(), rid));
352 }
353 }
354 }
355
356 fn release_all_locks(&self, txn_id: TransactionId) {
357 if let Some((_, mut held)) = self.held_locks.remove(&txn_id) {
358 for (table, rid, _) in held.rows.drain(..).rev() {
359 let _ = self.lock_manager.unlock_row_raw(txn_id, table, rid);
360 }
361 for (table, rid) in held.shared_rows.drain() {
362 let _ = self.lock_manager.unlock_row_raw(txn_id, table, rid);
363 }
364 for (table, _) in held.tables.drain(..).rev() {
365 let _ = self.lock_manager.unlock_table_raw(txn_id, table);
366 }
367 }
368 }
369}
370
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::WalConfig;
    use crate::recovery::WalManager;
    use crate::storage::page::TupleMeta;
    use tempfile::TempDir;

    /// Builds a WAL manager rooted at `<temp_dir>/wal` with `sync_on_flush`
    /// disabled and every other setting at its default.
    fn build_wal(temp_dir: &TempDir) -> Arc<WalManager> {
        let config = WalConfig {
            directory: temp_dir.path().join("wal"),
            sync_on_flush: false,
            ..WalConfig::default()
        };
        Arc::new(WalManager::new(config, None, None).expect("wal manager"))
    }

    /// Begins a read-write transaction at `level`, panicking with `context`
    /// if `begin` fails.
    fn begin_rw(manager: &TransactionManager, level: IsolationLevel, context: &str) -> Transaction {
        manager
            .begin(level, TransactionAccessMode::ReadWrite)
            .expect(context)
    }

    /// With synchronous commit enabled, the commit record is durable once
    /// `commit` returns.
    #[test]
    fn commit_waits_for_durable_when_sync() {
        let temp = TempDir::new().expect("tempdir");
        let wal = build_wal(&temp);
        let manager = TransactionManager::new(Arc::clone(&wal), true);

        let mut txn = begin_rw(&manager, IsolationLevel::ReadUncommitted, "begin txn");
        manager.commit(&mut txn).expect("commit");

        assert_eq!(txn.state(), TransactionState::Committed);
        let commit_lsn = txn.last_lsn().expect("commit lsn");
        assert!(wal.durable_lsn() >= commit_lsn);
    }

    /// Aborting records an LSN, flips the state to aborted, and (even with
    /// asynchronous commit) leaves the durable LSN at or past that record.
    #[test]
    fn abort_records_wal_and_marks_state() {
        let temp = TempDir::new().expect("tempdir");
        let wal = build_wal(&temp);
        let manager = TransactionManager::new(Arc::clone(&wal), false);

        let mut txn = begin_rw(&manager, IsolationLevel::ReadUncommitted, "begin txn");
        manager.abort(&mut txn).expect("abort");

        assert_eq!(txn.state(), TransactionState::Aborted);
        let abort_lsn = txn.last_lsn().expect("abort lsn");
        assert!(wal.durable_lsn() >= abort_lsn);
    }

    /// A tuple inserted by a still-running writer is hidden from a reader's
    /// snapshot and becomes visible in a snapshot taken after the writer commits.
    #[test]
    fn snapshot_excludes_running_insert_until_commit() {
        let temp = TempDir::new().expect("tempdir");
        let manager = TransactionManager::new(build_wal(&temp), true);

        let mut writer = begin_rw(&manager, IsolationLevel::ReadCommitted, "writer");
        let inserted = TupleMeta::new(writer.id(), 0);
        let mut reader = begin_rw(&manager, IsolationLevel::ReadCommitted, "reader");

        let before_commit = manager.snapshot(reader.id());
        assert!(
            !before_commit.is_visible(&inserted, 0, |tid| manager.transaction_status(tid)),
            "running writer should not be visible",
        );

        manager.commit(&mut writer).expect("commit writer");
        let after_commit = manager.snapshot(reader.id());
        assert!(after_commit.is_visible(&inserted, 0, |tid| manager.transaction_status(tid)));

        manager.abort(&mut reader).expect("abort reader");
    }

    /// A committed tuple stays visible while its deleter is running, and is
    /// hidden once the delete commits.
    #[test]
    fn snapshot_treats_committed_delete_as_invisible() {
        let temp = TempDir::new().expect("tempdir");
        let manager = TransactionManager::new(build_wal(&temp), true);

        let mut inserter = begin_rw(&manager, IsolationLevel::ReadCommitted, "insert txn");
        let mut tuple = TupleMeta::new(inserter.id(), 0);
        manager.commit(&mut inserter).expect("commit insert");

        let mut deleter = begin_rw(&manager, IsolationLevel::ReadCommitted, "delete txn");
        tuple.mark_deleted(deleter.id(), 0);

        let mut reader = begin_rw(&manager, IsolationLevel::ReadCommitted, "reader txn");

        let while_deleting = manager.snapshot(reader.id());
        assert!(while_deleting.is_visible(&tuple, 0, |tid| manager.transaction_status(tid)));

        manager.commit(&mut deleter).expect("commit delete");
        let after_delete = manager.snapshot(reader.id());
        assert!(
            !after_delete.is_visible(&tuple, 0, |tid| manager.transaction_status(tid)),
            "committed delete should hide tuple",
        );

        manager.abort(&mut reader).expect("abort reader");
    }
}