1use crate::client::TimeoutSettings;
2use crate::errors::{YdbError, YdbResult};
3use crate::grpc_wrapper::raw_table_service::execute_data_query::RawExecuteDataQueryRequest;
4use crate::grpc_wrapper::raw_table_service::query_stats::RawQueryStatMode;
5use crate::grpc_wrapper::raw_table_service::transaction_control::{
6 RawOnlineReadonlySettings, RawTransactionControl, RawTxMode, RawTxSelector, RawTxSettings,
7};
8use crate::query::Query;
9use crate::result::QueryResult;
10use crate::session::Session;
11use crate::session_pool::SessionPool;
12use async_trait::async_trait;
13use itertools::Itertools;
14use tracing::trace;
15use ydb_grpc::ydb_proto::table::transaction_settings::TxMode;
16use ydb_grpc::ydb_proto::table::{OnlineModeSettings, SerializableModeSettings};
17
/// Identifiers of a started transaction: the transaction id together with
/// the id of the session the transaction is bound to.
#[derive(Debug, Clone)]
pub struct TransactionInfo {
    /// Id of the transaction.
    pub(crate) transaction_id: String,
    /// Id of the session that owns the transaction.
    pub(crate) session_id: String,
}
23
/// Transaction mode selected by the caller.
///
/// The value is converted into the protobuf `TxMode` and into the internal
/// `RawTxMode` by the `From` implementations in this module.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Mode {
    /// Online read-only mode (converted with default / consistent-read settings).
    OnlineReadonly,
    /// Serializable read-write mode.
    SerializableReadWrite,
}
29
30impl From<Mode> for TxMode {
31 fn from(m: Mode) -> Self {
32 match m {
33 Mode::OnlineReadonly => TxMode::OnlineReadOnly(OnlineModeSettings::default()),
34 Mode::SerializableReadWrite => {
35 TxMode::SerializableReadWrite(SerializableModeSettings::default())
36 }
37 }
38 }
39}
40
41impl From<Mode> for RawTxMode {
42 fn from(value: Mode) -> Self {
43 match value {
44 Mode::OnlineReadonly => Self::OnlineReadOnly(RawOnlineReadonlySettings {
45 allow_inconsistent_reads: false,
46 }),
47 Mode::SerializableReadWrite => Self::SerializableReadWrite,
48 }
49 }
50}
51
52#[async_trait]
53pub trait Transaction: Send + Sync {
54 async fn query(&mut self, query: Query) -> YdbResult<QueryResult>;
55 async fn commit(&mut self) -> YdbResult<()>;
56 async fn rollback(&mut self) -> YdbResult<()>;
57 async fn transaction_info(&mut self) -> YdbResult<TransactionInfo> {
58 Err(YdbError::custom(
59 "Transaction info not available for this transaction type",
60 ))
61 }
62}
63
64pub(crate) struct AutoCommit {
67 mode: Mode,
68 error_on_truncate_response: bool,
69 session_pool: SessionPool,
70 timeouts: TimeoutSettings,
71}
72
73impl AutoCommit {
74 pub(crate) fn new(session_pool: SessionPool, mode: Mode, timeouts: TimeoutSettings) -> Self {
75 Self {
76 mode,
77 session_pool,
78 error_on_truncate_response: false,
79 timeouts,
80 }
81 }
82
83 pub(crate) fn with_error_on_truncate(mut self, error_on_truncate: bool) -> Self {
84 self.error_on_truncate_response = error_on_truncate;
85 self
86 }
87}
88
89impl Drop for AutoCommit {
90 fn drop(&mut self) {}
91}
92
93#[async_trait]
94impl Transaction for AutoCommit {
95 async fn query(&mut self, query: Query) -> YdbResult<QueryResult> {
96 let req = RawExecuteDataQueryRequest {
97 session_id: String::default(),
98 tx_control: RawTransactionControl {
99 commit_tx: true,
100 tx_selector: RawTxSelector::Begin(RawTxSettings {
101 mode: self.mode.into(),
102 }),
103 },
104 yql_text: query.text,
105 operation_params: self.timeouts.operation_params(),
106 params: query
107 .parameters
108 .into_iter()
109 .map(|(k, v)| match v.try_into() {
110 Ok(converted) => Ok((k, converted)),
111 Err(err) => Err(err),
112 })
113 .try_collect()?,
114 keep_in_cache: query.keep_in_cache,
115 collect_stats: RawQueryStatMode::None,
116 };
117
118 let mut session = self.session_pool.session().await?;
119 return session
120 .execute_data_query(req, self.error_on_truncate_response)
121 .await;
122 }
123
124 async fn commit(&mut self) -> YdbResult<()> {
125 Ok(())
126 }
127
128 async fn rollback(&mut self) -> YdbResult<()> {
129 Err(YdbError::from(
130 "impossible to rollback autocommit transaction",
131 ))
132 }
133}
134
135pub(crate) struct SerializableReadWriteTx {
136 error_on_truncate_response: bool,
137 session_pool: SessionPool,
138
139 id: Option<String>,
140 session: Option<Session>,
141 comitted: bool,
142 rollbacked: bool,
143 finished: bool,
144 timeouts: TimeoutSettings,
145}
146
147impl SerializableReadWriteTx {
148 pub(crate) fn new(session_pool: SessionPool, timeouts: TimeoutSettings) -> Self {
149 Self {
150 error_on_truncate_response: false,
151 session_pool,
152
153 id: None,
154 session: None,
155 comitted: false,
156 rollbacked: false,
157 finished: false,
158 timeouts,
159 }
160 }
161
162 pub(crate) fn with_error_on_truncate(mut self, error_on_truncate: bool) -> Self {
163 self.error_on_truncate_response = error_on_truncate;
164 self
165 }
166
167 async fn begin_transaction(&mut self) -> YdbResult<()> {
169 let _ = self.query(Query::new("SELECT 1")).await?;
171 Ok(())
172 }
173}
174
175impl Drop for SerializableReadWriteTx {
176 fn drop(&mut self) {
178 if !self.finished {
179 if let (Some(tx_id), Some(mut session)) = (self.id.take(), self.session.take()) {
180 tokio::spawn(async move {
181 let _ = session.rollback_transaction(tx_id).await;
182 });
183 };
184 };
185 }
186}
187
188#[async_trait]
189impl Transaction for SerializableReadWriteTx {
190 async fn query(&mut self, query: Query) -> YdbResult<QueryResult> {
191 let session = if let Some(session) = self.session.as_mut() {
192 session
193 } else {
194 self.session = Some(self.session_pool.session().await?);
195 trace!("create session from transaction");
196 self.session.as_mut().unwrap()
197 };
198 trace!("session: {:#?}", &session);
199
200 let tx_selector = if let Some(tx_id) = &self.id {
201 trace!("tx_id: {}", tx_id);
202 RawTxSelector::Id(tx_id.clone())
203 } else {
204 trace!("start new transaction");
205 RawTxSelector::Begin(RawTxSettings {
206 mode: RawTxMode::SerializableReadWrite,
207 })
208 };
209
210 let req = RawExecuteDataQueryRequest {
211 session_id: session.id.clone(),
212 tx_control: RawTransactionControl {
213 commit_tx: false,
214 tx_selector,
215 },
216 yql_text: query.text,
217
218 operation_params: self.timeouts.operation_params(),
219 params: query
220 .parameters
221 .into_iter()
222 .map(|(k, v)| match v.try_into() {
223 Ok(converted) => Ok((k, converted)),
224 Err(err) => Err(err),
225 })
226 .try_collect()?,
227 keep_in_cache: false,
228 collect_stats: RawQueryStatMode::None,
229 };
230 let query_result = session
231 .execute_data_query(req, self.error_on_truncate_response)
232 .await?;
233 if self.id.is_none() {
234 self.id = Some(query_result.tx_id.clone());
235 };
236
237 return Ok(query_result);
238 }
239
240 async fn commit(&mut self) -> YdbResult<()> {
241 if self.comitted {
242 return Ok(());
244 }
245
246 if self.finished {
247 return Err(YdbError::Custom(format!(
248 "commit finished non comitted transaction: {:?}",
249 &self.id
250 )));
251 }
252 self.finished = true;
253
254 let tx_id = if let Some(id) = &self.id {
255 id
256 } else {
257 self.comitted = true;
259 return Ok(());
260 };
261
262 if let Some(session) = self.session.as_mut() {
263 session.commit_transaction(tx_id.clone()).await?;
264 self.comitted = true;
265 return Ok(());
266 } else {
267 return Err(YdbError::InternalError(
268 "commit transaction without session (internal error)".into(),
269 ));
270 }
271 }
272
273 async fn rollback(&mut self) -> YdbResult<()> {
274 if self.rollbacked {
276 return Ok(());
277 }
278
279 if self.finished {
280 return Err(YdbError::Custom(format!(
281 "rollback finished non rollbacked transaction: {:?}",
282 &self.id
283 )));
284 }
285 self.finished = true;
286
287 let session = if let Some(session) = &mut self.session {
288 session
289 } else {
290 self.finished = true;
292 self.rollbacked = true;
293 return Ok(());
294 };
295
296 let tx_id = if let Some(id) = &self.id {
297 id.clone()
298 } else {
299 self.rollbacked = true;
301 return Ok(());
302 };
303
304 self.rollbacked = true;
305
306 return session.rollback_transaction(tx_id).await;
307 }
308
309 async fn transaction_info(&mut self) -> YdbResult<TransactionInfo> {
310 if self.id.is_none() || self.session.is_none() {
312 self.begin_transaction().await?;
313 }
314
315 Ok(TransactionInfo {
316 transaction_id: self.id.clone().unwrap(),
317 session_id: self.session.as_ref().unwrap().id.clone(),
318 })
319 }
320}