Skip to main content

hyperdb_api/
async_transaction.rs

// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 OR MIT

//! RAII transaction guard for async connections.
5
6use crate::async_connection::AsyncConnection;
7use crate::async_result::AsyncRowset;
8use crate::error::Result;
9use crate::result::{Row, RowValue};
10
11/// An async RAII transaction guard.
12///
13/// Created via [`AsyncConnection::transaction()`]. This **exclusively borrows** the
14/// connection for the lifetime of the transaction, preventing any other code from
15/// using the raw connection while the transaction is active. Always explicitly call
16/// [`commit()`](Self::commit) or [`rollback()`](Self::rollback) — the `Drop`
17/// implementation cannot issue async rollback and will only emit a warning.
18///
19/// # Example
20///
21/// ```no_run
22/// # use hyperdb_api::{AsyncConnection, CreateMode, Result};
23/// # async fn example() -> Result<()> {
24/// # let mut conn = AsyncConnection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate).await?;
25/// let txn = conn.transaction().await?;
26/// txn.execute_command("INSERT INTO users VALUES (1, 'Alice')").await?;
27/// txn.commit().await?;
28/// # Ok(())
29/// # }
30/// ```
31#[derive(Debug)]
32pub struct AsyncTransaction<'conn> {
33    connection: &'conn mut AsyncConnection,
34    completed: bool,
35}
36
37impl<'conn> AsyncTransaction<'conn> {
38    /// Creates a new async transaction by issuing `BEGIN TRANSACTION`.
39    pub(crate) async fn new(connection: &'conn mut AsyncConnection) -> Result<Self> {
40        // Use the crate-internal `_raw` family. The matching `pub`
41        // methods on `AsyncConnection` are `#[deprecated]` for
42        // downstream consumers; this guard is the recommended
43        // replacement.
44        connection.begin_transaction_raw().await?;
45        Ok(Self {
46            connection,
47            completed: false,
48        })
49    }
50
51    /// Commits the transaction.
52    ///
53    /// # Errors
54    ///
55    /// Forwards the error from the server's `COMMIT`. The transaction
56    /// is marked completed regardless, so the drop guard will not warn.
57    pub async fn commit(mut self) -> Result<()> {
58        self.completed = true;
59        self.connection.commit_raw().await
60    }
61
62    /// Rolls back the transaction explicitly.
63    ///
64    /// # Errors
65    ///
66    /// Forwards the error from the server's `ROLLBACK`. The
67    /// transaction is marked completed regardless.
68    pub async fn rollback(mut self) -> Result<()> {
69        self.completed = true;
70        self.connection.rollback_raw().await
71    }
72
73    /// Returns a reference to the underlying async connection.
74    #[must_use]
75    pub fn connection(&self) -> &AsyncConnection {
76        self.connection
77    }
78
79    // =========================================================================
80    // Delegated execution methods
81    // =========================================================================
82
83    /// Executes a SQL command within this transaction.
84    ///
85    /// # Errors
86    ///
87    /// Forwards the error from [`AsyncConnection::execute_command`].
88    pub async fn execute_command(&self, sql: &str) -> Result<u64> {
89        self.connection.execute_command(sql).await
90    }
91
92    /// Executes a streaming query within this transaction.
93    ///
94    /// # Errors
95    ///
96    /// Forwards the error from [`AsyncConnection::execute_query`].
97    pub async fn execute_query(&self, query: &str) -> Result<AsyncRowset<'_>> {
98        self.connection.execute_query(query).await
99    }
100
101    /// Fetches a single row, erroring if zero rows are returned.
102    ///
103    /// # Errors
104    ///
105    /// Forwards the error from [`AsyncConnection::fetch_one`].
106    pub async fn fetch_one<Q: AsRef<str>>(&self, query: Q) -> Result<Row> {
107        self.connection.fetch_one(query).await
108    }
109
110    /// Fetches a single row or `None`.
111    ///
112    /// # Errors
113    ///
114    /// Forwards the error from [`AsyncConnection::fetch_optional`].
115    pub async fn fetch_optional<Q: AsRef<str>>(&self, query: Q) -> Result<Option<Row>> {
116        self.connection.fetch_optional(query).await
117    }
118
119    /// Fetches all rows from the query.
120    ///
121    /// # Errors
122    ///
123    /// Forwards the error from [`AsyncConnection::fetch_all`].
124    pub async fn fetch_all<Q: AsRef<str>>(&self, query: Q) -> Result<Vec<Row>> {
125        self.connection.fetch_all(query).await
126    }
127
128    /// Fetches a single non-NULL scalar; errors if empty or NULL.
129    ///
130    /// # Errors
131    ///
132    /// Forwards the error from [`AsyncConnection::fetch_scalar`].
133    pub async fn fetch_scalar<T, Q>(&self, query: Q) -> Result<T>
134    where
135        T: RowValue,
136        Q: AsRef<str>,
137    {
138        self.connection.fetch_scalar(query).await
139    }
140
141    /// Fetches a single scalar, allowing NULL as `None`.
142    ///
143    /// # Errors
144    ///
145    /// Forwards the error from [`AsyncConnection::fetch_optional_scalar`].
146    pub async fn fetch_optional_scalar<T, Q>(&self, query: Q) -> Result<Option<T>>
147    where
148        T: RowValue,
149        Q: AsRef<str>,
150    {
151        self.connection.fetch_optional_scalar(query).await
152    }
153
154    /// Returns a count from a `SELECT COUNT(*)` style query.
155    ///
156    /// # Errors
157    ///
158    /// Forwards the error from [`AsyncConnection::query_count`].
159    pub async fn query_count(&self, query: &str) -> Result<i64> {
160        self.connection.query_count(query).await
161    }
162
163    /// Executes a parameterized query within this transaction.
164    ///
165    /// # Errors
166    ///
167    /// Forwards the error from [`AsyncConnection::query_params`].
168    pub async fn query_params(
169        &self,
170        query: &str,
171        params: &[&dyn crate::params::ToSqlParam],
172    ) -> Result<AsyncRowset<'_>> {
173        self.connection.query_params(query, params).await
174    }
175
176    /// Executes a parameterized command within this transaction.
177    ///
178    /// # Errors
179    ///
180    /// Forwards the error from [`AsyncConnection::command_params`].
181    pub async fn command_params(
182        &self,
183        query: &str,
184        params: &[&dyn crate::params::ToSqlParam],
185    ) -> Result<u64> {
186        self.connection.command_params(query, params).await
187    }
188}
189
190impl Drop for AsyncTransaction<'_> {
191    fn drop(&mut self) {
192        if !self.completed {
193            // CRITICAL: Rust does not support async Drop, so we CANNOT issue a
194            // ROLLBACK here (unlike the sync Transaction which can and does).
195            // The transaction remains open on the server until the next command
196            // on the connection, at which point Hyper handles it implicitly.
197            // Users MUST explicitly call commit() or rollback().
198            tracing::warn!(
199                "AsyncTransaction dropped without explicit commit/rollback — \
200                 transaction state is undefined until the next command on this connection"
201            );
202        }
203    }
204}