quill_sql/transaction/
txn_context.rs1use std::sync::Arc;
2
3use crate::error::QuillSQLError;
4use crate::storage::{
5 record::{RecordId, TupleMeta},
6 tuple::Tuple,
7};
8use crate::transaction::{
9 CommandId, IsolationLevel, LockManager, LockMode, RowLockGuard, Transaction, TransactionId,
10 TransactionManager, TransactionSnapshot, TxnReadGuard,
11};
12use crate::utils::table_ref::TableReference;
13use log::warn;
14use sqlparser::ast::TransactionAccessMode;
15
16pub struct TxnContext<'a> {
17 txn: &'a mut Transaction,
18 manager: Arc<TransactionManager>,
19 snapshot: TransactionSnapshot,
20 command_id: CommandId,
21 lock_manager: Arc<LockManager>,
22}
23
24impl<'a> TxnContext<'a> {
25 pub fn new(manager: Arc<TransactionManager>, txn: &'a mut Transaction) -> Self {
26 let lock_manager = manager.lock_manager_arc();
27 let command_id = txn.begin_command();
28 let snapshot = match txn.isolation_level() {
29 IsolationLevel::RepeatableRead | IsolationLevel::Serializable => {
30 if let Some(existing) = txn.snapshot().cloned() {
31 existing
32 } else {
33 let snap = manager.snapshot(txn.id());
34 txn.set_snapshot(snap.clone());
35 snap
36 }
37 }
38 IsolationLevel::ReadCommitted | IsolationLevel::ReadUncommitted => {
39 let snap = manager.snapshot(txn.id());
40 txn.clear_snapshot();
41 snap
42 }
43 };
44 Self {
45 txn,
46 manager,
47 snapshot,
48 command_id,
49 lock_manager,
50 }
51 }
52
53 pub fn snapshot(&self) -> &TransactionSnapshot {
54 &self.snapshot
55 }
56
57 pub fn is_visible(&self, meta: &TupleMeta) -> bool {
58 self.snapshot.is_visible(meta, self.command_id, |txn_id| {
59 self.manager.transaction_status(txn_id)
60 })
61 }
62
63 pub fn read_visible_tuple(
64 &mut self,
65 table: &TableReference,
66 rid: RecordId,
67 meta: &TupleMeta,
68 tuple: Tuple,
69 ) -> crate::error::QuillSQLResult<Option<Tuple>> {
70 match self.transaction().isolation_level() {
71 crate::transaction::IsolationLevel::ReadUncommitted => {
72 if self.is_visible(meta) {
73 Ok(Some(tuple))
74 } else {
75 Ok(None)
76 }
77 }
78 _ => {
79 let guard = self.acquire_shared_guard(table, rid)?;
80 let visible = self.is_visible(meta);
81 guard.release();
82 if visible {
83 Ok(Some(tuple))
84 } else {
85 Ok(None)
86 }
87 }
88 }
89 }
90
91 pub fn acquire_shared_guard(
92 &mut self,
93 table: &TableReference,
94 rid: RecordId,
95 ) -> crate::error::QuillSQLResult<TxnReadGuard> {
96 if !self
97 .lock_manager
98 .lock_row(self.transaction(), LockMode::Shared, table.clone(), rid)
99 {
100 return Err(QuillSQLError::Execution(
101 "failed to acquire shared row lock".to_string(),
102 ));
103 }
104 Ok(TxnReadGuard::Temporary(RowLockGuard::new(
105 self.lock_manager.clone(),
106 self.transaction().id(),
107 table.clone(),
108 rid,
109 )))
110 }
111
112 pub fn lock_row_exclusive(
113 &mut self,
114 table: &TableReference,
115 rid: RecordId,
116 ) -> crate::error::QuillSQLResult<()> {
117 if !self
118 .manager
119 .try_acquire_row_lock(self.txn, table.clone(), rid, LockMode::Exclusive)?
120 {
121 return Err(QuillSQLError::Execution(
122 "failed to acquire row exclusive lock".to_string(),
123 ));
124 }
125 Ok(())
126 }
127
128 pub fn lock_table(
129 &mut self,
130 table: TableReference,
131 mode: LockMode,
132 ) -> crate::error::QuillSQLResult<()> {
133 self.manager
134 .acquire_table_lock(self.txn, table.clone(), mode)
135 .map_err(|e| QuillSQLError::Execution(format!("lock error: {}", e)))
136 }
137
138 pub fn ensure_writable(
139 &self,
140 table: &TableReference,
141 operation: &str,
142 ) -> crate::error::QuillSQLResult<()> {
143 if matches!(
144 self.transaction().access_mode(),
145 TransactionAccessMode::ReadOnly
146 ) {
147 warn!(
148 "read-only txn {} attempted '{}' on {}",
149 self.txn.id(),
150 operation,
151 table.to_log_string()
152 );
153 return Err(QuillSQLError::Execution(format!(
154 "operation '{}' on table {} is not allowed in READ ONLY transaction",
155 operation,
156 table.to_log_string()
157 )));
158 }
159 Ok(())
160 }
161
162 pub fn isolation_level(&self) -> IsolationLevel {
163 self.txn.isolation_level()
164 }
165
166 pub fn transaction(&self) -> &Transaction {
167 self.txn
168 }
169
170 pub fn transaction_mut(&mut self) -> &mut Transaction {
171 self.txn
172 }
173
174 pub fn manager(&self) -> &TransactionManager {
175 self.manager.as_ref()
176 }
177
178 pub fn txn_id(&self) -> TransactionId {
179 self.txn.id()
180 }
181
182 pub fn unlock_row(&self, table: &TableReference, rid: RecordId) {
183 self.manager.unlock_row(self.txn.id(), table, rid);
184 }
185
186 pub fn command_id(&self) -> CommandId {
187 self.command_id
188 }
189}