tarantool_rs/client/
transaction.rs

1use std::{fmt, time::Duration};
2
3use async_trait::async_trait;
4
5use rmpv::Value;
6use tracing::debug;
7
8use super::{Connection, ExecutorExt, Stream};
9use crate::{
10    codec::{
11        consts::TransactionIsolationLevel,
12        request::{Begin, Commit, EncodedRequest, Rollback},
13    },
14    Executor, Result,
15};
16
17/// Started transaction ([docs](https://www.tarantool.io/en/doc/latest/dev_guide/internals/box_protocol/#binary-protocol-streams)).
18///
19/// If tranasction have a timeout and no requests made for that time, tranasction is automatically
20/// rolled back.
21///
22/// On drop tranasaction is rolled back, if not have been commited or rolled back already.
23pub struct Transaction {
24    conn: Connection,
25    stream_id: u32,
26    finished: bool,
27}
28
29impl Transaction {
30    async fn new(
31        conn: Connection,
32        timeout_secs: Option<f64>,
33        isolation_level: TransactionIsolationLevel,
34    ) -> Result<Self> {
35        let stream_id = conn.next_stream_id();
36        let this = Self {
37            conn,
38            stream_id,
39            finished: false,
40        };
41        this.begin(isolation_level, timeout_secs).await?;
42        Ok(this)
43    }
44
45    async fn begin(
46        &self,
47        transaction_isolation_level: TransactionIsolationLevel,
48        timeout_secs: Option<f64>,
49    ) -> Result<()> {
50        debug!("Beginning tranasction on stream {}", self.stream_id);
51        self.send_request(Begin::new(timeout_secs, transaction_isolation_level))
52            .await
53            .map(drop)
54    }
55
56    /// Commit tranasction.
57    pub async fn commit(mut self) -> Result<()> {
58        if !self.finished {
59            debug!("Commiting tranasction on stream {}", self.stream_id);
60            let _ = self.send_request(Commit::default()).await?;
61            self.finished = true;
62        }
63        Ok(())
64    }
65
66    /// Rollback tranasction.
67    pub async fn rollback(mut self) -> Result<()> {
68        if !self.finished {
69            debug!("Rolling back tranasction on stream {}", self.stream_id);
70            let _ = self.send_request(Rollback::default()).await?;
71            self.finished = true;
72        }
73        Ok(())
74    }
75}
76
77impl Drop for Transaction {
78    fn drop(&mut self) {
79        if !self.finished {
80            debug!(
81                "Rolling back tranasction on stream {} (on drop)",
82                self.stream_id
83            );
84            self.conn
85                .send_request_sync_and_forget(Rollback::default(), Some(self.stream_id));
86            self.finished = true;
87        }
88    }
89}
90
91#[async_trait]
92impl Executor for Transaction {
93    async fn send_encoded_request(&self, mut request: EncodedRequest) -> Result<Value> {
94        request.stream_id = Some(self.stream_id);
95        self.conn.send_encoded_request(request).await
96    }
97
98    // TODO: do we need to repeat this in all ConnetionLike implementations?
99    fn stream(&self) -> Stream {
100        self.conn.stream()
101    }
102
103    fn transaction_builder(&self) -> TransactionBuilder {
104        self.conn.transaction_builder()
105    }
106
107    async fn transaction(&self) -> Result<Transaction> {
108        self.conn.transaction().await
109    }
110
111    async fn get_cached_sql_statement_id(&self, statement: &str) -> Option<u64> {
112        self.conn.get_cached_sql_statement_id(statement).await
113    }
114}
115
116impl fmt::Debug for Transaction {
117    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118        f.debug_struct("Transaction")
119            .field("stream_id", &self.stream_id)
120            .field("finished", &self.finished)
121            .finish()
122    }
123}
124
125/// Build transaction.
126pub struct TransactionBuilder {
127    connection: Connection,
128    timeout_secs: Option<f64>,
129    isolation_level: TransactionIsolationLevel,
130}
131
132impl TransactionBuilder {
133    pub(crate) fn new(
134        connection: Connection,
135        timeout_secs: Option<f64>,
136        isolation_level: TransactionIsolationLevel,
137    ) -> Self {
138        Self {
139            connection,
140            timeout_secs,
141            isolation_level,
142        }
143    }
144
145    pub fn timeout(&mut self, timeout: impl Into<Option<Duration>>) -> &mut Self {
146        self.timeout_secs = timeout.into().as_ref().map(Duration::as_secs_f64);
147        self
148    }
149
150    pub fn isolation_level(&mut self, isolation_level: TransactionIsolationLevel) -> &mut Self {
151        self.isolation_level = isolation_level;
152        self
153    }
154
155    pub async fn begin(&self) -> Result<Transaction> {
156        Transaction::new(
157            self.connection.clone(),
158            self.timeout_secs,
159            self.isolation_level,
160        )
161        .await
162    }
163}