hyperdb_api/async_connection.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Async connection to Hyper database.
5//!
6//! This module provides [`AsyncConnection`] the async version of [`Connection`](crate::Connection).
7//! Use this when you're already in an async runtime (tokio).
8
9use std::any::Any;
10use std::sync::{Arc, Mutex};
11
12use crate::async_result::AsyncRowset;
13use crate::async_transport::{AsyncTcpTransport, AsyncTransport};
14use crate::error::{Error, Result};
15use crate::names::escape_sql_path;
16use crate::query_stats::{QueryStats, QueryStatsProvider};
17use crate::result::{Row, RowValue};
18use crate::CreateMode;
19
20/// An async connection to a Hyper database.
21///
22/// This is the async equivalent of [`Connection`](crate::Connection), designed for use
23/// in tokio-based async applications. All I/O operations are non-blocking.
24///
25/// # Example
26///
27/// ```no_run
28/// use hyperdb_api::{AsyncConnection, CreateMode, Result};
29///
30/// #[tokio::main]
31/// async fn main() -> Result<()> {
32/// let conn = AsyncConnection::connect(
33/// "localhost:7483",
34/// "example.hyper",
35/// CreateMode::CreateIfNotExists,
36/// ).await?;
37///
38/// conn.execute_command("CREATE TABLE test (id INT)").await?;
39/// let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM test").await?;
40///
41/// conn.close().await?;
42/// Ok(())
43/// }
44/// ```
45pub struct AsyncConnection {
46 transport: AsyncTransport,
47 database: Option<String>,
48 stats_provider: Mutex<Option<Arc<dyn QueryStatsProvider>>>,
49 pending_stats: Mutex<Option<(Box<dyn Any + Send>, String)>>,
50}
51
52impl std::fmt::Debug for AsyncConnection {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 f.debug_struct("AsyncConnection")
55 .field("database", &self.database)
56 .finish_non_exhaustive()
57 }
58}
59
60impl AsyncConnection {
61 /// Returns a fluent [`AsyncConnectionBuilder`](crate::AsyncConnectionBuilder)
62 /// pointed at `endpoint`.
63 #[must_use]
64 pub fn builder(endpoint: &str) -> crate::AsyncConnectionBuilder {
65 crate::AsyncConnectionBuilder::new(endpoint)
66 }
67
68 /// Connects to a Hyper server (async).
69 ///
70 /// Transport is auto-detected from the endpoint:
71 /// - `https://` or `http://` → gRPC transport
72 /// - Otherwise → TCP transport (`PostgreSQL` wire protocol)
73 ///
74 /// # Errors
75 ///
76 /// - Returns [`Error::Io`] / [`Error::Connection`] if the handshake with
77 /// the server fails.
78 /// - Returns [`Error::Server`] if the `CreateMode` SQL (`CREATE`
79 /// / `DROP` / `ATTACH`) is rejected by the server.
80 pub async fn connect(endpoint: &str, database: &str, mode: CreateMode) -> Result<Self> {
81 let transport = AsyncTransport::connect(endpoint, Some(database)).await?;
82 let conn = AsyncConnection {
83 transport,
84 database: Some(database.to_string()),
85 stats_provider: Mutex::new(None),
86 pending_stats: Mutex::new(None),
87 };
88
89 if conn.transport.supports_writes() {
90 conn.handle_creation_mode(database, mode).await?;
91 conn.attach_and_set_path(database).await?;
92 }
93
94 Ok(conn)
95 }
96
97 /// Connects with authentication (async).
98 ///
99 /// # Errors
100 ///
101 /// - Returns [`Error::Authentication`] if authentication is rejected.
102 /// - Returns [`Error::Io`] if the endpoint cannot be reached.
103 /// - Returns [`Error::Server`] if the `CreateMode` SQL is rejected.
104 pub async fn connect_with_auth(
105 endpoint: &str,
106 database: &str,
107 mode: CreateMode,
108 user: &str,
109 password: &str,
110 ) -> Result<Self> {
111 let transport = AsyncTransport::connect_tcp_with_auth(endpoint, user, password).await?;
112 let conn = AsyncConnection {
113 transport,
114 database: Some(database.to_string()),
115 stats_provider: Mutex::new(None),
116 pending_stats: Mutex::new(None),
117 };
118
119 conn.handle_creation_mode(database, mode).await?;
120 conn.attach_and_set_path(database).await?;
121
122 Ok(conn)
123 }
124
125 /// Connects to a server without attaching any database (async).
126 ///
127 /// Useful for running `CREATE DATABASE` / `DROP DATABASE` without an
128 /// active attachment.
129 ///
130 /// # Errors
131 ///
132 /// Returns [`Error::Io`] or [`Error::Connection`] if the TCP handshake
133 /// with `endpoint` fails.
134 pub async fn without_database(endpoint: &str) -> Result<Self> {
135 let transport = AsyncTransport::connect_tcp(endpoint).await?;
136 Ok(AsyncConnection {
137 transport,
138 database: None,
139 stats_provider: Mutex::new(None),
140 pending_stats: Mutex::new(None),
141 })
142 }
143
144 /// Builds an `AsyncConnection` from a pre-existing `AsyncClient` (TCP only).
145 #[must_use]
146 pub fn from_async_client(
147 client: hyperdb_api_core::client::AsyncClient,
148 database: Option<String>,
149 ) -> Self {
150 AsyncConnection {
151 transport: AsyncTransport::Tcp(AsyncTcpTransport { client }),
152 database,
153 stats_provider: Mutex::new(None),
154 pending_stats: Mutex::new(None),
155 }
156 }
157
158 /// Builds an `AsyncConnection` from a pre-constructed transport.
159 ///
160 /// Used by [`AsyncConnectionBuilder`](crate::AsyncConnectionBuilder) to
161 /// stitch together a gRPC transport after its own config construction.
162 pub(crate) fn from_transport(transport: AsyncTransport, database: Option<String>) -> Self {
163 AsyncConnection {
164 transport,
165 database,
166 stats_provider: Mutex::new(None),
167 pending_stats: Mutex::new(None),
168 }
169 }
170
171 /// Runs the configured `CreateMode` as SQL (crate-public for use by
172 /// [`AsyncConnectionBuilder`](crate::AsyncConnectionBuilder)).
173 pub(crate) async fn handle_creation_mode_public(
174 &self,
175 database: &str,
176 mode: CreateMode,
177 ) -> Result<()> {
178 self.handle_creation_mode(database, mode).await
179 }
180
181 /// Attaches the database and sets `search_path` (crate-public for use
182 /// by [`AsyncConnectionBuilder`](crate::AsyncConnectionBuilder)).
183 pub(crate) async fn attach_and_set_path_public(&self, database: &str) -> Result<()> {
184 self.attach_and_set_path(database).await
185 }
186
187 async fn handle_creation_mode(&self, database: &str, mode: CreateMode) -> Result<()> {
188 let escaped_db = escape_sql_path(database);
189 match mode {
190 CreateMode::Create => {
191 self.execute_command(&format!("CREATE DATABASE {escaped_db}"))
192 .await?;
193 }
194 CreateMode::CreateIfNotExists => {
195 self.execute_command(&format!("CREATE DATABASE IF NOT EXISTS {escaped_db}"))
196 .await?;
197 }
198 CreateMode::CreateAndReplace => {
199 self.execute_command(&format!("DROP DATABASE IF EXISTS {escaped_db}"))
200 .await?;
201 self.execute_command(&format!("CREATE DATABASE {escaped_db}"))
202 .await?;
203 }
204 CreateMode::DoNotCreate => {}
205 }
206 Ok(())
207 }
208
209 async fn attach_and_set_path(&self, database: &str) -> Result<()> {
210 let escaped_db = escape_sql_path(database);
211 let db_alias = std::path::Path::new(database)
212 .file_stem()
213 .and_then(|s| s.to_str())
214 .unwrap_or("db");
215 let escaped_alias = escape_sql_path(db_alias);
216
217 self.execute_command(&format!("ATTACH DATABASE {escaped_db} AS {escaped_alias}"))
218 .await?;
219
220 self.execute_command(&format!("SET search_path TO {escaped_alias}, public"))
221 .await?;
222 Ok(())
223 }
224
225 /// Returns the transport type name (e.g., "TCP", "gRPC").
226 pub fn transport_type(&self) -> &'static str {
227 self.transport.transport_type().as_str()
228 }
229
230 /// Returns true if this connection supports write operations.
231 pub fn supports_writes(&self) -> bool {
232 self.transport.supports_writes()
233 }
234
235 /// Returns the database path.
236 pub fn database(&self) -> Option<&str> {
237 self.database.as_deref()
238 }
239
240 // =========================================================================
241 // Command Execution
242 // =========================================================================
243
244 /// Executes a SQL command that doesn't return rows (async).
245 ///
246 /// Use for DDL statements (CREATE, DROP, ALTER) and DML statements
247 /// (INSERT, UPDATE, DELETE). Returns the number of affected rows (DML)
248 /// or 0 (DDL).
249 ///
250 /// # Errors
251 ///
252 /// - Returns [`Error::FeatureNotSupported`] on gRPC transports that do not yet
253 /// support write operations.
254 /// - Returns [`Error::Server`] if the SQL fails to parse or execute.
255 /// - Returns [`Error::Io`] on transport-level I/O failures.
256 pub async fn execute_command(&self, sql: &str) -> Result<u64> {
257 let token = self.stats_before_query(sql);
258 let result = self.transport.execute_command(sql).await;
259 self.stats_store_pending(token, sql);
260 result
261 }
262
263 /// Executes multiple SQL statements sequentially (async).
264 ///
265 /// If any statement fails, execution stops and the error is returned
266 /// wrapping the SQL preview for context.
267 ///
268 /// # Errors
269 ///
270 /// Returns an [`Error::Internal`] wrapping the first failing statement's
271 /// error; the wrapping message includes the statement's ordinal and
272 /// an 80-character SQL preview.
273 pub async fn execute_batch(&self, statements: &[&str]) -> Result<u64> {
274 let mut total = 0u64;
275 for (i, stmt) in statements.iter().enumerate() {
276 if !stmt.trim().is_empty() {
277 total += self.execute_command(stmt).await.map_err(|e| {
278 let preview: String = stmt.chars().take(80).collect();
279 Error::internal(format!(
280 "execute_batch failed at statement {} of {}: {}: {}",
281 i + 1,
282 statements.len(),
283 preview,
284 e,
285 ))
286 })?;
287 }
288 }
289 Ok(total)
290 }
291
292 // =========================================================================
293 // Query Execution (Streaming)
294 // =========================================================================
295
296 /// Executes a SQL query and returns a streaming [`AsyncRowset`] (async).
297 ///
298 /// Results are streamed in chunks so memory usage stays constant
299 /// regardless of result set size. See [`AsyncRowset`] for the row-level
300 /// API and collectors.
301 ///
302 /// # Errors
303 ///
304 /// - Returns [`Error::Server`] if the SQL is rejected by the server.
305 /// - Returns [`Error::Io`] on transport-level I/O failures while
306 /// opening the stream.
307 pub async fn execute_query(&self, query: &str) -> Result<AsyncRowset<'_>> {
308 let token = self.stats_before_query(query);
309 let result = self.transport.execute_query_streaming(query).await;
310 self.stats_store_pending(token, query);
311 result
312 }
313
314 /// Fetches a single row, erroring if the query returns zero rows.
315 ///
316 /// # Errors
317 ///
318 /// - Returns the error from [`execute_query`](Self::execute_query) if
319 /// the query fails.
320 /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
321 /// the query produced zero rows.
322 pub async fn fetch_one<Q: AsRef<str>>(&self, query: Q) -> Result<Row> {
323 self.execute_query(query.as_ref())
324 .await?
325 .require_first_row()
326 .await
327 }
328
329 /// Fetches a single row, returning `None` if the query is empty.
330 ///
331 /// # Errors
332 ///
333 /// Returns the error from [`execute_query`](Self::execute_query) if the
334 /// query fails. An empty result set yields `Ok(None)`, not an error.
335 pub async fn fetch_optional<Q: AsRef<str>>(&self, query: Q) -> Result<Option<Row>> {
336 self.execute_query(query.as_ref()).await?.first_row().await
337 }
338
339 /// Fetches all rows from a query.
340 ///
341 /// # Errors
342 ///
343 /// Returns the error from [`execute_query`](Self::execute_query), or a
344 /// transport error produced while draining every chunk.
345 pub async fn fetch_all<Q: AsRef<str>>(&self, query: Q) -> Result<Vec<Row>> {
346 self.execute_query(query.as_ref())
347 .await?
348 .collect_rows()
349 .await
350 }
351
352 /// Fetches a single row and maps it to a struct using [`crate::FromRow`].
353 ///
354 /// # Errors
355 ///
356 /// - Returns the error from [`fetch_one`](Self::fetch_one).
357 /// - Returns whatever [`FromRow::from_row`](crate::FromRow::from_row)
358 /// produces when the row cannot be mapped.
359 pub async fn fetch_one_as<T: crate::FromRow>(&self, query: &str) -> Result<T> {
360 let row = self.fetch_one(query).await?;
361 let indices = row
362 .schema()
363 .map(crate::row_accessor::RowAccessor::build_indices)
364 .unwrap_or_default();
365 T::from_row(crate::RowAccessor::new(&row, &indices))
366 }
367
368 /// Fetches all rows and maps them to structs using [`crate::FromRow`].
369 ///
370 /// # Errors
371 ///
372 /// - Returns the error from [`fetch_all`](Self::fetch_all).
373 /// - Returns the first error produced by
374 /// [`FromRow::from_row`](crate::FromRow::from_row) on any row.
375 pub async fn fetch_all_as<T: crate::FromRow>(&self, query: &str) -> Result<Vec<T>> {
376 let rows = self.fetch_all(query).await?;
377 // Build the column-name → index lookup once from the first
378 // row's schema; reuse for every row.
379 let indices = rows
380 .first()
381 .and_then(crate::result::Row::schema)
382 .map(crate::row_accessor::RowAccessor::build_indices)
383 .unwrap_or_default();
384 rows.iter()
385 .map(|r| T::from_row(crate::RowAccessor::new(r, &indices)))
386 .collect()
387 }
388
389 /// Returns a lazy `Stream` over rows, mapping each to `T` via
390 /// [`FromRow`].
391 ///
392 /// This is the streaming variant of [`fetch_all_as`](Self::fetch_all_as):
393 /// memory usage is bounded by the chunk size (default 64K rows), not by
394 /// the total row count. Use this for large result sets where collecting
395 /// all rows into a `Vec` would exceed memory limits.
396 ///
397 /// The column-name → index lookup table is built exactly once (on the
398 /// first non-empty chunk) and reused for all rows, so per-row mapping is
399 /// O(1) in column count.
400 ///
401 /// # Example
402 ///
403 /// ```no_run
404 /// # use hyperdb_api::{AsyncConnection, CreateMode, FromRow, RowAccessor, Result};
405 /// # use futures::StreamExt;
406 /// # struct User { id: i32, name: String }
407 /// # impl FromRow for User {
408 /// # fn from_row(row: RowAccessor<'_>) -> Result<Self> {
409 /// # Ok(User { id: row.get("id")?, name: row.get("name")? })
410 /// # }
411 /// # }
412 /// # async fn example(conn: &AsyncConnection) -> Result<()> {
413 /// let stream = conn.stream_as::<User>("SELECT id, name FROM users");
414 /// tokio::pin!(stream);
415 /// while let Some(row_result) = stream.next().await {
416 /// let user = row_result?;
417 /// println!("{}: {}", user.id, user.name);
418 /// }
419 /// # Ok(())
420 /// # }
421 /// ```
422 ///
423 /// # Errors
424 ///
425 /// Each yielded item is a `Result<T>`:
426 /// - The first item will be `Err(e)` if query submission fails (parse
427 /// failures, server errors, transport failures). The stream is lazy and
428 /// does not execute the query until first polled.
429 /// - Subsequent items are `Ok(T)` if the row was successfully mapped via
430 /// `FromRow`, or `Err(e)` if mapping failed (missing column, type
431 /// mismatch, NULL in a non-optional field). These errors surface lazily
432 /// during iteration.
433 ///
434 /// [`FromRow`]: crate::FromRow
435 pub fn stream_as<'a, T: crate::FromRow + 'a>(
436 &'a self,
437 query: &str,
438 ) -> impl futures_core::Stream<Item = Result<T>> + 'a {
439 // Own the query string so the stream doesn't borrow the &str arg
440 // across await points.
441 let query = query.to_owned();
442 async_stream::try_stream! {
443 let mut rs = self.execute_query(&query).await?; // submit err → first Err item
444 let mut indices: Option<std::collections::HashMap<String, usize>> = None;
445 while let Some(chunk) = rs.next_chunk().await? {
446 // Build the name→index map once, on the first chunk, after
447 // next_chunk() has materialized the schema (TCP sends the
448 // RowDescription as the first stream message). If the schema is
449 // somehow unavailable, fall back to an empty map so per-row
450 // lookups surface a `Missing` error — matching `fetch_all_as`'s
451 // `unwrap_or_default()` and the sync `stream_as`, rather than
452 // silently skipping the chunk.
453 if indices.is_none() {
454 let map = rs
455 .schema()
456 .map(|schema| crate::RowAccessor::build_owned_indices(&schema))
457 .unwrap_or_default();
458 indices = Some(map);
459 }
460 let idx = indices.get_or_insert_with(Default::default);
461 for row in &chunk {
462 yield T::from_row(crate::RowAccessor::new_owned(row, idx))?;
463 }
464 }
465 }
466 }
467
468 /// Fetches a single row from a **parameterized** query and maps it to a
469 /// struct using [`FromRow`](crate::FromRow) (async).
470 ///
471 /// Parameterized counterpart to [`fetch_one_as`](Self::fetch_one_as): binds
472 /// `$1`, `$2`, … placeholders from `params` (via
473 /// [`ToSqlParam`](crate::params::ToSqlParam), exactly as
474 /// [`query_params`](Self::query_params)) and maps the first result row into
475 /// `T`.
476 ///
477 /// # Errors
478 ///
479 /// - Returns [`Error::FeatureNotSupported`] on gRPC transports (prepared
480 /// statements are TCP-only).
481 /// - Returns the error from [`query_params`](Self::query_params) if the
482 /// server rejects the statement, or on transport-level I/O failures.
483 /// - Returns [`Error::Conversion`] with message `"Query returned no rows"`
484 /// if the query produced zero rows.
485 /// - Returns whatever [`FromRow::from_row`](crate::FromRow::from_row)
486 /// produces when the row cannot be mapped.
487 pub async fn fetch_one_as_params<T: crate::FromRow>(
488 &self,
489 query: &str,
490 params: &[&dyn crate::params::ToSqlParam],
491 ) -> Result<T> {
492 let row = self
493 .query_params(query, params)
494 .await?
495 .require_first_row()
496 .await?;
497 let indices = row
498 .schema()
499 .map(crate::row_accessor::RowAccessor::build_indices)
500 .unwrap_or_default();
501 T::from_row(crate::RowAccessor::new(&row, &indices))
502 }
503
504 /// Fetches all rows from a **parameterized** query and maps them to structs
505 /// using [`FromRow`](crate::FromRow) (async).
506 ///
507 /// Parameterized counterpart to [`fetch_all_as`](Self::fetch_all_as): binds
508 /// `$1`, `$2`, … placeholders from `params` (see
509 /// [`query_params`](Self::query_params)) and maps every result row into `T`.
510 ///
511 /// # Errors
512 ///
513 /// - Returns [`Error::FeatureNotSupported`] on gRPC transports.
514 /// - Returns the error from [`query_params`](Self::query_params) if the
515 /// server rejects the statement, or on transport-level I/O failures.
516 /// - Returns the first error produced by
517 /// [`FromRow::from_row`](crate::FromRow::from_row) on any row.
518 pub async fn fetch_all_as_params<T: crate::FromRow>(
519 &self,
520 query: &str,
521 params: &[&dyn crate::params::ToSqlParam],
522 ) -> Result<Vec<T>> {
523 let rows = self
524 .query_params(query, params)
525 .await?
526 .collect_rows()
527 .await?;
528 // Build the column-name → index lookup once from the first row's
529 // schema; reuse for every row. See `fetch_all_as`.
530 let indices = rows
531 .first()
532 .and_then(crate::result::Row::schema)
533 .map(crate::row_accessor::RowAccessor::build_indices)
534 .unwrap_or_default();
535 rows.iter()
536 .map(|r| T::from_row(crate::RowAccessor::new(r, &indices)))
537 .collect()
538 }
539
540 /// Returns a lazy `Stream` over the rows of a **parameterized** query,
541 /// mapping each to `T` via [`FromRow`] (async).
542 ///
543 /// Parameterized counterpart to [`stream_as`](Self::stream_as): binds `$1`,
544 /// `$2`, … placeholders from `params` and streams the result with O(chunk)
545 /// memory, mapping each row into `T`. The column-index map is built once on
546 /// the first chunk and reused.
547 ///
548 /// # Errors
549 ///
550 /// Like [`stream_as`](Self::stream_as), this returns the `Stream` directly
551 /// (no outer `Result`) — the query is lazy and does not execute until first
552 /// polled. Each yielded item is a `Result<T>`:
553 /// - The **first** item is `Err(e)` if statement submission fails:
554 /// [`Error::FeatureNotSupported`] on gRPC transport, a `Parse`/`Bind`
555 /// rejection, or a transport failure. These surface as the first item,
556 /// *not* eagerly.
557 /// - Subsequent items are `Ok(T)` on a clean per-row mapping, or `Err(e)`
558 /// for a mapping failure (missing column, type mismatch, NULL in a
559 /// non-`Option` field) or a transport error hit on a later chunk.
560 ///
561 /// [`FromRow`]: crate::FromRow
562 pub fn stream_as_params<'a, T: crate::FromRow + 'a>(
563 &'a self,
564 query: &str,
565 params: &[&dyn crate::params::ToSqlParam],
566 ) -> impl futures_core::Stream<Item = Result<T>> + 'a {
567 // `&[&dyn ToSqlParam]` can't cross the `try_stream!` await points, so
568 // own the query string and encode params up front (encoding needs no
569 // connection). The prepare+execute sequence below mirrors
570 // `query_params` (see that method) — keep the two in sync if its
571 // Parse/Bind/Execute handling ever changes.
572 let query = query.to_owned();
573 let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
574 let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
575 async_stream::try_stream! {
576 let client = match &self.transport {
577 AsyncTransport::Tcp(tcp) => &tcp.client,
578 AsyncTransport::Grpc(_) => {
579 // `?` inside try_stream! yields Err(e) and terminates the
580 // generator, so `unreachable!()` is dead code — its `!`
581 // type just satisfies the arm's need for a `&AsyncClient`
582 // (same type the Tcp arm produces).
583 Err(Error::feature_not_supported(
584 "prepared statements are not supported over gRPC transport",
585 ))?;
586 unreachable!()
587 }
588 };
589 let stmt = client.prepare_typed(&query, &oids).await?;
590 let stream = client
591 .execute_prepared_streaming(&stmt, encoded, crate::result::DEFAULT_BINARY_CHUNK_SIZE)
592 .await?;
593 let mut rs = AsyncRowset::from_prepared(stream).with_statement_guard(stmt);
594 // The Prepared path captures the schema at prepare time, so the
595 // column-name → index map is available immediately — build it once
596 // up front rather than deferring to the first chunk (the empty-map
597 // fallback matches `stream_as` / `fetch_all_as` if it is somehow
598 // unavailable, surfacing a per-row `Missing` error).
599 let idx = rs
600 .schema()
601 .map(|schema| crate::RowAccessor::build_owned_indices(&schema))
602 .unwrap_or_default();
603 while let Some(chunk) = rs.next_chunk().await? {
604 for row in &chunk {
605 yield T::from_row(crate::RowAccessor::new_owned(row, &idx))?;
606 }
607 }
608 }
609 }
610
611 /// Fetches a single non-NULL scalar value. Errors on empty / NULL.
612 ///
613 /// # Errors
614 ///
615 /// - Returns the error from [`execute_query`](Self::execute_query).
616 /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
617 /// the query is empty.
618 /// - Returns [`Error::Conversion`] with message `"Scalar query returned NULL"`
619 /// if the first cell is SQL `NULL`.
620 pub async fn fetch_scalar<T, Q>(&self, query: Q) -> Result<T>
621 where
622 T: RowValue,
623 Q: AsRef<str>,
624 {
625 self.execute_query(query.as_ref())
626 .await?
627 .require_scalar()
628 .await
629 }
630
631 /// Fetches a single scalar value, allowing NULL (returns `None`).
632 ///
633 /// # Errors
634 ///
635 /// Returns the error from [`execute_query`](Self::execute_query). An
636 /// empty result still yields an error; SQL `NULL` in the first cell
637 /// yields `Ok(None)`.
638 pub async fn fetch_optional_scalar<T, Q>(&self, query: Q) -> Result<Option<T>>
639 where
640 T: RowValue,
641 Q: AsRef<str>,
642 {
643 self.execute_query(query.as_ref()).await?.scalar().await
644 }
645
646 /// Returns the count from a `SELECT COUNT(*)` style query, defaulting
647 /// to 0 on NULL.
648 ///
649 /// # Errors
650 ///
651 /// Returns the error from [`execute_query`](Self::execute_query) if the
652 /// query itself fails.
653 pub async fn query_count(&self, query: &str) -> Result<i64> {
654 let opt: Option<i64> = self.fetch_optional_scalar(query).await?;
655 Ok(opt.unwrap_or(0))
656 }
657
658 // =========================================================================
659 // Arrow Queries
660 // =========================================================================
661
662 /// Executes a SELECT query and returns results as Arrow IPC stream bytes (async).
663 ///
664 /// TCP uses `COPY ... TO STDOUT WITH (FORMAT ARROWSTREAM)`; gRPC uses
665 /// the native Arrow transport. Both return the same IPC stream shape.
666 ///
667 /// # Errors
668 ///
669 /// Propagates any [`Error::Server`] from the transport when the query
670 /// fails or the server cannot produce Arrow IPC output.
671 pub async fn execute_query_to_arrow(&self, sql: &str) -> Result<bytes::Bytes> {
672 self.transport.execute_query_to_arrow(sql).await
673 }
674
675 /// Exports an entire table to Arrow IPC stream format (async).
676 ///
677 /// # Errors
678 ///
679 /// See [`execute_query_to_arrow`](Self::execute_query_to_arrow).
680 pub async fn export_table_to_arrow(&self, table_name: &str) -> Result<bytes::Bytes> {
681 self.execute_query_to_arrow(&format!("SELECT * FROM {table_name}"))
682 .await
683 }
684
685 /// Executes a SELECT query and returns parsed Arrow `RecordBatch`es (async).
686 ///
687 /// # Errors
688 ///
689 /// - Returns [`Error::Server`] if the query fails.
690 /// - Returns [`Error::Conversion`] if the Arrow IPC payload cannot be
691 /// decoded into record batches.
692 pub async fn execute_query_to_batches(
693 &self,
694 sql: &str,
695 ) -> Result<Vec<arrow::record_batch::RecordBatch>> {
696 let arrow_data = self.execute_query_to_arrow(sql).await?;
697 crate::arrow_result::parse_arrow_ipc(arrow_data)
698 }
699
700 // =========================================================================
701 // Parameterized Queries
702 // =========================================================================
703
704 /// Executes a parameterized query with binary-encoded parameters (async).
705 ///
706 /// Mirrors the sync [`Connection::query_params`](crate::Connection::query_params);
707 /// see that method for the design rationale. Parameters travel through the
708 /// extended query protocol (Parse/Bind/Execute) in HyperBinary format — no
709 /// SQL escaping, full SQL-injection safety regardless of parameter content.
710 ///
711 /// # Errors
712 ///
713 /// - Returns [`Error::FeatureNotSupported`] on gRPC transports (prepared statements
714 /// are TCP-only).
715 /// - Returns [`Error::Server`] if the server rejects the statement at
716 /// `Parse`, `Bind`, or `Execute` time.
717 /// - Returns [`Error::Io`] on transport-level I/O failures.
718 pub async fn query_params(
719 &self,
720 query: &str,
721 params: &[&dyn crate::params::ToSqlParam],
722 ) -> Result<AsyncRowset<'_>> {
723 // Route through the extended query protocol. See
724 // [`Connection::query_params`] for the sync equivalent and the
725 // rationale behind the statement-guard pattern.
726 let client = match &self.transport {
727 AsyncTransport::Tcp(tcp) => &tcp.client,
728 AsyncTransport::Grpc(_) => {
729 return Err(Error::feature_not_supported(
730 "prepared statements are not supported over gRPC transport",
731 ));
732 }
733 };
734 let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
735 let stmt = client.prepare_typed(query, &oids).await?;
736 let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
737 let stream = client
738 .execute_prepared_streaming(&stmt, encoded, crate::result::DEFAULT_BINARY_CHUNK_SIZE)
739 .await?;
740 Ok(AsyncRowset::from_prepared(stream).with_statement_guard(stmt))
741 }
742
743 /// Executes a parameterized command (INSERT / UPDATE / DELETE) with
744 /// binary-encoded parameters via Parse/Bind/Execute (async).
745 ///
746 /// # Errors
747 ///
748 /// - Returns [`Error::FeatureNotSupported`] on gRPC transports.
749 /// - Returns [`Error::Server`] if the server rejects the statement at
750 /// `Parse`, `Bind`, or `Execute` time.
751 /// - Returns [`Error::Io`] on transport-level I/O failures.
752 pub async fn command_params(
753 &self,
754 query: &str,
755 params: &[&dyn crate::params::ToSqlParam],
756 ) -> Result<u64> {
757 let client = match &self.transport {
758 AsyncTransport::Tcp(tcp) => &tcp.client,
759 AsyncTransport::Grpc(_) => {
760 return Err(Error::feature_not_supported(
761 "prepared statements are not supported over gRPC transport",
762 ));
763 }
764 };
765 let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
766 let stmt = client.prepare_typed(query, &oids).await?;
767 let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
768 Ok(client.execute_prepared_no_result(&stmt, encoded).await?)
769 }
770
771 // =========================================================================
772 // Catalog / Database Management
773 // =========================================================================
774
775 /// Creates a new database file (async).
776 ///
777 /// # Errors
778 ///
779 /// Returns [`Error::Server`] if the server rejects
780 /// `CREATE DATABASE IF NOT EXISTS` (e.g. the path is not writable).
781 pub async fn create_database(&self, path: &str) -> Result<()> {
782 let sql = format!("CREATE DATABASE IF NOT EXISTS {}", escape_sql_path(path));
783 self.execute_command(&sql).await?;
784 Ok(())
785 }
786
787 /// Drops (deletes) a database file (async).
788 ///
789 /// # Errors
790 ///
791 /// Returns [`Error::Server`] if the server rejects
792 /// `DROP DATABASE IF EXISTS` (e.g. the database is still attached).
793 pub async fn drop_database(&self, path: &str) -> Result<()> {
794 let sql = format!("DROP DATABASE IF EXISTS {}", escape_sql_path(path));
795 self.execute_command(&sql).await?;
796 Ok(())
797 }
798
799 /// Attaches a database file to the connection (async).
800 ///
801 /// # Errors
802 ///
803 /// Returns [`Error::Server`] if the server rejects the
804 /// `ATTACH DATABASE` statement (file missing, permission denied,
805 /// alias conflict).
806 pub async fn attach_database(&self, path: &str, alias: Option<&str>) -> Result<()> {
807 let sql = if let Some(alias) = alias {
808 format!(
809 "ATTACH DATABASE {} AS {}",
810 escape_sql_path(path),
811 escape_sql_path(alias)
812 )
813 } else {
814 format!("ATTACH DATABASE {}", escape_sql_path(path))
815 };
816 self.execute_command(&sql).await?;
817 Ok(())
818 }
819
820 /// Detaches a database alias from this connection (async).
821 ///
822 /// # Errors
823 ///
824 /// Returns [`Error::Server`] if the alias is not attached or the
825 /// server cannot flush pending updates.
826 pub async fn detach_database(&self, alias: &str) -> Result<()> {
827 let sql = format!("DETACH DATABASE {}", escape_sql_path(alias));
828 self.execute_command(&sql).await?;
829 Ok(())
830 }
831
832 /// Detaches all databases from this connection (async).
833 ///
834 /// # Errors
835 ///
836 /// Returns [`Error::Server`] if the server rejects
837 /// `DETACH ALL DATABASES`.
838 pub async fn detach_all_databases(&self) -> Result<()> {
839 self.execute_command("DETACH ALL DATABASES").await?;
840 Ok(())
841 }
842
843 /// Copies a database file to a new path (async).
844 ///
845 /// # Errors
846 ///
847 /// Returns [`Error::Server`] if the server rejects the
848 /// `COPY DATABASE` statement — e.g. the source is not attached or the
849 /// destination path is not writable.
850 pub async fn copy_database(&self, source: &str, destination: &str) -> Result<()> {
851 let sql = format!(
852 "COPY DATABASE {} TO {}",
853 escape_sql_path(source),
854 escape_sql_path(destination)
855 );
856 self.execute_command(&sql).await?;
857 Ok(())
858 }
859
860 /// Creates a schema in the database (async).
861 ///
862 /// # Errors
863 ///
864 /// - Returns an error if `schema_name` cannot be converted to a
865 /// [`SchemaName`](crate::SchemaName).
866 /// - Returns [`Error::Server`] if the server rejects
867 /// `CREATE SCHEMA IF NOT EXISTS`.
868 pub async fn create_schema<T>(&self, schema_name: T) -> Result<()>
869 where
870 T: TryInto<crate::SchemaName>,
871 crate::Error: From<T::Error>,
872 {
873 let schema: crate::SchemaName = schema_name.try_into()?;
874 let sql = format!("CREATE SCHEMA IF NOT EXISTS {schema}");
875 self.execute_command(&sql).await?;
876 Ok(())
877 }
878
879 /// Checks whether a schema exists (async).
880 ///
881 /// # Errors
882 ///
883 /// - Returns an error if `schema` cannot be converted to a
884 /// [`SchemaName`](crate::SchemaName).
885 /// - Returns [`Error::Server`] if the catalog lookup query fails.
886 pub async fn has_schema<T>(&self, schema: T) -> Result<bool>
887 where
888 T: TryInto<crate::SchemaName>,
889 crate::Error: From<T::Error>,
890 {
891 let schema: crate::SchemaName = schema.try_into()?;
892 let db_prefix = if let Some(db) = schema.database() {
893 format!("{db}.")
894 } else {
895 String::new()
896 };
897 let sql = format!(
898 "SELECT 1 FROM {}pg_catalog.pg_namespace WHERE nspname = '{}'",
899 db_prefix,
900 schema.unescaped().replace('\'', "''")
901 );
902 Ok(self.fetch_optional(&sql).await?.is_some())
903 }
904
905 /// Checks whether a table exists (async).
906 ///
907 /// # Errors
908 ///
909 /// - Returns an error if `table_name` cannot be converted to a
910 /// [`TableName`](crate::TableName).
911 /// - Returns [`Error::Server`] if the catalog lookup query fails.
912 pub async fn has_table<T>(&self, table_name: T) -> Result<bool>
913 where
914 T: TryInto<crate::TableName>,
915 crate::Error: From<T::Error>,
916 {
917 let table: crate::TableName = table_name.try_into()?;
918 let schema = table
919 .schema()
920 .map_or("public", super::names::Name::unescaped);
921 let db_prefix = if let Some(db) = table.database() {
922 format!("{db}.")
923 } else {
924 String::new()
925 };
926 let sql = format!(
927 "SELECT 1 FROM {}pg_catalog.pg_tables WHERE schemaname = '{}' AND tablename = '{}'",
928 db_prefix,
929 schema.replace('\'', "''"),
930 table.table().unescaped().replace('\'', "''")
931 );
932 Ok(self.fetch_optional(&sql).await?.is_some())
933 }
934
935 /// Unloads the database from memory but keeps the session alive (async).
936 ///
937 /// # Errors
938 ///
939 /// Returns [`Error::Server`] if the server rejects `UNLOAD DATABASE`
940 /// (e.g. the database is in use by another session).
941 pub async fn unload_database(&self) -> Result<()> {
942 self.execute_command("UNLOAD DATABASE").await?;
943 Ok(())
944 }
945
946 /// Releases the database completely from the session (async).
947 ///
948 /// # Errors
949 ///
950 /// Returns [`Error::Server`] if the server rejects `UNLOAD RELEASE`,
951 /// most commonly because multiple databases are attached to the same
952 /// session.
953 pub async fn unload_release(&self) -> Result<()> {
954 self.execute_command("UNLOAD RELEASE").await?;
955 Ok(())
956 }
957
958 // =========================================================================
959 // Diagnostics / Explain
960 // =========================================================================
961
962 /// Executes EXPLAIN and returns the plan text (async).
963 ///
964 /// # Errors
965 ///
966 /// Returns [`Error::Server`] if `EXPLAIN <query>` fails to parse or plan.
967 pub async fn explain(&self, query: &str) -> Result<String> {
968 let sql = format!("EXPLAIN {query}");
969 let rows = self.fetch_all(&sql).await?;
970 let lines: Vec<String> = rows.iter().filter_map(|r| r.get::<String>(0)).collect();
971 Ok(lines.join("\n"))
972 }
973
974 /// Executes EXPLAIN ANALYZE and returns the plan with timing (async).
975 ///
976 /// # Errors
977 ///
978 /// Returns [`Error::Server`] if `EXPLAIN ANALYZE <query>` fails — this
979 /// includes any runtime error raised by actually executing `query`.
980 pub async fn explain_analyze(&self, query: &str) -> Result<String> {
981 let sql = format!("EXPLAIN ANALYZE {query}");
982 let rows = self.fetch_all(&sql).await?;
983 let lines: Vec<String> = rows.iter().filter_map(|r| r.get::<String>(0)).collect();
984 Ok(lines.join("\n"))
985 }
986
987 // =========================================================================
988 // Connection Introspection / Lifecycle
989 // =========================================================================
990
991 /// Returns true if the connection is alive (passive check).
992 pub fn is_alive(&self) -> bool {
993 match &self.transport {
994 AsyncTransport::Tcp(tcp) => tcp.client.is_alive(),
995 AsyncTransport::Grpc(_) => true,
996 }
997 }
998
999 /// Actively pings the server with `SELECT 1` (async).
1000 ///
1001 /// # Errors
1002 ///
1003 /// Returns [`Error::Server`] or [`Error::Io`] if the `SELECT 1`
1004 /// round-trip fails — i.e. the connection is no longer usable.
1005 pub async fn ping(&self) -> Result<()> {
1006 self.execute_command("SELECT 1").await?;
1007 Ok(())
1008 }
1009
1010 /// Returns the backend process ID, or 0 for gRPC transports.
1011 pub fn process_id(&self) -> i32 {
1012 match &self.transport {
1013 AsyncTransport::Tcp(tcp) => tcp.client.process_id(),
1014 AsyncTransport::Grpc(_) => 0,
1015 }
1016 }
1017
1018 /// Returns the secret key used for cancel requests, or 0 for gRPC.
1019 pub fn secret_key(&self) -> i32 {
1020 match &self.transport {
1021 AsyncTransport::Tcp(tcp) => tcp.client.secret_key(),
1022 AsyncTransport::Grpc(_) => 0,
1023 }
1024 }
1025
1026 /// Returns a server parameter value by name (async).
1027 pub async fn parameter_status(&self, name: &str) -> Option<String> {
1028 match &self.transport {
1029 AsyncTransport::Tcp(tcp) => tcp.client.parameter_status(name).await,
1030 AsyncTransport::Grpc(_) => None,
1031 }
1032 }
1033
1034 /// Returns the server version as a parsed struct (async).
1035 pub async fn server_version(&self) -> Option<crate::ServerVersion> {
1036 let version_str = self.parameter_status("server_version").await?;
1037 crate::ServerVersion::parse(&version_str)
1038 }
1039
1040 /// Sets the notice receiver callback for this connection.
1041 pub fn set_notice_receiver(
1042 &mut self,
1043 receiver: Option<Box<dyn Fn(hyperdb_api_core::client::Notice) + Send + Sync>>,
1044 ) {
1045 match &mut self.transport {
1046 AsyncTransport::Tcp(tcp) => tcp.client.set_notice_receiver(receiver),
1047 AsyncTransport::Grpc(_) => {}
1048 }
1049 }
1050
1051 /// Cancels the currently running query (async).
1052 ///
1053 /// # Errors
1054 ///
1055 /// - Returns [`Error::FeatureNotSupported`] on gRPC transports — cancellation is not
1056 /// yet implemented for gRPC.
1057 /// - Returns [`Error::Connection`] or [`Error::Io`] if the cancel-request
1058 /// connection to the server fails.
1059 pub async fn cancel(&self) -> Result<()> {
1060 self.transport.cancel().await
1061 }
1062
1063 /// Closes the connection gracefully, detaching any attached database first (async).
1064 ///
1065 /// # Errors
1066 ///
1067 /// - Returns [`Error::Internal`] wrapping the transport close failure if
1068 /// the client cannot be shut down cleanly.
1069 /// - Returns [`Error::Internal`] wrapping the detach failure if the
1070 /// attached database could not be detached but the transport close
1071 /// itself succeeded.
1072 pub async fn close(self) -> Result<()> {
1073 let detach_err = if let Some(ref db_path) = self.database {
1074 let db_alias = std::path::Path::new(db_path)
1075 .file_stem()
1076 .and_then(|s| s.to_str())
1077 .unwrap_or("db");
1078 self.execute_command(&format!("DETACH DATABASE {}", escape_sql_path(db_alias)))
1079 .await
1080 .err()
1081 } else {
1082 None
1083 };
1084
1085 let close_result = self.transport.close().await;
1086
1087 if let Err(e) = close_result {
1088 return Err(Error::internal(format!(
1089 "Failed to close async connection: {e}"
1090 )));
1091 }
1092
1093 if let Some(e) = detach_err {
1094 return Err(Error::internal(format!(
1095 "Failed to detach database during close: {e}"
1096 )));
1097 }
1098
1099 Ok(())
1100 }
1101
1102 /// Returns a reference to the underlying async TCP client (`None` for gRPC).
1103 ///
1104 /// Prefer the high-level `AsyncConnection` methods; this escape hatch
1105 /// remains for code that needs direct protocol access (e.g. custom
1106 /// COPY loops).
1107 pub fn async_tcp_client(&self) -> Option<&hyperdb_api_core::client::AsyncClient> {
1108 self.transport.async_tcp_client()
1109 }
1110
1111 /// Crate-internal accessor for the transport. Used by
1112 /// [`AsyncPreparedStatement`](crate::AsyncPreparedStatement) to reach
1113 /// the underlying `hyperdb_api_core::client::AsyncClient`.
1114 pub(crate) fn transport(&self) -> &AsyncTransport {
1115 &self.transport
1116 }
1117
1118 /// Prepares a SQL statement (async).
1119 ///
1120 /// See [`Connection::prepare`](crate::Connection::prepare) for
1121 /// semantics. The returned
1122 /// [`AsyncPreparedStatement`](crate::AsyncPreparedStatement) can be
1123 /// executed many times with different parameter values.
1124 ///
1125 /// # Errors
1126 ///
1127 /// See [`prepare_typed`](Self::prepare_typed) — this method delegates
1128 /// to it with an empty OID list.
1129 pub async fn prepare(&self, query: &str) -> Result<crate::AsyncPreparedStatement<'_>> {
1130 self.prepare_typed(query, &[]).await
1131 }
1132
1133 /// Prepares a SQL statement with explicit parameter type OIDs (async).
1134 ///
1135 /// # Errors
1136 ///
1137 /// - Returns [`Error::FeatureNotSupported`] on gRPC transports (prepared statements
1138 /// are TCP-only).
1139 /// - Returns [`Error::Server`] if the server rejects the `Parse`
1140 /// message (SQL syntax error, unknown OID).
1141 /// - Returns [`Error::Io`] on transport-level I/O failures.
1142 pub async fn prepare_typed(
1143 &self,
1144 query: &str,
1145 param_types: &[crate::Oid],
1146 ) -> Result<crate::AsyncPreparedStatement<'_>> {
1147 let client = match &self.transport {
1148 AsyncTransport::Tcp(tcp) => &tcp.client,
1149 AsyncTransport::Grpc(_) => {
1150 return Err(Error::feature_not_supported(
1151 "prepared statements are not supported over gRPC transport",
1152 ));
1153 }
1154 };
1155 let inner = client.prepare_typed(query, param_types).await?;
1156 crate::AsyncPreparedStatement::new(self, inner)
1157 }
1158
1159 /// Owned-handle variant of [`prepare`](Self::prepare). Returns a
1160 /// `'static`-lifetime [`AsyncPreparedStatementOwned`](crate::AsyncPreparedStatementOwned)
1161 /// that holds an `Arc`-cloned reference to `self`.
1162 ///
1163 /// Intended for N-API consumers and any other caller that needs
1164 /// the prepared statement to outlive the stack frame where the
1165 /// connection is held.
1166 ///
1167 /// # Errors
1168 ///
1169 /// See [`prepare_typed_arc`](Self::prepare_typed_arc).
1170 pub async fn prepare_arc(
1171 self: &Arc<Self>,
1172 query: &str,
1173 ) -> Result<crate::async_prepared::AsyncPreparedStatementOwned> {
1174 self.prepare_typed_arc(query, &[]).await
1175 }
1176
1177 /// Owned-handle variant of [`prepare_typed`](Self::prepare_typed).
1178 ///
1179 /// # Errors
1180 ///
1181 /// - Returns [`Error::FeatureNotSupported`] on gRPC transports.
1182 /// - Returns [`Error::Server`] if the server rejects the `Parse`
1183 /// message.
1184 /// - Returns [`Error::Io`] on transport-level I/O failures.
1185 pub async fn prepare_typed_arc(
1186 self: &Arc<Self>,
1187 query: &str,
1188 param_types: &[crate::Oid],
1189 ) -> Result<crate::async_prepared::AsyncPreparedStatementOwned> {
1190 let client = match &self.transport {
1191 AsyncTransport::Tcp(tcp) => &tcp.client,
1192 AsyncTransport::Grpc(_) => {
1193 return Err(Error::feature_not_supported(
1194 "prepared statements are not supported over gRPC transport",
1195 ));
1196 }
1197 };
1198 let inner = client.prepare_typed(query, param_types).await?;
1199 crate::async_prepared::AsyncPreparedStatementOwned::new(Arc::clone(self), inner)
1200 }
1201
1202 // =========================================================================
1203 // Query Statistics
1204 // =========================================================================
1205
1206 /// Enables query statistics collection for this connection.
1207 pub fn enable_query_stats(&self, provider: impl QueryStatsProvider + 'static) {
1208 if let Ok(mut guard) = self.stats_provider.lock() {
1209 *guard = Some(Arc::new(provider));
1210 }
1211 }
1212
1213 /// Disables query statistics collection.
1214 pub fn disable_query_stats(&self) {
1215 if let Ok(mut guard) = self.stats_provider.lock() {
1216 *guard = None;
1217 }
1218 if let Ok(mut guard) = self.pending_stats.lock() {
1219 *guard = None;
1220 }
1221 }
1222
1223 /// Returns the stats for the most recent query (if enabled).
1224 pub fn last_query_stats(&self) -> Option<QueryStats> {
1225 let provider = self.stats_provider.lock().ok()?.as_ref().cloned()?;
1226 let mut guard = self.pending_stats.lock().ok()?;
1227 let (token, sql) = guard.take()?;
1228 provider.after_query(token, &sql)
1229 }
1230
1231 fn stats_before_query(&self, sql: &str) -> Option<Box<dyn Any + Send>> {
1232 self.stats_provider
1233 .lock()
1234 .ok()?
1235 .as_ref()
1236 .map(|p| p.before_query(sql))
1237 }
1238
1239 fn stats_store_pending(&self, token: Option<Box<dyn Any + Send>>, sql: &str) {
1240 if let Some(token) = token {
1241 if let Ok(mut guard) = self.pending_stats.lock() {
1242 *guard = Some((token, sql.to_string()));
1243 }
1244 }
1245 }
1246}
1247
1248impl AsyncConnection {
1249 // =========================================================================
1250 // Transaction Control
1251 // =========================================================================
1252
1253 // -------------------------------------------------------------------
1254 // Raw transaction control (internal)
1255 // -------------------------------------------------------------------
1256 //
1257 // The `*_raw` methods below are `pub(crate)` and form the canonical
1258 // implementation of session-level transaction control. The RAII
1259 // guard at `crate::AsyncTransaction` and any internal helper that
1260 // genuinely needs `&self` (rather than the guard's `&mut self`)
1261 // delegate to these.
1262 //
1263 // The matching `pub` methods (`begin_transaction`, `commit`,
1264 // `rollback`) are thin `#[doc(hidden)] #[deprecated]` wrappers
1265 // retained only so any pre-existing downstream caller sees a
1266 // compiler warning rather than a hard break. They will be deleted
1267 // in a future release; the `_raw` methods stay.
1268
1269 /// Issues `BEGIN TRANSACTION`. Crate-internal use only.
1270 pub(crate) async fn begin_transaction_raw(&self) -> Result<()> {
1271 self.execute_command("BEGIN TRANSACTION").await?;
1272 Ok(())
1273 }
1274
1275 /// Issues `COMMIT`. Crate-internal use only.
1276 pub(crate) async fn commit_raw(&self) -> Result<()> {
1277 self.execute_command("COMMIT").await?;
1278 Ok(())
1279 }
1280
1281 /// Issues `ROLLBACK`. Crate-internal use only.
1282 pub(crate) async fn rollback_raw(&self) -> Result<()> {
1283 self.execute_command("ROLLBACK").await?;
1284 Ok(())
1285 }
1286
1287 /// Begins an explicit transaction (async).
1288 ///
1289 /// **Prefer [`transaction()`](Self::transaction)** — the RAII guard
1290 /// auto-rolls back on drop and cannot leak a half-open transaction
1291 /// across error paths. Hidden from generated rustdoc and
1292 /// deprecated; slated for removal in a future release.
1293 ///
1294 /// # Errors
1295 ///
1296 /// Returns [`Error::Server`] if the server rejects `BEGIN TRANSACTION`
1297 /// (e.g. a transaction is already open on this session).
1298 #[doc(hidden)]
1299 #[deprecated(
1300 note = "Use `AsyncConnection::transaction()` for an RAII guard. This method will be \
1301 removed in a future release."
1302 )]
1303 pub async fn begin_transaction(&self) -> Result<()> {
1304 self.begin_transaction_raw().await
1305 }
1306
1307 /// Commits the current transaction (async).
1308 ///
1309 /// **Prefer [`AsyncTransaction::commit`](crate::AsyncTransaction::commit)**
1310 /// on the RAII guard returned by [`transaction()`](Self::transaction).
1311 /// Hidden from generated rustdoc and deprecated; slated for removal.
1312 ///
1313 /// # Errors
1314 ///
1315 /// Returns [`Error::Server`] if the server rejects `COMMIT`.
1316 #[doc(hidden)]
1317 #[deprecated(note = "Use `AsyncTransaction::commit()` on the RAII guard from \
1318 `AsyncConnection::transaction()`. This method will be removed in a future release.")]
1319 pub async fn commit(&self) -> Result<()> {
1320 self.commit_raw().await
1321 }
1322
1323 /// Rolls back the current transaction (async).
1324 ///
1325 /// **Prefer [`AsyncTransaction::rollback`](crate::AsyncTransaction::rollback)**
1326 /// on the RAII guard returned by [`transaction()`](Self::transaction).
1327 /// Hidden from generated rustdoc and deprecated; slated for removal.
1328 ///
1329 /// # Errors
1330 ///
1331 /// Returns [`Error::Server`] if the server rejects `ROLLBACK`.
1332 #[doc(hidden)]
1333 #[deprecated(note = "Use `AsyncTransaction::rollback()` on the RAII guard from \
1334 `AsyncConnection::transaction()`. This method will be removed in a future release.")]
1335 pub async fn rollback(&self) -> Result<()> {
1336 self.rollback_raw().await
1337 }
1338
1339 /// Starts a transaction with an async RAII guard (async).
1340 ///
1341 /// # Errors
1342 ///
1343 /// Returns [`Error::Server`] if the internal `BEGIN` issued by
1344 /// [`AsyncTransaction::new`](crate::AsyncTransaction) fails.
1345 pub async fn transaction(&mut self) -> Result<crate::AsyncTransaction<'_>> {
1346 crate::AsyncTransaction::new(self).await
1347 }
1348}