Skip to main content

xitca_postgres/transaction/
builder.rs

1use crate::{
2    client::{Client, ClientBorrowMut},
3    error::Error,
4    execute::Execute,
5};
6
7use super::Transaction;
8
9/// The isolation level of a database transaction.
10#[derive(Debug, Copy, Clone)]
11#[non_exhaustive]
12pub enum IsolationLevel {
13    /// Equivalent to `ReadCommitted`.
14    ReadUncommitted,
15    /// An individual statement in the transaction will see rows committed before it began.
16    ReadCommitted,
17    /// All statements in the transaction will see the same view of rows committed before the first query in the
18    /// transaction.
19    RepeatableRead,
20    /// The reads and writes in this transaction must be able to be committed as an atomic "unit" with respect to reads
21    /// and writes of all other concurrent serializable transactions without interleaving.
22    Serializable,
23}
24
25impl IsolationLevel {
26    const PREFIX: &str = " ISOLATION LEVEL ";
27    const READ_UNCOMMITTED: &str = "READ UNCOMMITTED,";
28    const READ_COMMITTED: &str = "READ COMMITTED,";
29    const REPEATABLE_READ: &str = "REPEATABLE READ,";
30    const SERIALIZABLE: &str = "SERIALIZABLE,";
31
32    fn write(self, str: &mut StackStr) {
33        str.push_str(Self::PREFIX);
34        str.push_str(match self {
35            IsolationLevel::ReadUncommitted => Self::READ_UNCOMMITTED,
36            IsolationLevel::ReadCommitted => Self::READ_COMMITTED,
37            IsolationLevel::RepeatableRead => Self::REPEATABLE_READ,
38            IsolationLevel::Serializable => Self::SERIALIZABLE,
39        });
40    }
41}
42
43/// A builder for database transactions.
44pub struct TransactionBuilder {
45    isolation_level: Option<IsolationLevel>,
46    read_only: Option<bool>,
47    deferrable: Option<bool>,
48}
49
50impl TransactionBuilder {
51    pub const fn new() -> Self {
52        Self {
53            isolation_level: None,
54            read_only: None,
55            deferrable: None,
56        }
57    }
58
59    /// Sets the isolation level of the transaction.
60    pub fn isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
61        self.isolation_level = Some(isolation_level);
62        self
63    }
64
65    /// Sets the access mode of the transaction.
66    pub fn read_only(mut self, read_only: bool) -> Self {
67        self.read_only = Some(read_only);
68        self
69    }
70
71    /// Sets the deferrability of the transaction.
72    ///
73    /// If the transaction is also serializable and read only, creation of the transaction may block, but when it
74    /// completes the transaction is able to run with less overhead and a guarantee that it will not be aborted due to
75    /// serialization failure.
76    pub fn deferrable(mut self, deferrable: bool) -> Self {
77        self.deferrable = Some(deferrable);
78        self
79    }
80
81    /// Begins the transaction with a borrowned client.
82    ///
83    /// The transaction will roll back by default - use [`Transaction::commit`] method to commit it.
84    pub async fn begin<C>(self, cli: &mut C) -> Result<Transaction<'_, C>, Error>
85    where
86        C: ClientBorrowMut,
87    {
88        // marker check to ensure exclusive borrowing Client. see ClientBorrowMut for detail
89        self._begin(cli.borrow_cli_mut()).await.map(|_| Transaction::new(cli))
90    }
91
92    /// Begins the transaction with an owned client.
93    ///
94    /// The transaction will roll back by default - use [`Transaction::commit`] method to commit it.
95    pub async fn begin_owned<'a, C>(self, mut cli: C) -> Result<Transaction<'a, C>, Error>
96    where
97        C: ClientBorrowMut + 'a,
98    {
99        // marker check to ensure exclusive borrowing Client. see ClientBorrowMut for detail
100        self._begin(cli.borrow_cli_mut())
101            .await
102            .map(|_| Transaction::new_owned(cli))
103    }
104
105    async fn _begin(self, cli: &Client) -> Result<u64, Error> {
106        let mut query = const { StackStr::new("START TRANSACTION") };
107
108        let Self {
109            isolation_level,
110            read_only,
111            deferrable,
112        } = self;
113
114        if let Some(isolation_level) = isolation_level {
115            isolation_level.write(&mut query);
116        }
117
118        if let Some(read_only) = read_only {
119            let s = if read_only { " READ ONLY," } else { " READ WRITE," };
120            query.push_str(s);
121        }
122
123        if let Some(deferrable) = deferrable {
124            let s = if deferrable { " DEFERRABLE" } else { " NOT DEFERRABLE" };
125            query.push_str(s);
126        }
127
128        query.pop_if_ends_with(",");
129
130        query.as_str().execute(cli).await
131    }
132}
133
134struct StackStr {
135    buf: [u8; 120],
136    cursor: usize,
137}
138
139impl StackStr {
140    const fn new(str: &str) -> Self {
141        let mut buf = [0; 120];
142
143        let mut cursor = 0;
144
145        let str = str.as_bytes();
146
147        while cursor < str.len() {
148            buf[cursor] = str[cursor];
149            cursor += 1;
150        }
151
152        Self { buf, cursor }
153    }
154
155    fn push_str(&mut self, str: &str) {
156        let start = self.cursor;
157        self.cursor += str.len();
158        self.buf[start..self.cursor].copy_from_slice(str.as_bytes());
159    }
160
161    fn as_str(&self) -> &str {
162        core::str::from_utf8(&self.buf[..self.cursor]).unwrap()
163    }
164
165    fn pop_if_ends_with(&mut self, needle: &str) {
166        let needle = needle.as_bytes();
167        if self.buf[..self.cursor].ends_with(needle) {
168            self.cursor -= needle.len();
169        }
170    }
171}
172
173#[cfg(test)]
174mod test {
175    use std::sync::Arc;
176
177    use crate::{
178        Client, Postgres,
179        client::{ClientBorrow, ClientBorrowMut},
180    };
181
182    use super::{IsolationLevel, TransactionBuilder};
183
184    #[tokio::test]
185    async fn client_borrow_mut() {
186        #[derive(Clone)]
187        struct PanicCli(Arc<Client>);
188
189        impl PanicCli {
190            fn new(cli: Client) -> Self {
191                Self(Arc::new(cli))
192            }
193        }
194
195        impl ClientBorrow for PanicCli {
196            fn borrow_cli_ref(&self) -> &Client {
197                &self.0
198            }
199        }
200
201        impl ClientBorrowMut for PanicCli {
202            fn borrow_cli_mut(&mut self) -> &mut Client {
203                Arc::get_mut(&mut self.0).unwrap()
204            }
205        }
206
207        let (cli, drv) = Postgres::new("postgres://postgres:postgres@localhost:5432")
208            .connect()
209            .await
210            .unwrap();
211
212        tokio::spawn(drv.into_future());
213
214        let mut cli = PanicCli::new(cli);
215
216        {
217            let _tx = TransactionBuilder::new().begin(&mut cli).await.unwrap();
218        }
219
220        let res = tokio::spawn(async move {
221            let _cli2 = cli.clone();
222            let _tx = TransactionBuilder::new().begin_owned(cli).await.unwrap();
223        })
224        .await
225        .err()
226        .unwrap();
227
228        assert!(res.is_panic());
229    }
230
231    #[tokio::test]
232    async fn transaction_builder() {
233        let (mut cli, drv) = Postgres::new("postgres://postgres:postgres@localhost:5432")
234            .connect()
235            .await
236            .unwrap();
237
238        tokio::spawn(drv.into_future());
239
240        let _ = TransactionBuilder::new()
241            .isolation_level(IsolationLevel::ReadUncommitted)
242            .read_only(false)
243            .deferrable(false)
244            .begin(&mut cli)
245            .await
246            .unwrap();
247    }
248}