Skip to main content

hyperdb_api/
async_transaction.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! RAII transaction guard for async connections.
5
6use crate::async_connection::AsyncConnection;
7use crate::async_result::AsyncRowset;
8use crate::error::Result;
9use crate::result::{Row, RowValue};
10
11/// An async RAII transaction guard.
12///
13/// Created via [`AsyncConnection::transaction()`]. This **exclusively borrows** the
14/// connection for the lifetime of the transaction, preventing any other code from
15/// using the raw connection while the transaction is active. Always explicitly call
16/// [`commit()`](Self::commit) or [`rollback()`](Self::rollback) — the `Drop`
17/// implementation cannot issue async rollback and will only emit a warning.
18///
19/// # Example
20///
21/// ```no_run
22/// # use hyperdb_api::{AsyncConnection, CreateMode, Result};
23/// # async fn example() -> Result<()> {
24/// # let mut conn = AsyncConnection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate).await?;
25/// let txn = conn.transaction().await?;
26/// txn.execute_command("INSERT INTO users VALUES (1, 'Alice')").await?;
27/// txn.commit().await?;
28/// # Ok(())
29/// # }
30/// ```
31#[derive(Debug)]
32pub struct AsyncTransaction<'conn> {
33    connection: &'conn mut AsyncConnection,
34    completed: bool,
35}
36
37impl<'conn> AsyncTransaction<'conn> {
38    /// Creates a new async transaction by issuing `BEGIN TRANSACTION`.
39    pub(crate) async fn new(connection: &'conn mut AsyncConnection) -> Result<Self> {
40        connection.begin_transaction().await?;
41        Ok(Self {
42            connection,
43            completed: false,
44        })
45    }
46
47    /// Commits the transaction.
48    ///
49    /// # Errors
50    ///
51    /// Forwards the error from [`AsyncConnection::commit`]. The transaction
52    /// is marked completed regardless, so the drop guard will not warn.
53    pub async fn commit(mut self) -> Result<()> {
54        self.completed = true;
55        self.connection.commit().await
56    }
57
58    /// Rolls back the transaction explicitly.
59    ///
60    /// # Errors
61    ///
62    /// Forwards the error from [`AsyncConnection::rollback`]. The
63    /// transaction is marked completed regardless.
64    pub async fn rollback(mut self) -> Result<()> {
65        self.completed = true;
66        self.connection.rollback().await
67    }
68
69    /// Returns a reference to the underlying async connection.
70    #[must_use]
71    pub fn connection(&self) -> &AsyncConnection {
72        self.connection
73    }
74
75    // =========================================================================
76    // Delegated execution methods
77    // =========================================================================
78
79    /// Executes a SQL command within this transaction.
80    ///
81    /// # Errors
82    ///
83    /// Forwards the error from [`AsyncConnection::execute_command`].
84    pub async fn execute_command(&self, sql: &str) -> Result<u64> {
85        self.connection.execute_command(sql).await
86    }
87
88    /// Executes a streaming query within this transaction.
89    ///
90    /// # Errors
91    ///
92    /// Forwards the error from [`AsyncConnection::execute_query`].
93    pub async fn execute_query(&self, query: &str) -> Result<AsyncRowset<'_>> {
94        self.connection.execute_query(query).await
95    }
96
97    /// Fetches a single row, erroring if zero rows are returned.
98    ///
99    /// # Errors
100    ///
101    /// Forwards the error from [`AsyncConnection::fetch_one`].
102    pub async fn fetch_one<Q: AsRef<str>>(&self, query: Q) -> Result<Row> {
103        self.connection.fetch_one(query).await
104    }
105
106    /// Fetches a single row or `None`.
107    ///
108    /// # Errors
109    ///
110    /// Forwards the error from [`AsyncConnection::fetch_optional`].
111    pub async fn fetch_optional<Q: AsRef<str>>(&self, query: Q) -> Result<Option<Row>> {
112        self.connection.fetch_optional(query).await
113    }
114
115    /// Fetches all rows from the query.
116    ///
117    /// # Errors
118    ///
119    /// Forwards the error from [`AsyncConnection::fetch_all`].
120    pub async fn fetch_all<Q: AsRef<str>>(&self, query: Q) -> Result<Vec<Row>> {
121        self.connection.fetch_all(query).await
122    }
123
124    /// Fetches a single non-NULL scalar; errors if empty or NULL.
125    ///
126    /// # Errors
127    ///
128    /// Forwards the error from [`AsyncConnection::fetch_scalar`].
129    pub async fn fetch_scalar<T, Q>(&self, query: Q) -> Result<T>
130    where
131        T: RowValue,
132        Q: AsRef<str>,
133    {
134        self.connection.fetch_scalar(query).await
135    }
136
137    /// Fetches a single scalar, allowing NULL as `None`.
138    ///
139    /// # Errors
140    ///
141    /// Forwards the error from [`AsyncConnection::fetch_optional_scalar`].
142    pub async fn fetch_optional_scalar<T, Q>(&self, query: Q) -> Result<Option<T>>
143    where
144        T: RowValue,
145        Q: AsRef<str>,
146    {
147        self.connection.fetch_optional_scalar(query).await
148    }
149
150    /// Returns a count from a `SELECT COUNT(*)` style query.
151    ///
152    /// # Errors
153    ///
154    /// Forwards the error from [`AsyncConnection::query_count`].
155    pub async fn query_count(&self, query: &str) -> Result<i64> {
156        self.connection.query_count(query).await
157    }
158
159    /// Executes a parameterized query within this transaction.
160    ///
161    /// # Errors
162    ///
163    /// Forwards the error from [`AsyncConnection::query_params`].
164    pub async fn query_params(
165        &self,
166        query: &str,
167        params: &[&dyn crate::params::ToSqlParam],
168    ) -> Result<AsyncRowset<'_>> {
169        self.connection.query_params(query, params).await
170    }
171
172    /// Executes a parameterized command within this transaction.
173    ///
174    /// # Errors
175    ///
176    /// Forwards the error from [`AsyncConnection::command_params`].
177    pub async fn command_params(
178        &self,
179        query: &str,
180        params: &[&dyn crate::params::ToSqlParam],
181    ) -> Result<u64> {
182        self.connection.command_params(query, params).await
183    }
184}
185
186impl Drop for AsyncTransaction<'_> {
187    fn drop(&mut self) {
188        if !self.completed {
189            // CRITICAL: Rust does not support async Drop, so we CANNOT issue a
190            // ROLLBACK here (unlike the sync Transaction which can and does).
191            // The transaction remains open on the server until the next command
192            // on the connection, at which point Hyper handles it implicitly.
193            // Users MUST explicitly call commit() or rollback().
194            tracing::warn!(
195                "AsyncTransaction dropped without explicit commit/rollback — \
196                 transaction state is undefined until the next command on this connection"
197            );
198        }
199    }
200}