1use std::collections::HashSet;
2use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
3use std::sync::Arc;
4
5use crate::error::{QuillSQLError, QuillSQLResult};
6use crate::recovery::wal::codec::{ClrPayload, TransactionPayload, TransactionRecordKind};
7use crate::recovery::wal_record::WalRecordPayload;
8use crate::recovery::{Lsn, WalManager};
9use crate::storage::page::RecordId;
10use crate::transaction::{
11 IsolationLevel, LockManager, LockMode, Transaction, TransactionId, TransactionSnapshot,
12 TransactionState, TransactionStatus,
13};
14use crate::utils::table_ref::TableReference;
15use dashmap::{DashMap, DashSet};
16use serde::Serialize;
17use sqlparser::ast::TransactionAccessMode;
18
/// Locks that this manager has acquired on behalf of one transaction.
///
/// Kept so that `release_all_locks` can hand every lock back to the lock manager when the
/// transaction commits or aborts.
#[derive(Debug, Default)]
struct HeldLocks {
    // Table locks in acquisition order. One entry is recorded per successful request, so
    // the same table may appear more than once.
    tables: Vec<(TableReference, LockMode)>,
    // Row locks in acquisition order, with at most one entry per (table, row) pair.
    rows: Vec<(TableReference, RecordId, LockMode)>,
    // The (table, row) keys present in `rows`, for hash-based "already held" checks.
    row_keys: HashSet<(TableReference, RecordId)>,
}
25
/// Coordinates the transaction lifecycle.
///
/// Its responsibilities are:
/// - allocating transaction ids;
/// - writing begin/commit/abort (and rollback) records to the WAL;
/// - tracking per-transaction status for visibility checks;
/// - keeping the bookkeeping needed to release each transaction's locks.
pub struct TransactionManager {
    // Write-ahead log that receives begin/commit/abort, CLR and undo heap records.
    wal: Arc<WalManager>,
    // Next transaction id to hand out. Starts at 1 because id 0 is reserved: it is always
    // reported as committed.
    next_txn_id: AtomicU64,
    // Commit-durability default copied into each newly begun transaction.
    synchronous_commit: AtomicBool,
    // Ids of transactions that have begun and have not yet committed or aborted.
    active_txns: DashSet<TransactionId>,
    // Lock manager that grants and releases table/row locks.
    lock_manager: Arc<LockManager>,
    // Locks acquired through this manager, per transaction; released at commit/abort.
    held_locks: DashMap<TransactionId, HeldLocks>,
    // Latest known status of each transaction begun by this manager. Nothing in this file
    // removes entries, so the map grows with the number of transactions.
    txn_statuses: DashMap<TransactionId, TransactionStatus>,
}
35
/// Diagnostic view of one active transaction, as produced by `debug_snapshot`.
#[derive(Debug, Clone, Serialize)]
pub struct TxnDebugEntry {
    /// Transaction id.
    pub txn_id: TransactionId,
    /// Status reported by `transaction_status` when the snapshot was taken.
    pub status: TransactionStatus,
    /// Number of table-lock entries recorded for the transaction.
    pub held_tables: usize,
    /// Number of row-lock entries recorded for the transaction.
    pub held_rows: usize,
}
43
/// Diagnostic view of the transaction manager, as produced by `debug_snapshot`.
#[derive(Debug, Clone, Serialize)]
pub struct TxnDebugSnapshot {
    /// One entry per transaction that was active when the snapshot was taken.
    pub active: Vec<TxnDebugEntry>,
    /// Smallest active transaction id, or `None` when nothing is active.
    pub oldest_active: Option<TransactionId>,
    /// Value of the id counter at snapshot time (the next id `begin` would try to hand out).
    pub next_txn_id: TransactionId,
}
50
51impl TransactionManager {
52 pub fn new(wal: Arc<WalManager>, synchronous_commit: bool) -> Self {
53 Self {
54 wal,
55 next_txn_id: AtomicU64::new(1),
56 synchronous_commit: AtomicBool::new(synchronous_commit),
57 active_txns: DashSet::new(),
58 lock_manager: Arc::new(LockManager::new()),
59 held_locks: DashMap::new(),
60 txn_statuses: DashMap::new(),
61 }
62 }
63
64 pub fn with_lock_manager(
65 wal: Arc<WalManager>,
66 synchronous_commit: bool,
67 lock_manager: Arc<LockManager>,
68 ) -> Self {
69 Self {
70 wal,
71 next_txn_id: AtomicU64::new(1),
72 synchronous_commit: AtomicBool::new(synchronous_commit),
73 active_txns: DashSet::new(),
74 lock_manager,
75 held_locks: DashMap::new(),
76 txn_statuses: DashMap::new(),
77 }
78 }
79
80 pub fn lock_manager_arc(&self) -> Arc<LockManager> {
81 self.lock_manager.clone()
82 }
83
84 pub fn begin(
85 &self,
86 isolation_level: IsolationLevel,
87 access_mode: TransactionAccessMode,
88 ) -> QuillSQLResult<Transaction> {
89 let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
90 if txn_id == 0 {
91 return Err(QuillSQLError::Internal(
92 "Transaction ID wrapped around".to_string(),
93 ));
94 }
95 let sync_commit = self.synchronous_commit.load(Ordering::Relaxed);
96 let mut txn = Transaction::new(txn_id, isolation_level, access_mode, sync_commit);
97 let append = self.wal.append_record_with(|_| {
98 WalRecordPayload::Transaction(TransactionPayload {
99 marker: TransactionRecordKind::Begin,
100 txn_id,
101 })
102 })?;
103 txn.set_begin_lsn(append.end_lsn);
104 self.active_txns.insert(txn_id);
105 self.txn_statuses
106 .insert(txn_id, TransactionStatus::InProgress);
107 self.held_locks.insert(txn_id, HeldLocks::default());
108 Ok(txn)
109 }
110
111 pub fn acquire_table_lock(
112 &self,
113 txn: &Transaction,
114 table: TableReference,
115 mode: LockMode,
116 ) -> QuillSQLResult<()> {
117 if self.lock_manager.lock_table(txn, mode, table.clone()) {
118 if let Some(mut entry) = self.held_locks.get_mut(&txn.id()) {
119 entry.tables.push((table, mode));
120 } else {
121 let mut new_entry = HeldLocks::default();
122 new_entry.tables.push((table, mode));
123 self.held_locks.insert(txn.id(), new_entry);
124 }
125 Ok(())
126 } else {
127 Err(QuillSQLError::Internal(format!(
128 "Failed to acquire table lock for txn {}",
129 txn.id()
130 )))
131 }
132 }
133
134 pub fn try_acquire_row_lock(
135 &self,
136 txn: &Transaction,
137 table: TableReference,
138 rid: RecordId,
139 mode: LockMode,
140 ) -> QuillSQLResult<bool> {
141 let key = (table.clone(), rid);
142 if let Some(entry) = self.held_locks.get(&txn.id()) {
143 if entry.row_keys.contains(&key) {
144 return Ok(true);
145 }
146 }
147 if self.lock_manager.lock_row(txn, mode, table.clone(), rid) {
148 self.record_row_lock(txn.id(), table, rid, mode);
149 Ok(true)
150 } else {
151 Ok(false)
152 }
153 }
154
155 pub fn acquire_row_lock(
156 &self,
157 txn: &Transaction,
158 table: TableReference,
159 rid: RecordId,
160 mode: LockMode,
161 ) -> QuillSQLResult<()> {
162 if !self.try_acquire_row_lock(txn, table.clone(), rid, mode)? {
163 return Err(QuillSQLError::Internal(format!(
164 "Failed to acquire row lock for txn {}",
165 txn.id()
166 )));
167 }
168 Ok(())
169 }
170
171 pub fn commit(&self, txn: &mut Transaction) -> QuillSQLResult<()> {
172 match txn.state() {
173 TransactionState::Running | TransactionState::Tainted => {}
174 TransactionState::Committed => {
175 return Err(QuillSQLError::Internal(format!(
176 "Transaction {} already committed",
177 txn.id()
178 )))
179 }
180 TransactionState::Aborted => {
181 return Err(QuillSQLError::Internal(format!(
182 "Transaction {} already aborted",
183 txn.id()
184 )))
185 }
186 }
187
188 let txn_id = txn.id();
189 let append = self.wal.append_record_with(|_| {
190 WalRecordPayload::Transaction(TransactionPayload {
191 marker: TransactionRecordKind::Commit,
192 txn_id,
193 })
194 })?;
195 txn.record_lsn(append.end_lsn);
196 txn.set_state(TransactionState::Committed);
197
198 self.active_txns.remove(&txn_id);
199 self.txn_statuses
200 .insert(txn_id, TransactionStatus::Committed);
201 self.release_all_locks(txn_id);
202 txn.clear_undo();
203 txn.clear_snapshot();
204 self.finish_commit(txn, append.end_lsn)
205 }
206
207 pub fn abort(&self, txn: &mut Transaction) -> QuillSQLResult<()> {
208 match txn.state() {
209 TransactionState::Committed => {
210 return Err(QuillSQLError::Internal(format!(
211 "Transaction {} already committed",
212 txn.id()
213 )))
214 }
215 TransactionState::Aborted => return Ok(()),
216 TransactionState::Running | TransactionState::Tainted => {}
217 }
218
219 let txn_id = txn.id();
220 let mut undo_next: Option<Lsn> = None;
221 while let Some(action) = txn.pop_undo_action() {
222 let payload = action.to_heap_payload(txn_id)?;
223 let clr_result = self.wal.append_record_with(|ctx| {
224 txn.record_lsn(ctx.end_lsn);
225 WalRecordPayload::Clr(ClrPayload {
226 txn_id,
227 undone_lsn: ctx.start_lsn,
228 undo_next_lsn: undo_next.unwrap_or(0),
229 })
230 })?;
231 txn.record_lsn(clr_result.end_lsn);
232 let heap_result = self.wal.append_record_with(|ctx| {
233 txn.record_lsn(ctx.end_lsn);
234 WalRecordPayload::Heap(payload.clone())
235 })?;
236 txn.record_lsn(heap_result.end_lsn);
237 action.undo(txn_id)?;
238 undo_next = Some(heap_result.start_lsn);
239 }
240
241 let append = self.wal.append_record_with(|_| {
242 WalRecordPayload::Transaction(TransactionPayload {
243 marker: TransactionRecordKind::Abort,
244 txn_id,
245 })
246 })?;
247 txn.record_lsn(append.end_lsn);
248 txn.set_state(TransactionState::Aborted);
249
250 self.active_txns.remove(&txn_id);
251 self.txn_statuses.insert(txn_id, TransactionStatus::Aborted);
252 self.release_all_locks(txn_id);
253 txn.clear_undo();
254 txn.clear_snapshot();
255 self.finish_commit(txn, append.end_lsn)
256 }
257
258 pub fn synchronous_commit(&self) -> bool {
259 self.synchronous_commit.load(Ordering::Relaxed)
260 }
261
262 pub fn set_synchronous_commit(&self, value: bool) {
263 self.synchronous_commit.store(value, Ordering::Relaxed);
264 }
265
266 pub fn active_transactions(&self) -> Vec<TransactionId> {
267 self.active_txns.iter().map(|txn| *txn).collect()
268 }
269
270 pub fn snapshot(&self, txn_id: TransactionId) -> TransactionSnapshot {
271 let active: Vec<TransactionId> = self
272 .active_txns
273 .iter()
274 .map(|id| *id)
275 .filter(|id| *id != txn_id)
276 .collect();
277 let xmax = self.next_txn_id.load(Ordering::SeqCst);
278 let xmin = active.iter().copied().min().unwrap_or(xmax);
279 TransactionSnapshot::new(txn_id, xmin, xmax, active)
280 }
281
282 pub fn transaction_status(&self, txn_id: TransactionId) -> TransactionStatus {
283 if txn_id == 0 {
284 return TransactionStatus::Committed;
285 }
286 self.txn_statuses
287 .get(&txn_id)
288 .map(|entry| *entry.value())
289 .unwrap_or(TransactionStatus::Unknown)
290 }
291
292 pub fn oldest_active_txn(&self) -> Option<TransactionId> {
293 self.active_txns.iter().map(|txn| *txn).min()
294 }
295
296 pub fn next_txn_id_hint(&self) -> TransactionId {
297 self.next_txn_id.load(Ordering::SeqCst)
298 }
299
300 pub fn debug_snapshot(&self) -> TxnDebugSnapshot {
301 let active_ids = self.active_transactions();
302 let mut active = Vec::with_capacity(active_ids.len());
303 for txn_id in active_ids {
304 let status = self.transaction_status(txn_id);
305 let held = self.held_locks.get(&txn_id);
306 let (held_tables, held_rows) = held
307 .map(|locks| (locks.tables.len(), locks.rows.len()))
308 .unwrap_or((0, 0));
309 active.push(TxnDebugEntry {
310 txn_id,
311 status,
312 held_tables,
313 held_rows,
314 });
315 }
316 TxnDebugSnapshot {
317 active,
318 oldest_active: self.oldest_active_txn(),
319 next_txn_id: self.next_txn_id_hint(),
320 }
321 }
322
323 fn finish_commit(&self, txn: &Transaction, lsn: Lsn) -> QuillSQLResult<()> {
324 if txn.synchronous_commit() {
325 self.wal.wait_for_durable(lsn)?;
326 } else if !self.wal.has_background_writer() {
327 let _ = self.wal.flush_until(lsn)?;
328 }
329 Ok(())
330 }
331
332 pub fn record_row_lock(
333 &self,
334 txn_id: TransactionId,
335 table: TableReference,
336 rid: RecordId,
337 mode: LockMode,
338 ) {
339 let mut entry = self.held_locks.entry(txn_id).or_default();
340 if entry.row_keys.insert((table.clone(), rid)) {
341 entry.rows.push((table, rid, mode));
342 }
343 }
344
345 pub fn unlock_row(&self, txn_id: TransactionId, table: &TableReference, rid: RecordId) {
346 if self.lock_manager.unlock_row_raw(txn_id, table.clone(), rid) {
347 if let Some(mut entry) = self.held_locks.get_mut(&txn_id) {
348 entry.row_keys.remove(&(table.clone(), rid));
349 entry.rows.retain(|(t, r, _)| !(t == table && *r == rid));
350 }
351 }
352 }
353
354 fn release_all_locks(&self, txn_id: TransactionId) {
355 if let Some((_, mut held)) = self.held_locks.remove(&txn_id) {
356 for (table, rid, _) in held.rows.drain(..).rev() {
357 let _ = self.lock_manager.unlock_row_raw(txn_id, table, rid);
358 }
359 for (table, _) in held.tables.drain(..).rev() {
360 let _ = self.lock_manager.unlock_table_raw(txn_id, table);
361 }
362 }
363 }
364}
365
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::WalConfig;
    use crate::recovery::WalManager;
    use crate::storage::page::TupleMeta;
    use tempfile::TempDir;

    /// Creates a WAL rooted at `<temp_dir>/wal` with fsync-on-flush disabled.
    fn build_wal(temp_dir: &TempDir) -> Arc<WalManager> {
        let config = WalConfig {
            directory: temp_dir.path().join("wal"),
            sync_on_flush: false,
            ..WalConfig::default()
        };
        Arc::new(WalManager::new(config, None, None).expect("wal manager"))
    }

    /// Begins a read-write transaction at `level`, panicking with `label` on failure.
    fn start(manager: &TransactionManager, level: IsolationLevel, label: &str) -> Transaction {
        manager
            .begin(level, TransactionAccessMode::ReadWrite)
            .expect(label)
    }

    #[test]
    fn commit_waits_for_durable_when_sync() {
        let temp = TempDir::new().expect("tempdir");
        let wal = build_wal(&temp);
        let manager = TransactionManager::new(Arc::clone(&wal), true);

        let mut txn = start(&manager, IsolationLevel::ReadUncommitted, "begin txn");
        manager.commit(&mut txn).expect("commit");

        assert_eq!(txn.state(), TransactionState::Committed);
        let commit_lsn = txn.last_lsn().expect("commit lsn");
        assert!(wal.durable_lsn() >= commit_lsn);
    }

    #[test]
    fn abort_records_wal_and_marks_state() {
        let temp = TempDir::new().expect("tempdir");
        let wal = build_wal(&temp);
        let manager = TransactionManager::new(Arc::clone(&wal), false);

        let mut txn = start(&manager, IsolationLevel::ReadUncommitted, "begin txn");
        manager.abort(&mut txn).expect("abort");

        assert_eq!(txn.state(), TransactionState::Aborted);
        let abort_lsn = txn.last_lsn().expect("abort lsn");
        assert!(wal.durable_lsn() >= abort_lsn);
    }

    #[test]
    fn snapshot_excludes_running_insert_until_commit() {
        let temp = TempDir::new().expect("tempdir");
        let manager = TransactionManager::new(build_wal(&temp), true);

        let mut writer = start(&manager, IsolationLevel::ReadCommitted, "writer");
        let tuple = TupleMeta::new(writer.id(), 0);
        let mut reader = start(&manager, IsolationLevel::ReadCommitted, "reader");

        let before = manager.snapshot(reader.id());
        assert!(
            !before.is_visible(&tuple, 0, |tid| manager.transaction_status(tid)),
            "running writer should not be visible",
        );

        manager.commit(&mut writer).expect("commit writer");
        let after = manager.snapshot(reader.id());
        assert!(after.is_visible(&tuple, 0, |tid| manager.transaction_status(tid)));

        manager.abort(&mut reader).expect("abort reader");
    }

    #[test]
    fn snapshot_treats_committed_delete_as_invisible() {
        let temp = TempDir::new().expect("tempdir");
        let manager = TransactionManager::new(build_wal(&temp), true);

        let mut inserter = start(&manager, IsolationLevel::ReadCommitted, "insert txn");
        let mut tuple = TupleMeta::new(inserter.id(), 0);
        manager.commit(&mut inserter).expect("commit insert");

        let mut deleter = start(&manager, IsolationLevel::ReadCommitted, "delete txn");
        tuple.mark_deleted(deleter.id(), 0);

        let mut reader = start(&manager, IsolationLevel::ReadCommitted, "reader txn");

        let pending_delete = manager.snapshot(reader.id());
        assert!(pending_delete.is_visible(&tuple, 0, |tid| manager.transaction_status(tid)));

        manager.commit(&mut deleter).expect("commit delete");
        let committed_delete = manager.snapshot(reader.id());
        assert!(
            !committed_delete.is_visible(&tuple, 0, |tid| manager.transaction_status(tid)),
            "committed delete should hide tuple",
        );

        manager.abort(&mut reader).expect("abort reader");
    }
}