ydb/
transaction.rs

1use crate::client::TimeoutSettings;
2use crate::errors::{YdbError, YdbResult};
3use crate::grpc_wrapper::raw_table_service::execute_data_query::RawExecuteDataQueryRequest;
4use crate::grpc_wrapper::raw_table_service::query_stats::RawQueryStatMode;
5use crate::grpc_wrapper::raw_table_service::transaction_control::{
6    RawOnlineReadonlySettings, RawTransactionControl, RawTxMode, RawTxSelector, RawTxSettings,
7};
8use crate::query::Query;
9use crate::result::QueryResult;
10use crate::session::Session;
11use crate::session_pool::SessionPool;
12use async_trait::async_trait;
13use itertools::Itertools;
14use tracing::trace;
15use ydb_grpc::ydb_proto::table::transaction_settings::TxMode;
16use ydb_grpc::ydb_proto::table::{OnlineModeSettings, SerializableModeSettings};
17
/// Identifiers describing a transaction: the transaction id and the id of the
/// session it is associated with.
#[derive(Debug, Clone)]
pub struct TransactionInfo {
    /// Identifier of the transaction.
    pub(crate) transaction_id: String,
    /// Identifier of the session.
    pub(crate) session_id: String,
}
23
/// Transaction mode selected by the caller.
///
/// `Debug` is derived so the mode can be logged and used in assertions.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Mode {
    /// Online read-only mode.
    OnlineReadonly,
    /// Serializable read-write mode.
    SerializableReadWrite,
}
29
30impl From<Mode> for TxMode {
31    fn from(m: Mode) -> Self {
32        match m {
33            Mode::OnlineReadonly => TxMode::OnlineReadOnly(OnlineModeSettings::default()),
34            Mode::SerializableReadWrite => {
35                TxMode::SerializableReadWrite(SerializableModeSettings::default())
36            }
37        }
38    }
39}
40
41impl From<Mode> for RawTxMode {
42    fn from(value: Mode) -> Self {
43        match value {
44            Mode::OnlineReadonly => Self::OnlineReadOnly(RawOnlineReadonlySettings {
45                allow_inconsistent_reads: false,
46            }),
47            Mode::SerializableReadWrite => Self::SerializableReadWrite,
48        }
49    }
50}
51
52#[async_trait]
53pub trait Transaction: Send + Sync {
54    async fn query(&mut self, query: Query) -> YdbResult<QueryResult>;
55    async fn commit(&mut self) -> YdbResult<()>;
56    async fn rollback(&mut self) -> YdbResult<()>;
57    async fn transaction_info(&mut self) -> YdbResult<TransactionInfo> {
58        Err(YdbError::custom(
59            "Transaction info not available for this transaction type",
60        ))
61    }
62}
63
64// TODO: operations timeout
65
66pub(crate) struct AutoCommit {
67    mode: Mode,
68    error_on_truncate_response: bool,
69    session_pool: SessionPool,
70    timeouts: TimeoutSettings,
71}
72
73impl AutoCommit {
74    pub(crate) fn new(session_pool: SessionPool, mode: Mode, timeouts: TimeoutSettings) -> Self {
75        Self {
76            mode,
77            session_pool,
78            error_on_truncate_response: false,
79            timeouts,
80        }
81    }
82
83    pub(crate) fn with_error_on_truncate(mut self, error_on_truncate: bool) -> Self {
84        self.error_on_truncate_response = error_on_truncate;
85        self
86    }
87}
88
89impl Drop for AutoCommit {
90    fn drop(&mut self) {}
91}
92
93#[async_trait]
94impl Transaction for AutoCommit {
95    async fn query(&mut self, query: Query) -> YdbResult<QueryResult> {
96        let req = RawExecuteDataQueryRequest {
97            session_id: String::default(),
98            tx_control: RawTransactionControl {
99                commit_tx: true,
100                tx_selector: RawTxSelector::Begin(RawTxSettings {
101                    mode: self.mode.into(),
102                }),
103            },
104            yql_text: query.text,
105            operation_params: self.timeouts.operation_params(),
106            params: query
107                .parameters
108                .into_iter()
109                .map(|(k, v)| match v.try_into() {
110                    Ok(converted) => Ok((k, converted)),
111                    Err(err) => Err(err),
112                })
113                .try_collect()?,
114            keep_in_cache: query.keep_in_cache,
115            collect_stats: RawQueryStatMode::None,
116        };
117
118        let mut session = self.session_pool.session().await?;
119        return session
120            .execute_data_query(req, self.error_on_truncate_response)
121            .await;
122    }
123
124    async fn commit(&mut self) -> YdbResult<()> {
125        Ok(())
126    }
127
128    async fn rollback(&mut self) -> YdbResult<()> {
129        Err(YdbError::from(
130            "impossible to rollback autocommit transaction",
131        ))
132    }
133}
134
135pub(crate) struct SerializableReadWriteTx {
136    error_on_truncate_response: bool,
137    session_pool: SessionPool,
138
139    id: Option<String>,
140    session: Option<Session>,
141    comitted: bool,
142    rollbacked: bool,
143    finished: bool,
144    timeouts: TimeoutSettings,
145}
146
147impl SerializableReadWriteTx {
148    pub(crate) fn new(session_pool: SessionPool, timeouts: TimeoutSettings) -> Self {
149        Self {
150            error_on_truncate_response: false,
151            session_pool,
152
153            id: None,
154            session: None,
155            comitted: false,
156            rollbacked: false,
157            finished: false,
158            timeouts,
159        }
160    }
161
162    pub(crate) fn with_error_on_truncate(mut self, error_on_truncate: bool) -> Self {
163        self.error_on_truncate_response = error_on_truncate;
164        self
165    }
166
167    // Private method for transaction initialization using "workaround"
168    async fn begin_transaction(&mut self) -> YdbResult<()> {
169        // Call query with simple request to create transaction
170        let _ = self.query(Query::new("SELECT 1")).await?;
171        Ok(())
172    }
173}
174
175impl Drop for SerializableReadWriteTx {
176    // rollback if unfinished
177    fn drop(&mut self) {
178        if !self.finished {
179            if let (Some(tx_id), Some(mut session)) = (self.id.take(), self.session.take()) {
180                tokio::spawn(async move {
181                    let _ = session.rollback_transaction(tx_id).await;
182                });
183            };
184        };
185    }
186}
187
188#[async_trait]
189impl Transaction for SerializableReadWriteTx {
190    async fn query(&mut self, query: Query) -> YdbResult<QueryResult> {
191        let session = if let Some(session) = self.session.as_mut() {
192            session
193        } else {
194            self.session = Some(self.session_pool.session().await?);
195            trace!("create session from transaction");
196            self.session.as_mut().unwrap()
197        };
198        trace!("session: {:#?}", &session);
199
200        let tx_selector = if let Some(tx_id) = &self.id {
201            trace!("tx_id: {}", tx_id);
202            RawTxSelector::Id(tx_id.clone())
203        } else {
204            trace!("start new transaction");
205            RawTxSelector::Begin(RawTxSettings {
206                mode: RawTxMode::SerializableReadWrite,
207            })
208        };
209
210        let req = RawExecuteDataQueryRequest {
211            session_id: session.id.clone(),
212            tx_control: RawTransactionControl {
213                commit_tx: false,
214                tx_selector,
215            },
216            yql_text: query.text,
217
218            operation_params: self.timeouts.operation_params(),
219            params: query
220                .parameters
221                .into_iter()
222                .map(|(k, v)| match v.try_into() {
223                    Ok(converted) => Ok((k, converted)),
224                    Err(err) => Err(err),
225                })
226                .try_collect()?,
227            keep_in_cache: false,
228            collect_stats: RawQueryStatMode::None,
229        };
230        let query_result = session
231            .execute_data_query(req, self.error_on_truncate_response)
232            .await?;
233        if self.id.is_none() {
234            self.id = Some(query_result.tx_id.clone());
235        };
236
237        return Ok(query_result);
238    }
239
240    async fn commit(&mut self) -> YdbResult<()> {
241        if self.comitted {
242            // commit many times - ok
243            return Ok(());
244        }
245
246        if self.finished {
247            return Err(YdbError::Custom(format!(
248                "commit finished non comitted transaction: {:?}",
249                &self.id
250            )));
251        }
252        self.finished = true;
253
254        let tx_id = if let Some(id) = &self.id {
255            id
256        } else {
257            // commit non started transaction - ok
258            self.comitted = true;
259            return Ok(());
260        };
261
262        if let Some(session) = self.session.as_mut() {
263            session.commit_transaction(tx_id.clone()).await?;
264            self.comitted = true;
265            return Ok(());
266        } else {
267            return Err(YdbError::InternalError(
268                "commit transaction without session (internal error)".into(),
269            ));
270        }
271    }
272
273    async fn rollback(&mut self) -> YdbResult<()> {
274        // double rollback is ok
275        if self.rollbacked {
276            return Ok(());
277        }
278
279        if self.finished {
280            return Err(YdbError::Custom(format!(
281                "rollback finished non rollbacked transaction: {:?}",
282                &self.id
283            )));
284        }
285        self.finished = true;
286
287        let session = if let Some(session) = &mut self.session {
288            session
289        } else {
290            // rollback non started transaction ok
291            self.finished = true;
292            self.rollbacked = true;
293            return Ok(());
294        };
295
296        let tx_id = if let Some(id) = &self.id {
297            id.clone()
298        } else {
299            // rollback non started transaction - ok
300            self.rollbacked = true;
301            return Ok(());
302        };
303
304        self.rollbacked = true;
305
306        return session.rollback_transaction(tx_id).await;
307    }
308
309    async fn transaction_info(&mut self) -> YdbResult<TransactionInfo> {
310        // If transaction_id or session_id are missing, create transaction
311        if self.id.is_none() || self.session.is_none() {
312            self.begin_transaction().await?;
313        }
314
315        Ok(TransactionInfo {
316            transaction_id: self.id.clone().unwrap(),
317            session_id: self.session.as_ref().unwrap().id.clone(),
318        })
319    }
320}