sqlx_exasol/
transaction.rs

1use std::borrow::Cow;
2
3use futures_core::future::BoxFuture;
4use futures_util::FutureExt;
5use sqlx_core::transaction::TransactionManager;
6
7use crate::{
8    connection::websocket::future::{Commit, Rollback, WebSocketFuture},
9    database::Exasol,
10    error::ExaProtocolError,
11    ExaConnection, SqlxResult,
12};
13
14/// Implementor of [`TransactionManager`].
15#[derive(Debug, Clone, Copy)]
16pub struct ExaTransactionManager;
17
18impl TransactionManager for ExaTransactionManager {
19    type Database = Exasol;
20
21    fn begin<'conn>(
22        conn: &'conn mut ExaConnection,
23        _: Option<Cow<'static, str>>,
24    ) -> BoxFuture<'conn, SqlxResult<()>> {
25        Box::pin(async {
26            // Exasol does not have nested transactions.
27            if conn.attributes().open_transaction() {
28                // A pending rollback indicates that a transaction was dropped before an explicit
29                // rollback, which is why it's still open. If that's the case, then awaiting the
30                // rollback is sufficient to proceed.
31                match conn.ws.pending_rollback.take() {
32                    Some(rollback) => rollback.future(&mut conn.ws).await?,
33                    None => return Err(ExaProtocolError::TransactionAlreadyOpen)?,
34                }
35            }
36
37            // The next time a request is sent, the transaction will be started.
38            // We could eagerly start it as well, but that implies one more
39            // round-trip to the server and back with no benefit.
40            #[expect(deprecated, reason = "will make this private")]
41            conn.attributes_mut().set_autocommit(false);
42            Ok(())
43        })
44    }
45
46    fn commit(conn: &mut ExaConnection) -> BoxFuture<'_, SqlxResult<()>> {
47        async move { Commit::default().future(&mut conn.ws).await }.boxed()
48    }
49
50    fn rollback(conn: &mut ExaConnection) -> BoxFuture<'_, SqlxResult<()>> {
51        async move { Rollback::default().future(&mut conn.ws).await }.boxed()
52    }
53
54    fn start_rollback(conn: &mut ExaConnection) {
55        conn.ws.pending_rollback = Some(Rollback::default());
56    }
57
58    fn get_transaction_depth(conn: &ExaConnection) -> usize {
59        conn.attributes().open_transaction().into()
60    }
61}