tarantool_rs/client/
transaction.rs1use std::{fmt, time::Duration};
2
3use async_trait::async_trait;
4
5use rmpv::Value;
6use tracing::debug;
7
8use super::{Connection, ExecutorExt, Stream};
9use crate::{
10 codec::{
11 consts::TransactionIsolationLevel,
12 request::{Begin, Commit, EncodedRequest, Rollback},
13 },
14 Executor, Result,
15};
16
17pub struct Transaction {
24 conn: Connection,
25 stream_id: u32,
26 finished: bool,
27}
28
29impl Transaction {
30 async fn new(
31 conn: Connection,
32 timeout_secs: Option<f64>,
33 isolation_level: TransactionIsolationLevel,
34 ) -> Result<Self> {
35 let stream_id = conn.next_stream_id();
36 let this = Self {
37 conn,
38 stream_id,
39 finished: false,
40 };
41 this.begin(isolation_level, timeout_secs).await?;
42 Ok(this)
43 }
44
45 async fn begin(
46 &self,
47 transaction_isolation_level: TransactionIsolationLevel,
48 timeout_secs: Option<f64>,
49 ) -> Result<()> {
50 debug!("Beginning tranasction on stream {}", self.stream_id);
51 self.send_request(Begin::new(timeout_secs, transaction_isolation_level))
52 .await
53 .map(drop)
54 }
55
56 pub async fn commit(mut self) -> Result<()> {
58 if !self.finished {
59 debug!("Commiting tranasction on stream {}", self.stream_id);
60 let _ = self.send_request(Commit::default()).await?;
61 self.finished = true;
62 }
63 Ok(())
64 }
65
66 pub async fn rollback(mut self) -> Result<()> {
68 if !self.finished {
69 debug!("Rolling back tranasction on stream {}", self.stream_id);
70 let _ = self.send_request(Rollback::default()).await?;
71 self.finished = true;
72 }
73 Ok(())
74 }
75}
76
77impl Drop for Transaction {
78 fn drop(&mut self) {
79 if !self.finished {
80 debug!(
81 "Rolling back tranasction on stream {} (on drop)",
82 self.stream_id
83 );
84 self.conn
85 .send_request_sync_and_forget(Rollback::default(), Some(self.stream_id));
86 self.finished = true;
87 }
88 }
89}
90
91#[async_trait]
92impl Executor for Transaction {
93 async fn send_encoded_request(&self, mut request: EncodedRequest) -> Result<Value> {
94 request.stream_id = Some(self.stream_id);
95 self.conn.send_encoded_request(request).await
96 }
97
98 fn stream(&self) -> Stream {
100 self.conn.stream()
101 }
102
103 fn transaction_builder(&self) -> TransactionBuilder {
104 self.conn.transaction_builder()
105 }
106
107 async fn transaction(&self) -> Result<Transaction> {
108 self.conn.transaction().await
109 }
110
111 async fn get_cached_sql_statement_id(&self, statement: &str) -> Option<u64> {
112 self.conn.get_cached_sql_statement_id(statement).await
113 }
114}
115
116impl fmt::Debug for Transaction {
117 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118 f.debug_struct("Transaction")
119 .field("stream_id", &self.stream_id)
120 .field("finished", &self.finished)
121 .finish()
122 }
123}
124
125pub struct TransactionBuilder {
127 connection: Connection,
128 timeout_secs: Option<f64>,
129 isolation_level: TransactionIsolationLevel,
130}
131
132impl TransactionBuilder {
133 pub(crate) fn new(
134 connection: Connection,
135 timeout_secs: Option<f64>,
136 isolation_level: TransactionIsolationLevel,
137 ) -> Self {
138 Self {
139 connection,
140 timeout_secs,
141 isolation_level,
142 }
143 }
144
145 pub fn timeout(&mut self, timeout: impl Into<Option<Duration>>) -> &mut Self {
146 self.timeout_secs = timeout.into().as_ref().map(Duration::as_secs_f64);
147 self
148 }
149
150 pub fn isolation_level(&mut self, isolation_level: TransactionIsolationLevel) -> &mut Self {
151 self.isolation_level = isolation_level;
152 self
153 }
154
155 pub async fn begin(&self) -> Result<Transaction> {
156 Transaction::new(
157 self.connection.clone(),
158 self.timeout_secs,
159 self.isolation_level,
160 )
161 .await
162 }
163}