Skip to main content

hyperdb_api/
async_connection.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Async connection to Hyper database.
5//!
//! This module provides [`AsyncConnection`], the async version of [`Connection`](crate::Connection).
7//! Use this when you're already in an async runtime (tokio).
8
9use std::any::Any;
10use std::sync::{Arc, Mutex};
11
12use crate::async_result::AsyncRowset;
13use crate::async_transport::{AsyncTcpTransport, AsyncTransport};
14use crate::error::{Error, Result};
15use crate::names::escape_sql_path;
16use crate::query_stats::{QueryStats, QueryStatsProvider};
17use crate::result::{Row, RowValue};
18use crate::CreateMode;
19
/// An async connection to a Hyper database.
///
/// This is the async equivalent of [`Connection`](crate::Connection), designed for use
/// in tokio-based async applications. All I/O operations are non-blocking.
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::{AsyncConnection, CreateMode, Result};
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
///     let conn = AsyncConnection::connect(
///         "localhost:7483",
///         "example.hyper",
///         CreateMode::CreateIfNotExists,
///     ).await?;
///
///     conn.execute_command("CREATE TABLE test (id INT)").await?;
///     let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM test").await?;
///
///     conn.close().await?;
///     Ok(())
/// }
/// ```
pub struct AsyncConnection {
    /// Transport that the commands and queries issued through this connection
    /// are sent over.
    transport: AsyncTransport,
    /// Database path given at construction time; `None` when the connection
    /// was created without one (see `without_database`).
    database: Option<String>,
    // NOTE(review): both stats fields start out as `None` in every constructor
    // in this file; they presumably back the `stats_before_query` /
    // `stats_store_pending` hooks invoked around queries — confirm against
    // the definitions of those hooks.
    stats_provider: Mutex<Option<Arc<dyn QueryStatsProvider>>>,
    pending_stats: Mutex<Option<(Box<dyn Any + Send>, String)>>,
}
51
52impl std::fmt::Debug for AsyncConnection {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        f.debug_struct("AsyncConnection")
55            .field("database", &self.database)
56            .finish_non_exhaustive()
57    }
58}
59
60impl AsyncConnection {
    /// Returns a fluent [`AsyncConnectionBuilder`](crate::AsyncConnectionBuilder)
    /// pointed at `endpoint`.
    ///
    /// This only constructs the builder via `AsyncConnectionBuilder::new`; the
    /// returned value must be used (hence `#[must_use]`) to obtain a connection.
    #[must_use]
    pub fn builder(endpoint: &str) -> crate::AsyncConnectionBuilder {
        crate::AsyncConnectionBuilder::new(endpoint)
    }
67
68    /// Connects to a Hyper server (async).
69    ///
70    /// Transport is auto-detected from the endpoint:
71    /// - `https://` or `http://` → gRPC transport
72    /// - Otherwise → TCP transport (`PostgreSQL` wire protocol)
73    ///
74    /// # Errors
75    ///
76    /// - Returns [`Error::Io`] / [`Error::Connection`] if the handshake with
77    ///   the server fails.
78    /// - Returns [`Error::Server`] if the `CreateMode` SQL (`CREATE`
79    ///   / `DROP` / `ATTACH`) is rejected by the server.
80    pub async fn connect(endpoint: &str, database: &str, mode: CreateMode) -> Result<Self> {
81        let transport = AsyncTransport::connect(endpoint, Some(database)).await?;
82        let conn = AsyncConnection {
83            transport,
84            database: Some(database.to_string()),
85            stats_provider: Mutex::new(None),
86            pending_stats: Mutex::new(None),
87        };
88
89        if conn.transport.supports_writes() {
90            conn.handle_creation_mode(database, mode).await?;
91            conn.attach_and_set_path(database).await?;
92        }
93
94        Ok(conn)
95    }
96
97    /// Connects with authentication (async).
98    ///
99    /// # Errors
100    ///
101    /// - Returns [`Error::Authentication`] if authentication is rejected.
102    /// - Returns [`Error::Io`] if the endpoint cannot be reached.
103    /// - Returns [`Error::Server`] if the `CreateMode` SQL is rejected.
104    pub async fn connect_with_auth(
105        endpoint: &str,
106        database: &str,
107        mode: CreateMode,
108        user: &str,
109        password: &str,
110    ) -> Result<Self> {
111        let transport = AsyncTransport::connect_tcp_with_auth(endpoint, user, password).await?;
112        let conn = AsyncConnection {
113            transport,
114            database: Some(database.to_string()),
115            stats_provider: Mutex::new(None),
116            pending_stats: Mutex::new(None),
117        };
118
119        conn.handle_creation_mode(database, mode).await?;
120        conn.attach_and_set_path(database).await?;
121
122        Ok(conn)
123    }
124
125    /// Connects to a server without attaching any database (async).
126    ///
127    /// Useful for running `CREATE DATABASE` / `DROP DATABASE` without an
128    /// active attachment.
129    ///
130    /// # Errors
131    ///
132    /// Returns [`Error::Io`] or [`Error::Connection`] if the TCP handshake
133    /// with `endpoint` fails.
134    pub async fn without_database(endpoint: &str) -> Result<Self> {
135        let transport = AsyncTransport::connect_tcp(endpoint).await?;
136        Ok(AsyncConnection {
137            transport,
138            database: None,
139            stats_provider: Mutex::new(None),
140            pending_stats: Mutex::new(None),
141        })
142    }
143
144    /// Builds an `AsyncConnection` from a pre-existing `AsyncClient` (TCP only).
145    #[must_use]
146    pub fn from_async_client(
147        client: hyperdb_api_core::client::AsyncClient,
148        database: Option<String>,
149    ) -> Self {
150        AsyncConnection {
151            transport: AsyncTransport::Tcp(AsyncTcpTransport { client }),
152            database,
153            stats_provider: Mutex::new(None),
154            pending_stats: Mutex::new(None),
155        }
156    }
157
158    /// Builds an `AsyncConnection` from a pre-constructed transport.
159    ///
160    /// Used by [`AsyncConnectionBuilder`](crate::AsyncConnectionBuilder) to
161    /// stitch together a gRPC transport after its own config construction.
162    pub(crate) fn from_transport(transport: AsyncTransport, database: Option<String>) -> Self {
163        AsyncConnection {
164            transport,
165            database,
166            stats_provider: Mutex::new(None),
167            pending_stats: Mutex::new(None),
168        }
169    }
170
    /// Runs the configured `CreateMode` as SQL (crate-public for use by
    /// [`AsyncConnectionBuilder`](crate::AsyncConnectionBuilder)).
    ///
    /// Pure forwarding wrapper: delegates to the private
    /// `handle_creation_mode` and returns its result unchanged.
    pub(crate) async fn handle_creation_mode_public(
        &self,
        database: &str,
        mode: CreateMode,
    ) -> Result<()> {
        self.handle_creation_mode(database, mode).await
    }
180
    /// Attaches the database and sets `search_path` (crate-public for use
    /// by [`AsyncConnectionBuilder`](crate::AsyncConnectionBuilder)).
    ///
    /// Pure forwarding wrapper: delegates to the private
    /// `attach_and_set_path` and returns its result unchanged.
    pub(crate) async fn attach_and_set_path_public(&self, database: &str) -> Result<()> {
        self.attach_and_set_path(database).await
    }
186
187    async fn handle_creation_mode(&self, database: &str, mode: CreateMode) -> Result<()> {
188        let escaped_db = escape_sql_path(database);
189        match mode {
190            CreateMode::Create => {
191                self.execute_command(&format!("CREATE DATABASE {escaped_db}"))
192                    .await?;
193            }
194            CreateMode::CreateIfNotExists => {
195                self.execute_command(&format!("CREATE DATABASE IF NOT EXISTS {escaped_db}"))
196                    .await?;
197            }
198            CreateMode::CreateAndReplace => {
199                self.execute_command(&format!("DROP DATABASE IF EXISTS {escaped_db}"))
200                    .await?;
201                self.execute_command(&format!("CREATE DATABASE {escaped_db}"))
202                    .await?;
203            }
204            CreateMode::DoNotCreate => {}
205        }
206        Ok(())
207    }
208
209    async fn attach_and_set_path(&self, database: &str) -> Result<()> {
210        let escaped_db = escape_sql_path(database);
211        let db_alias = std::path::Path::new(database)
212            .file_stem()
213            .and_then(|s| s.to_str())
214            .unwrap_or("db");
215        let escaped_alias = escape_sql_path(db_alias);
216
217        self.execute_command(&format!("ATTACH DATABASE {escaped_db} AS {escaped_alias}"))
218            .await?;
219
220        self.execute_command(&format!("SET search_path TO {escaped_alias}, public"))
221            .await?;
222        Ok(())
223    }
224
    /// Returns the transport type name (e.g., "TCP", "gRPC").
    ///
    /// The string is whatever the underlying transport's type reports via
    /// `as_str()`; it is a `'static` label, not tied to this connection's lifetime.
    pub fn transport_type(&self) -> &'static str {
        self.transport.transport_type().as_str()
    }
229
    /// Returns true if this connection supports write operations.
    ///
    /// Forwards to the underlying transport; `connect` consults the same
    /// transport flag to decide whether to run the creation/attach SQL.
    pub fn supports_writes(&self) -> bool {
        self.transport.supports_writes()
    }
234
    /// Returns the database path.
    ///
    /// This is the value recorded at construction time; it is `None` for
    /// connections created without a database (e.g. `without_database`).
    pub fn database(&self) -> Option<&str> {
        self.database.as_deref()
    }
239
240    // =========================================================================
241    // Command Execution
242    // =========================================================================
243
244    /// Executes a SQL command that doesn't return rows (async).
245    ///
246    /// Use for DDL statements (CREATE, DROP, ALTER) and DML statements
247    /// (INSERT, UPDATE, DELETE). Returns the number of affected rows (DML)
248    /// or 0 (DDL).
249    ///
250    /// # Errors
251    ///
252    /// - Returns [`Error::FeatureNotSupported`] on gRPC transports that do not yet
253    ///   support write operations.
254    /// - Returns [`Error::Server`] if the SQL fails to parse or execute.
255    /// - Returns [`Error::Io`] on transport-level I/O failures.
256    pub async fn execute_command(&self, sql: &str) -> Result<u64> {
257        let token = self.stats_before_query(sql);
258        let result = self.transport.execute_command(sql).await;
259        self.stats_store_pending(token, sql);
260        result
261    }
262
263    /// Executes multiple SQL statements sequentially (async).
264    ///
265    /// If any statement fails, execution stops and the error is returned
266    /// wrapping the SQL preview for context.
267    ///
268    /// # Errors
269    ///
270    /// Returns an [`Error::Internal`] wrapping the first failing statement's
271    /// error; the wrapping message includes the statement's ordinal and
272    /// an 80-character SQL preview.
273    pub async fn execute_batch(&self, statements: &[&str]) -> Result<u64> {
274        let mut total = 0u64;
275        for (i, stmt) in statements.iter().enumerate() {
276            if !stmt.trim().is_empty() {
277                total += self.execute_command(stmt).await.map_err(|e| {
278                    let preview: String = stmt.chars().take(80).collect();
279                    Error::internal(format!(
280                        "execute_batch failed at statement {} of {}: {}: {}",
281                        i + 1,
282                        statements.len(),
283                        preview,
284                        e,
285                    ))
286                })?;
287            }
288        }
289        Ok(total)
290    }
291
292    // =========================================================================
293    // Query Execution (Streaming)
294    // =========================================================================
295
296    /// Executes a SQL query and returns a streaming [`AsyncRowset`] (async).
297    ///
298    /// Results are streamed in chunks so memory usage stays constant
299    /// regardless of result set size. See [`AsyncRowset`] for the row-level
300    /// API and collectors.
301    ///
302    /// # Errors
303    ///
304    /// - Returns [`Error::Server`] if the SQL is rejected by the server.
305    /// - Returns [`Error::Io`] on transport-level I/O failures while
306    ///   opening the stream.
307    pub async fn execute_query(&self, query: &str) -> Result<AsyncRowset<'_>> {
308        let token = self.stats_before_query(query);
309        let result = self.transport.execute_query_streaming(query).await;
310        self.stats_store_pending(token, query);
311        result
312    }
313
    /// Fetches a single row, erroring if the query returns zero rows.
    ///
    /// `query` may be any type that exposes a SQL string through
    /// `AsRef<str>` (for example `&str` or `String`).
    ///
    /// # Errors
    ///
    /// - Returns the error from [`execute_query`](Self::execute_query) if
    ///   the query fails.
    /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
    ///   the query produced zero rows.
    pub async fn fetch_one<Q: AsRef<str>>(&self, query: Q) -> Result<Row> {
        // Open the stream, then let the rowset enforce the "at least one row" rule.
        self.execute_query(query.as_ref())
            .await?
            .require_first_row()
            .await
    }
328
    /// Fetches a single row, returning `None` if the query is empty.
    ///
    /// `query` may be any type that exposes a SQL string through
    /// `AsRef<str>` (for example `&str` or `String`).
    ///
    /// # Errors
    ///
    /// Returns the error from [`execute_query`](Self::execute_query) if the
    /// query fails. An empty result set yields `Ok(None)`, not an error.
    pub async fn fetch_optional<Q: AsRef<str>>(&self, query: Q) -> Result<Option<Row>> {
        // Same as `fetch_one`, but uses the rowset's non-erroring `first_row`.
        self.execute_query(query.as_ref()).await?.first_row().await
    }
338
    /// Fetches all rows from a query.
    ///
    /// Every row is collected into one `Vec`, so memory grows with the
    /// result size; [`stream_as`](Self::stream_as) is the chunked alternative
    /// when rows are mapped to a struct.
    ///
    /// # Errors
    ///
    /// Returns the error from [`execute_query`](Self::execute_query), or a
    /// transport error produced while draining every chunk.
    pub async fn fetch_all<Q: AsRef<str>>(&self, query: Q) -> Result<Vec<Row>> {
        self.execute_query(query.as_ref())
            .await?
            .collect_rows()
            .await
    }
351
352    /// Fetches a single row and maps it to a struct using [`crate::FromRow`].
353    ///
354    /// # Errors
355    ///
356    /// - Returns the error from [`fetch_one`](Self::fetch_one).
357    /// - Returns whatever [`FromRow::from_row`](crate::FromRow::from_row)
358    ///   produces when the row cannot be mapped.
359    pub async fn fetch_one_as<T: crate::FromRow>(&self, query: &str) -> Result<T> {
360        let row = self.fetch_one(query).await?;
361        let indices = row
362            .schema()
363            .map(crate::row_accessor::RowAccessor::build_indices)
364            .unwrap_or_default();
365        T::from_row(crate::RowAccessor::new(&row, &indices))
366    }
367
368    /// Fetches all rows and maps them to structs using [`crate::FromRow`].
369    ///
370    /// # Errors
371    ///
372    /// - Returns the error from [`fetch_all`](Self::fetch_all).
373    /// - Returns the first error produced by
374    ///   [`FromRow::from_row`](crate::FromRow::from_row) on any row.
375    pub async fn fetch_all_as<T: crate::FromRow>(&self, query: &str) -> Result<Vec<T>> {
376        let rows = self.fetch_all(query).await?;
377        // Build the column-name → index lookup once from the first
378        // row's schema; reuse for every row.
379        let indices = rows
380            .first()
381            .and_then(crate::result::Row::schema)
382            .map(crate::row_accessor::RowAccessor::build_indices)
383            .unwrap_or_default();
384        rows.iter()
385            .map(|r| T::from_row(crate::RowAccessor::new(r, &indices)))
386            .collect()
387    }
388
389    /// Returns a lazy `Stream` over rows, mapping each to `T` via
390    /// [`FromRow`].
391    ///
392    /// This is the streaming variant of [`fetch_all_as`](Self::fetch_all_as):
393    /// memory usage is bounded by the chunk size (default 64K rows), not by
394    /// the total row count. Use this for large result sets where collecting
395    /// all rows into a `Vec` would exceed memory limits.
396    ///
397    /// The column-name → index lookup table is built exactly once (on the
398    /// first non-empty chunk) and reused for all rows, so per-row mapping is
399    /// O(1) in column count.
400    ///
401    /// # Example
402    ///
403    /// ```no_run
404    /// # use hyperdb_api::{AsyncConnection, CreateMode, FromRow, RowAccessor, Result};
405    /// # use futures::StreamExt;
406    /// # struct User { id: i32, name: String }
407    /// # impl FromRow for User {
408    /// #     fn from_row(row: RowAccessor<'_>) -> Result<Self> {
409    /// #         Ok(User { id: row.get("id")?, name: row.get("name")? })
410    /// #     }
411    /// # }
412    /// # async fn example(conn: &AsyncConnection) -> Result<()> {
413    /// let stream = conn.stream_as::<User>("SELECT id, name FROM users");
414    /// tokio::pin!(stream);
415    /// while let Some(row_result) = stream.next().await {
416    ///     let user = row_result?;
417    ///     println!("{}: {}", user.id, user.name);
418    /// }
419    /// # Ok(())
420    /// # }
421    /// ```
422    ///
423    /// # Errors
424    ///
425    /// Each yielded item is a `Result<T>`:
426    /// - The first item will be `Err(e)` if query submission fails (parse
427    ///   failures, server errors, transport failures). The stream is lazy and
428    ///   does not execute the query until first polled.
429    /// - Subsequent items are `Ok(T)` if the row was successfully mapped via
430    ///   `FromRow`, or `Err(e)` if mapping failed (missing column, type
431    ///   mismatch, NULL in a non-optional field). These errors surface lazily
432    ///   during iteration.
433    ///
434    /// [`FromRow`]: crate::FromRow
435    pub fn stream_as<'a, T: crate::FromRow + 'a>(
436        &'a self,
437        query: &str,
438    ) -> impl futures_core::Stream<Item = Result<T>> + 'a {
439        // Own the query string so the stream doesn't borrow the &str arg
440        // across await points.
441        let query = query.to_owned();
442        async_stream::try_stream! {
443            let mut rs = self.execute_query(&query).await?;   // submit err → first Err item
444            let mut indices: Option<std::collections::HashMap<String, usize>> = None;
445            while let Some(chunk) = rs.next_chunk().await? {
446                // Build the name→index map once, on the first chunk, after
447                // next_chunk() has materialized the schema (TCP sends the
448                // RowDescription as the first stream message). If the schema is
449                // somehow unavailable, fall back to an empty map so per-row
450                // lookups surface a `Missing` error — matching `fetch_all_as`'s
451                // `unwrap_or_default()` and the sync `stream_as`, rather than
452                // silently skipping the chunk.
453                if indices.is_none() {
454                    let map = rs
455                        .schema()
456                        .map(|schema| crate::RowAccessor::build_owned_indices(&schema))
457                        .unwrap_or_default();
458                    indices = Some(map);
459                }
460                let idx = indices.get_or_insert_with(Default::default);
461                for row in &chunk {
462                    yield T::from_row(crate::RowAccessor::new_owned(row, idx))?;
463                }
464            }
465        }
466    }
467
468    /// Fetches a single row from a **parameterized** query and maps it to a
469    /// struct using [`FromRow`](crate::FromRow) (async).
470    ///
471    /// Parameterized counterpart to [`fetch_one_as`](Self::fetch_one_as): binds
472    /// `$1`, `$2`, … placeholders from `params` (via
473    /// [`ToSqlParam`](crate::params::ToSqlParam), exactly as
474    /// [`query_params`](Self::query_params)) and maps the first result row into
475    /// `T`.
476    ///
477    /// # Errors
478    ///
479    /// - Returns [`Error::FeatureNotSupported`] on gRPC transports (prepared
480    ///   statements are TCP-only).
481    /// - Returns the error from [`query_params`](Self::query_params) if the
482    ///   server rejects the statement, or on transport-level I/O failures.
483    /// - Returns [`Error::Conversion`] with message `"Query returned no rows"`
484    ///   if the query produced zero rows.
485    /// - Returns whatever [`FromRow::from_row`](crate::FromRow::from_row)
486    ///   produces when the row cannot be mapped.
487    pub async fn fetch_one_as_params<T: crate::FromRow>(
488        &self,
489        query: &str,
490        params: &[&dyn crate::params::ToSqlParam],
491    ) -> Result<T> {
492        let row = self
493            .query_params(query, params)
494            .await?
495            .require_first_row()
496            .await?;
497        let indices = row
498            .schema()
499            .map(crate::row_accessor::RowAccessor::build_indices)
500            .unwrap_or_default();
501        T::from_row(crate::RowAccessor::new(&row, &indices))
502    }
503
504    /// Fetches all rows from a **parameterized** query and maps them to structs
505    /// using [`FromRow`](crate::FromRow) (async).
506    ///
507    /// Parameterized counterpart to [`fetch_all_as`](Self::fetch_all_as): binds
508    /// `$1`, `$2`, … placeholders from `params` (see
509    /// [`query_params`](Self::query_params)) and maps every result row into `T`.
510    ///
511    /// # Errors
512    ///
513    /// - Returns [`Error::FeatureNotSupported`] on gRPC transports.
514    /// - Returns the error from [`query_params`](Self::query_params) if the
515    ///   server rejects the statement, or on transport-level I/O failures.
516    /// - Returns the first error produced by
517    ///   [`FromRow::from_row`](crate::FromRow::from_row) on any row.
518    pub async fn fetch_all_as_params<T: crate::FromRow>(
519        &self,
520        query: &str,
521        params: &[&dyn crate::params::ToSqlParam],
522    ) -> Result<Vec<T>> {
523        let rows = self
524            .query_params(query, params)
525            .await?
526            .collect_rows()
527            .await?;
528        // Build the column-name → index lookup once from the first row's
529        // schema; reuse for every row. See `fetch_all_as`.
530        let indices = rows
531            .first()
532            .and_then(crate::result::Row::schema)
533            .map(crate::row_accessor::RowAccessor::build_indices)
534            .unwrap_or_default();
535        rows.iter()
536            .map(|r| T::from_row(crate::RowAccessor::new(r, &indices)))
537            .collect()
538    }
539
    /// Returns a lazy `Stream` over the rows of a **parameterized** query,
    /// mapping each to `T` via [`FromRow`] (async).
    ///
    /// Parameterized counterpart to [`stream_as`](Self::stream_as): binds `$1`,
    /// `$2`, … placeholders from `params` and streams the result with O(chunk)
    /// memory, mapping each row into `T`. The column-index map is built once,
    /// right after the statement is executed, and reused for every row.
    ///
    /// Parameter OIDs and encoded values are computed eagerly when this method
    /// is called, so `params` does not need to outlive the returned stream;
    /// only the prepare/execute round-trips are deferred to the first poll.
    ///
    /// # Errors
    ///
    /// Like [`stream_as`](Self::stream_as), this returns the `Stream` directly
    /// (no outer `Result`) — the query is lazy and does not execute until first
    /// polled. Each yielded item is a `Result<T>`:
    /// - The **first** item is `Err(e)` if statement submission fails:
    ///   [`Error::FeatureNotSupported`] on gRPC transport, a `Parse`/`Bind`
    ///   rejection, or a transport failure. These surface as the first item,
    ///   *not* eagerly.
    /// - Subsequent items are `Ok(T)` on a clean per-row mapping, or `Err(e)`
    ///   for a mapping failure (missing column, type mismatch, NULL in a
    ///   non-`Option` field) or a transport error hit on a later chunk.
    ///
    /// [`FromRow`]: crate::FromRow
    pub fn stream_as_params<'a, T: crate::FromRow + 'a>(
        &'a self,
        query: &str,
        params: &[&dyn crate::params::ToSqlParam],
    ) -> impl futures_core::Stream<Item = Result<T>> + 'a {
        // `&[&dyn ToSqlParam]` can't cross the `try_stream!` await points, so
        // own the query string and encode params up front (encoding needs no
        // connection). The prepare+execute sequence below mirrors
        // `query_params` (see that method) — keep the two in sync if its
        // Parse/Bind/Execute handling ever changes.
        let query = query.to_owned();
        let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
        let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
        async_stream::try_stream! {
            // Prepared statements need the raw TCP client; gRPC has none.
            let client = match &self.transport {
                AsyncTransport::Tcp(tcp) => &tcp.client,
                AsyncTransport::Grpc(_) => {
                    // `?` inside try_stream! yields Err(e) and terminates the
                    // generator, so `unreachable!()` is dead code — its `!`
                    // type just satisfies the arm's need for a `&AsyncClient`
                    // (same type the Tcp arm produces).
                    Err(Error::feature_not_supported(
                        "prepared statements are not supported over gRPC transport",
                    ))?;
                    unreachable!()
                }
            };
            let stmt = client.prepare_typed(&query, &oids).await?;
            let stream = client
                .execute_prepared_streaming(&stmt, encoded, crate::result::DEFAULT_BINARY_CHUNK_SIZE)
                .await?;
            // The statement is handed to the rowset as a guard so it stays
            // alive for as long as the rowset is being read.
            let mut rs = AsyncRowset::from_prepared(stream).with_statement_guard(stmt);
            // The Prepared path captures the schema at prepare time, so the
            // column-name → index map is available immediately — build it once
            // up front rather than deferring to the first chunk (the empty-map
            // fallback matches `stream_as` / `fetch_all_as` if it is somehow
            // unavailable, surfacing a per-row `Missing` error).
            let idx = rs
                .schema()
                .map(|schema| crate::RowAccessor::build_owned_indices(&schema))
                .unwrap_or_default();
            while let Some(chunk) = rs.next_chunk().await? {
                for row in &chunk {
                    // A mapping failure is yielded as `Err` and ends the stream.
                    yield T::from_row(crate::RowAccessor::new_owned(row, &idx))?;
                }
            }
        }
    }
610
    /// Fetches a single non-NULL scalar value. Errors on empty / NULL.
    ///
    /// `T` is the Rust type the first cell is decoded into (any
    /// [`RowValue`]); `query` may be any type that exposes a SQL string
    /// through `AsRef<str>`.
    ///
    /// # Errors
    ///
    /// - Returns the error from [`execute_query`](Self::execute_query).
    /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
    ///   the query is empty.
    /// - Returns [`Error::Conversion`] with message `"Scalar query returned NULL"`
    ///   if the first cell is SQL `NULL`.
    pub async fn fetch_scalar<T, Q>(&self, query: Q) -> Result<T>
    where
        T: RowValue,
        Q: AsRef<str>,
    {
        // Open the stream, then let the rowset enforce "one non-NULL value".
        self.execute_query(query.as_ref())
            .await?
            .require_scalar()
            .await
    }
630
    /// Fetches a single scalar value, allowing NULL (returns `None`).
    ///
    /// `T` is the Rust type the first cell is decoded into (any
    /// [`RowValue`]); `query` may be any type that exposes a SQL string
    /// through `AsRef<str>`.
    ///
    /// # Errors
    ///
    /// Returns the error from [`execute_query`](Self::execute_query). An
    /// empty result still yields an error; SQL `NULL` in the first cell
    /// yields `Ok(None)`.
    pub async fn fetch_optional_scalar<T, Q>(&self, query: Q) -> Result<Option<T>>
    where
        T: RowValue,
        Q: AsRef<str>,
    {
        // NOTE(review): the empty-result and NULL handling described above is
        // implemented by `AsyncRowset::scalar`, which is not visible here —
        // confirm against that method.
        self.execute_query(query.as_ref()).await?.scalar().await
    }
645
646    /// Returns the count from a `SELECT COUNT(*)` style query, defaulting
647    /// to 0 on NULL.
648    ///
649    /// # Errors
650    ///
651    /// Returns the error from [`execute_query`](Self::execute_query) if the
652    /// query itself fails.
653    pub async fn query_count(&self, query: &str) -> Result<i64> {
654        let opt: Option<i64> = self.fetch_optional_scalar(query).await?;
655        Ok(opt.unwrap_or(0))
656    }
657
658    // =========================================================================
659    // Arrow Queries
660    // =========================================================================
661
    /// Executes a SELECT query and returns results as Arrow IPC stream bytes (async).
    ///
    /// TCP uses `COPY ... TO STDOUT WITH (FORMAT ARROWSTREAM)`; gRPC uses
    /// the native Arrow transport. Both return the same IPC stream shape.
    ///
    /// Unlike [`execute_command`](Self::execute_command) and
    /// [`execute_query`](Self::execute_query), this method calls the
    /// transport directly and does not invoke the query-stats hooks.
    ///
    /// # Errors
    ///
    /// Propagates any [`Error::Server`] from the transport when the query
    /// fails or the server cannot produce Arrow IPC output.
    pub async fn execute_query_to_arrow(&self, sql: &str) -> Result<bytes::Bytes> {
        self.transport.execute_query_to_arrow(sql).await
    }
674
    /// Exports an entire table to Arrow IPC stream format (async).
    ///
    /// Runs `SELECT * FROM <table_name>` through
    /// [`execute_query_to_arrow`](Self::execute_query_to_arrow).
    ///
    /// `table_name` is spliced into the SQL text verbatim — no quoting or
    /// escaping is applied here — so callers must pass an already-quoted
    /// (and, if needed, schema-qualified) identifier and must never forward
    /// untrusted input.
    ///
    /// # Errors
    ///
    /// See [`execute_query_to_arrow`](Self::execute_query_to_arrow).
    pub async fn export_table_to_arrow(&self, table_name: &str) -> Result<bytes::Bytes> {
        // NOTE(review): unescaped interpolation; kept as-is because escaping
        // here would change behavior for callers passing pre-quoted names.
        self.execute_query_to_arrow(&format!("SELECT * FROM {table_name}"))
            .await
    }
684
685    /// Executes a SELECT query and returns parsed Arrow `RecordBatch`es (async).
686    ///
687    /// # Errors
688    ///
689    /// - Returns [`Error::Server`] if the query fails.
690    /// - Returns [`Error::Conversion`] if the Arrow IPC payload cannot be
691    ///   decoded into record batches.
692    pub async fn execute_query_to_batches(
693        &self,
694        sql: &str,
695    ) -> Result<Vec<arrow::record_batch::RecordBatch>> {
696        let arrow_data = self.execute_query_to_arrow(sql).await?;
697        crate::arrow_result::parse_arrow_ipc(arrow_data)
698    }
699
700    // =========================================================================
701    // Parameterized Queries
702    // =========================================================================
703
704    /// Executes a parameterized query with binary-encoded parameters (async).
705    ///
706    /// Mirrors the sync [`Connection::query_params`](crate::Connection::query_params);
707    /// see that method for the design rationale. Parameters travel through the
708    /// extended query protocol (Parse/Bind/Execute) in HyperBinary format — no
709    /// SQL escaping, full SQL-injection safety regardless of parameter content.
710    ///
711    /// # Errors
712    ///
713    /// - Returns [`Error::FeatureNotSupported`] on gRPC transports (prepared statements
714    ///   are TCP-only).
715    /// - Returns [`Error::Server`] if the server rejects the statement at
716    ///   `Parse`, `Bind`, or `Execute` time.
717    /// - Returns [`Error::Io`] on transport-level I/O failures.
718    pub async fn query_params(
719        &self,
720        query: &str,
721        params: &[&dyn crate::params::ToSqlParam],
722    ) -> Result<AsyncRowset<'_>> {
723        // Route through the extended query protocol. See
724        // [`Connection::query_params`] for the sync equivalent and the
725        // rationale behind the statement-guard pattern.
726        let client = match &self.transport {
727            AsyncTransport::Tcp(tcp) => &tcp.client,
728            AsyncTransport::Grpc(_) => {
729                return Err(Error::feature_not_supported(
730                    "prepared statements are not supported over gRPC transport",
731                ));
732            }
733        };
734        let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
735        let stmt = client.prepare_typed(query, &oids).await?;
736        let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
737        let stream = client
738            .execute_prepared_streaming(&stmt, encoded, crate::result::DEFAULT_BINARY_CHUNK_SIZE)
739            .await?;
740        Ok(AsyncRowset::from_prepared(stream).with_statement_guard(stmt))
741    }
742
743    /// Executes a parameterized command (INSERT / UPDATE / DELETE) with
744    /// binary-encoded parameters via Parse/Bind/Execute (async).
745    ///
746    /// # Errors
747    ///
748    /// - Returns [`Error::FeatureNotSupported`] on gRPC transports.
749    /// - Returns [`Error::Server`] if the server rejects the statement at
750    ///   `Parse`, `Bind`, or `Execute` time.
751    /// - Returns [`Error::Io`] on transport-level I/O failures.
752    pub async fn command_params(
753        &self,
754        query: &str,
755        params: &[&dyn crate::params::ToSqlParam],
756    ) -> Result<u64> {
757        let client = match &self.transport {
758            AsyncTransport::Tcp(tcp) => &tcp.client,
759            AsyncTransport::Grpc(_) => {
760                return Err(Error::feature_not_supported(
761                    "prepared statements are not supported over gRPC transport",
762                ));
763            }
764        };
765        let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
766        let stmt = client.prepare_typed(query, &oids).await?;
767        let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
768        Ok(client.execute_prepared_no_result(&stmt, encoded).await?)
769    }
770
771    // =========================================================================
772    // Catalog / Database Management
773    // =========================================================================
774
775    /// Creates a new database file (async).
776    ///
777    /// # Errors
778    ///
779    /// Returns [`Error::Server`] if the server rejects
780    /// `CREATE DATABASE IF NOT EXISTS` (e.g. the path is not writable).
781    pub async fn create_database(&self, path: &str) -> Result<()> {
782        let sql = format!("CREATE DATABASE IF NOT EXISTS {}", escape_sql_path(path));
783        self.execute_command(&sql).await?;
784        Ok(())
785    }
786
787    /// Drops (deletes) a database file (async).
788    ///
789    /// # Errors
790    ///
791    /// Returns [`Error::Server`] if the server rejects
792    /// `DROP DATABASE IF EXISTS` (e.g. the database is still attached).
793    pub async fn drop_database(&self, path: &str) -> Result<()> {
794        let sql = format!("DROP DATABASE IF EXISTS {}", escape_sql_path(path));
795        self.execute_command(&sql).await?;
796        Ok(())
797    }
798
799    /// Attaches a database file to the connection (async).
800    ///
801    /// # Errors
802    ///
803    /// Returns [`Error::Server`] if the server rejects the
804    /// `ATTACH DATABASE` statement (file missing, permission denied,
805    /// alias conflict).
806    pub async fn attach_database(&self, path: &str, alias: Option<&str>) -> Result<()> {
807        let sql = if let Some(alias) = alias {
808            format!(
809                "ATTACH DATABASE {} AS {}",
810                escape_sql_path(path),
811                escape_sql_path(alias)
812            )
813        } else {
814            format!("ATTACH DATABASE {}", escape_sql_path(path))
815        };
816        self.execute_command(&sql).await?;
817        Ok(())
818    }
819
820    /// Detaches a database alias from this connection (async).
821    ///
822    /// # Errors
823    ///
824    /// Returns [`Error::Server`] if the alias is not attached or the
825    /// server cannot flush pending updates.
826    pub async fn detach_database(&self, alias: &str) -> Result<()> {
827        let sql = format!("DETACH DATABASE {}", escape_sql_path(alias));
828        self.execute_command(&sql).await?;
829        Ok(())
830    }
831
832    /// Detaches all databases from this connection (async).
833    ///
834    /// # Errors
835    ///
836    /// Returns [`Error::Server`] if the server rejects
837    /// `DETACH ALL DATABASES`.
838    pub async fn detach_all_databases(&self) -> Result<()> {
839        self.execute_command("DETACH ALL DATABASES").await?;
840        Ok(())
841    }
842
843    /// Copies a database file to a new path (async).
844    ///
845    /// # Errors
846    ///
847    /// Returns [`Error::Server`] if the server rejects the
848    /// `COPY DATABASE` statement — e.g. the source is not attached or the
849    /// destination path is not writable.
850    pub async fn copy_database(&self, source: &str, destination: &str) -> Result<()> {
851        let sql = format!(
852            "COPY DATABASE {} TO {}",
853            escape_sql_path(source),
854            escape_sql_path(destination)
855        );
856        self.execute_command(&sql).await?;
857        Ok(())
858    }
859
860    /// Creates a schema in the database (async).
861    ///
862    /// # Errors
863    ///
864    /// - Returns an error if `schema_name` cannot be converted to a
865    ///   [`SchemaName`](crate::SchemaName).
866    /// - Returns [`Error::Server`] if the server rejects
867    ///   `CREATE SCHEMA IF NOT EXISTS`.
868    pub async fn create_schema<T>(&self, schema_name: T) -> Result<()>
869    where
870        T: TryInto<crate::SchemaName>,
871        crate::Error: From<T::Error>,
872    {
873        let schema: crate::SchemaName = schema_name.try_into()?;
874        let sql = format!("CREATE SCHEMA IF NOT EXISTS {schema}");
875        self.execute_command(&sql).await?;
876        Ok(())
877    }
878
879    /// Checks whether a schema exists (async).
880    ///
881    /// # Errors
882    ///
883    /// - Returns an error if `schema` cannot be converted to a
884    ///   [`SchemaName`](crate::SchemaName).
885    /// - Returns [`Error::Server`] if the catalog lookup query fails.
886    pub async fn has_schema<T>(&self, schema: T) -> Result<bool>
887    where
888        T: TryInto<crate::SchemaName>,
889        crate::Error: From<T::Error>,
890    {
891        let schema: crate::SchemaName = schema.try_into()?;
892        let db_prefix = if let Some(db) = schema.database() {
893            format!("{db}.")
894        } else {
895            String::new()
896        };
897        let sql = format!(
898            "SELECT 1 FROM {}pg_catalog.pg_namespace WHERE nspname = '{}'",
899            db_prefix,
900            schema.unescaped().replace('\'', "''")
901        );
902        Ok(self.fetch_optional(&sql).await?.is_some())
903    }
904
905    /// Checks whether a table exists (async).
906    ///
907    /// # Errors
908    ///
909    /// - Returns an error if `table_name` cannot be converted to a
910    ///   [`TableName`](crate::TableName).
911    /// - Returns [`Error::Server`] if the catalog lookup query fails.
912    pub async fn has_table<T>(&self, table_name: T) -> Result<bool>
913    where
914        T: TryInto<crate::TableName>,
915        crate::Error: From<T::Error>,
916    {
917        let table: crate::TableName = table_name.try_into()?;
918        let schema = table
919            .schema()
920            .map_or("public", super::names::Name::unescaped);
921        let db_prefix = if let Some(db) = table.database() {
922            format!("{db}.")
923        } else {
924            String::new()
925        };
926        let sql = format!(
927            "SELECT 1 FROM {}pg_catalog.pg_tables WHERE schemaname = '{}' AND tablename = '{}'",
928            db_prefix,
929            schema.replace('\'', "''"),
930            table.table().unescaped().replace('\'', "''")
931        );
932        Ok(self.fetch_optional(&sql).await?.is_some())
933    }
934
935    /// Unloads the database from memory but keeps the session alive (async).
936    ///
937    /// # Errors
938    ///
939    /// Returns [`Error::Server`] if the server rejects `UNLOAD DATABASE`
940    /// (e.g. the database is in use by another session).
941    pub async fn unload_database(&self) -> Result<()> {
942        self.execute_command("UNLOAD DATABASE").await?;
943        Ok(())
944    }
945
946    /// Releases the database completely from the session (async).
947    ///
948    /// # Errors
949    ///
950    /// Returns [`Error::Server`] if the server rejects `UNLOAD RELEASE`,
951    /// most commonly because multiple databases are attached to the same
952    /// session.
953    pub async fn unload_release(&self) -> Result<()> {
954        self.execute_command("UNLOAD RELEASE").await?;
955        Ok(())
956    }
957
958    // =========================================================================
959    // Diagnostics / Explain
960    // =========================================================================
961
962    /// Executes EXPLAIN and returns the plan text (async).
963    ///
964    /// # Errors
965    ///
966    /// Returns [`Error::Server`] if `EXPLAIN <query>` fails to parse or plan.
967    pub async fn explain(&self, query: &str) -> Result<String> {
968        let sql = format!("EXPLAIN {query}");
969        let rows = self.fetch_all(&sql).await?;
970        let lines: Vec<String> = rows.iter().filter_map(|r| r.get::<String>(0)).collect();
971        Ok(lines.join("\n"))
972    }
973
974    /// Executes EXPLAIN ANALYZE and returns the plan with timing (async).
975    ///
976    /// # Errors
977    ///
978    /// Returns [`Error::Server`] if `EXPLAIN ANALYZE <query>` fails — this
979    /// includes any runtime error raised by actually executing `query`.
980    pub async fn explain_analyze(&self, query: &str) -> Result<String> {
981        let sql = format!("EXPLAIN ANALYZE {query}");
982        let rows = self.fetch_all(&sql).await?;
983        let lines: Vec<String> = rows.iter().filter_map(|r| r.get::<String>(0)).collect();
984        Ok(lines.join("\n"))
985    }
986
987    // =========================================================================
988    // Connection Introspection / Lifecycle
989    // =========================================================================
990
991    /// Returns true if the connection is alive (passive check).
992    pub fn is_alive(&self) -> bool {
993        match &self.transport {
994            AsyncTransport::Tcp(tcp) => tcp.client.is_alive(),
995            AsyncTransport::Grpc(_) => true,
996        }
997    }
998
999    /// Actively pings the server with `SELECT 1` (async).
1000    ///
1001    /// # Errors
1002    ///
1003    /// Returns [`Error::Server`] or [`Error::Io`] if the `SELECT 1`
1004    /// round-trip fails — i.e. the connection is no longer usable.
1005    pub async fn ping(&self) -> Result<()> {
1006        self.execute_command("SELECT 1").await?;
1007        Ok(())
1008    }
1009
1010    /// Returns the backend process ID, or 0 for gRPC transports.
1011    pub fn process_id(&self) -> i32 {
1012        match &self.transport {
1013            AsyncTransport::Tcp(tcp) => tcp.client.process_id(),
1014            AsyncTransport::Grpc(_) => 0,
1015        }
1016    }
1017
1018    /// Returns the secret key used for cancel requests, or 0 for gRPC.
1019    pub fn secret_key(&self) -> i32 {
1020        match &self.transport {
1021            AsyncTransport::Tcp(tcp) => tcp.client.secret_key(),
1022            AsyncTransport::Grpc(_) => 0,
1023        }
1024    }
1025
1026    /// Returns a server parameter value by name (async).
1027    pub async fn parameter_status(&self, name: &str) -> Option<String> {
1028        match &self.transport {
1029            AsyncTransport::Tcp(tcp) => tcp.client.parameter_status(name).await,
1030            AsyncTransport::Grpc(_) => None,
1031        }
1032    }
1033
1034    /// Returns the server version as a parsed struct (async).
1035    pub async fn server_version(&self) -> Option<crate::ServerVersion> {
1036        let version_str = self.parameter_status("server_version").await?;
1037        crate::ServerVersion::parse(&version_str)
1038    }
1039
1040    /// Sets the notice receiver callback for this connection.
1041    pub fn set_notice_receiver(
1042        &mut self,
1043        receiver: Option<Box<dyn Fn(hyperdb_api_core::client::Notice) + Send + Sync>>,
1044    ) {
1045        match &mut self.transport {
1046            AsyncTransport::Tcp(tcp) => tcp.client.set_notice_receiver(receiver),
1047            AsyncTransport::Grpc(_) => {}
1048        }
1049    }
1050
1051    /// Cancels the currently running query (async).
1052    ///
1053    /// # Errors
1054    ///
1055    /// - Returns [`Error::FeatureNotSupported`] on gRPC transports — cancellation is not
1056    ///   yet implemented for gRPC.
1057    /// - Returns [`Error::Connection`] or [`Error::Io`] if the cancel-request
1058    ///   connection to the server fails.
1059    pub async fn cancel(&self) -> Result<()> {
1060        self.transport.cancel().await
1061    }
1062
1063    /// Closes the connection gracefully, detaching any attached database first (async).
1064    ///
1065    /// # Errors
1066    ///
1067    /// - Returns [`Error::Internal`] wrapping the transport close failure if
1068    ///   the client cannot be shut down cleanly.
1069    /// - Returns [`Error::Internal`] wrapping the detach failure if the
1070    ///   attached database could not be detached but the transport close
1071    ///   itself succeeded.
1072    pub async fn close(self) -> Result<()> {
1073        let detach_err = if let Some(ref db_path) = self.database {
1074            let db_alias = std::path::Path::new(db_path)
1075                .file_stem()
1076                .and_then(|s| s.to_str())
1077                .unwrap_or("db");
1078            self.execute_command(&format!("DETACH DATABASE {}", escape_sql_path(db_alias)))
1079                .await
1080                .err()
1081        } else {
1082            None
1083        };
1084
1085        let close_result = self.transport.close().await;
1086
1087        if let Err(e) = close_result {
1088            return Err(Error::internal(format!(
1089                "Failed to close async connection: {e}"
1090            )));
1091        }
1092
1093        if let Some(e) = detach_err {
1094            return Err(Error::internal(format!(
1095                "Failed to detach database during close: {e}"
1096            )));
1097        }
1098
1099        Ok(())
1100    }
1101
1102    /// Returns a reference to the underlying async TCP client (`None` for gRPC).
1103    ///
1104    /// Prefer the high-level `AsyncConnection` methods; this escape hatch
1105    /// remains for code that needs direct protocol access (e.g. custom
1106    /// COPY loops).
1107    pub fn async_tcp_client(&self) -> Option<&hyperdb_api_core::client::AsyncClient> {
1108        self.transport.async_tcp_client()
1109    }
1110
1111    /// Crate-internal accessor for the transport. Used by
1112    /// [`AsyncPreparedStatement`](crate::AsyncPreparedStatement) to reach
1113    /// the underlying `hyperdb_api_core::client::AsyncClient`.
1114    pub(crate) fn transport(&self) -> &AsyncTransport {
1115        &self.transport
1116    }
1117
1118    /// Prepares a SQL statement (async).
1119    ///
1120    /// See [`Connection::prepare`](crate::Connection::prepare) for
1121    /// semantics. The returned
1122    /// [`AsyncPreparedStatement`](crate::AsyncPreparedStatement) can be
1123    /// executed many times with different parameter values.
1124    ///
1125    /// # Errors
1126    ///
1127    /// See [`prepare_typed`](Self::prepare_typed) — this method delegates
1128    /// to it with an empty OID list.
1129    pub async fn prepare(&self, query: &str) -> Result<crate::AsyncPreparedStatement<'_>> {
1130        self.prepare_typed(query, &[]).await
1131    }
1132
1133    /// Prepares a SQL statement with explicit parameter type OIDs (async).
1134    ///
1135    /// # Errors
1136    ///
1137    /// - Returns [`Error::FeatureNotSupported`] on gRPC transports (prepared statements
1138    ///   are TCP-only).
1139    /// - Returns [`Error::Server`] if the server rejects the `Parse`
1140    ///   message (SQL syntax error, unknown OID).
1141    /// - Returns [`Error::Io`] on transport-level I/O failures.
1142    pub async fn prepare_typed(
1143        &self,
1144        query: &str,
1145        param_types: &[crate::Oid],
1146    ) -> Result<crate::AsyncPreparedStatement<'_>> {
1147        let client = match &self.transport {
1148            AsyncTransport::Tcp(tcp) => &tcp.client,
1149            AsyncTransport::Grpc(_) => {
1150                return Err(Error::feature_not_supported(
1151                    "prepared statements are not supported over gRPC transport",
1152                ));
1153            }
1154        };
1155        let inner = client.prepare_typed(query, param_types).await?;
1156        crate::AsyncPreparedStatement::new(self, inner)
1157    }
1158
1159    /// Owned-handle variant of [`prepare`](Self::prepare). Returns a
1160    /// `'static`-lifetime [`AsyncPreparedStatementOwned`](crate::AsyncPreparedStatementOwned)
1161    /// that holds an `Arc`-cloned reference to `self`.
1162    ///
1163    /// Intended for N-API consumers and any other caller that needs
1164    /// the prepared statement to outlive the stack frame where the
1165    /// connection is held.
1166    ///
1167    /// # Errors
1168    ///
1169    /// See [`prepare_typed_arc`](Self::prepare_typed_arc).
1170    pub async fn prepare_arc(
1171        self: &Arc<Self>,
1172        query: &str,
1173    ) -> Result<crate::async_prepared::AsyncPreparedStatementOwned> {
1174        self.prepare_typed_arc(query, &[]).await
1175    }
1176
1177    /// Owned-handle variant of [`prepare_typed`](Self::prepare_typed).
1178    ///
1179    /// # Errors
1180    ///
1181    /// - Returns [`Error::FeatureNotSupported`] on gRPC transports.
1182    /// - Returns [`Error::Server`] if the server rejects the `Parse`
1183    ///   message.
1184    /// - Returns [`Error::Io`] on transport-level I/O failures.
1185    pub async fn prepare_typed_arc(
1186        self: &Arc<Self>,
1187        query: &str,
1188        param_types: &[crate::Oid],
1189    ) -> Result<crate::async_prepared::AsyncPreparedStatementOwned> {
1190        let client = match &self.transport {
1191            AsyncTransport::Tcp(tcp) => &tcp.client,
1192            AsyncTransport::Grpc(_) => {
1193                return Err(Error::feature_not_supported(
1194                    "prepared statements are not supported over gRPC transport",
1195                ));
1196            }
1197        };
1198        let inner = client.prepare_typed(query, param_types).await?;
1199        crate::async_prepared::AsyncPreparedStatementOwned::new(Arc::clone(self), inner)
1200    }
1201
1202    // =========================================================================
1203    // Query Statistics
1204    // =========================================================================
1205
1206    /// Enables query statistics collection for this connection.
1207    pub fn enable_query_stats(&self, provider: impl QueryStatsProvider + 'static) {
1208        if let Ok(mut guard) = self.stats_provider.lock() {
1209            *guard = Some(Arc::new(provider));
1210        }
1211    }
1212
1213    /// Disables query statistics collection.
1214    pub fn disable_query_stats(&self) {
1215        if let Ok(mut guard) = self.stats_provider.lock() {
1216            *guard = None;
1217        }
1218        if let Ok(mut guard) = self.pending_stats.lock() {
1219            *guard = None;
1220        }
1221    }
1222
1223    /// Returns the stats for the most recent query (if enabled).
1224    pub fn last_query_stats(&self) -> Option<QueryStats> {
1225        let provider = self.stats_provider.lock().ok()?.as_ref().cloned()?;
1226        let mut guard = self.pending_stats.lock().ok()?;
1227        let (token, sql) = guard.take()?;
1228        provider.after_query(token, &sql)
1229    }
1230
1231    fn stats_before_query(&self, sql: &str) -> Option<Box<dyn Any + Send>> {
1232        self.stats_provider
1233            .lock()
1234            .ok()?
1235            .as_ref()
1236            .map(|p| p.before_query(sql))
1237    }
1238
1239    fn stats_store_pending(&self, token: Option<Box<dyn Any + Send>>, sql: &str) {
1240        if let Some(token) = token {
1241            if let Ok(mut guard) = self.pending_stats.lock() {
1242                *guard = Some((token, sql.to_string()));
1243            }
1244        }
1245    }
1246}
1247
1248impl AsyncConnection {
1249    // =========================================================================
1250    // Transaction Control
1251    // =========================================================================
1252
1253    // -------------------------------------------------------------------
1254    // Raw transaction control (internal)
1255    // -------------------------------------------------------------------
1256    //
1257    // The `*_raw` methods below are `pub(crate)` and form the canonical
1258    // implementation of session-level transaction control. The RAII
1259    // guard at `crate::AsyncTransaction` and any internal helper that
1260    // genuinely needs `&self` (rather than the guard's `&mut self`)
1261    // delegate to these.
1262    //
1263    // The matching `pub` methods (`begin_transaction`, `commit`,
1264    // `rollback`) are thin `#[doc(hidden)] #[deprecated]` wrappers
1265    // retained only so any pre-existing downstream caller sees a
1266    // compiler warning rather than a hard break. They will be deleted
1267    // in a future release; the `_raw` methods stay.
1268
1269    /// Issues `BEGIN TRANSACTION`. Crate-internal use only.
1270    pub(crate) async fn begin_transaction_raw(&self) -> Result<()> {
1271        self.execute_command("BEGIN TRANSACTION").await?;
1272        Ok(())
1273    }
1274
1275    /// Issues `COMMIT`. Crate-internal use only.
1276    pub(crate) async fn commit_raw(&self) -> Result<()> {
1277        self.execute_command("COMMIT").await?;
1278        Ok(())
1279    }
1280
1281    /// Issues `ROLLBACK`. Crate-internal use only.
1282    pub(crate) async fn rollback_raw(&self) -> Result<()> {
1283        self.execute_command("ROLLBACK").await?;
1284        Ok(())
1285    }
1286
1287    /// Begins an explicit transaction (async).
1288    ///
1289    /// **Prefer [`transaction()`](Self::transaction)** — the RAII guard
1290    /// auto-rolls back on drop and cannot leak a half-open transaction
1291    /// across error paths. Hidden from generated rustdoc and
1292    /// deprecated; slated for removal in a future release.
1293    ///
1294    /// # Errors
1295    ///
1296    /// Returns [`Error::Server`] if the server rejects `BEGIN TRANSACTION`
1297    /// (e.g. a transaction is already open on this session).
1298    #[doc(hidden)]
1299    #[deprecated(
1300        note = "Use `AsyncConnection::transaction()` for an RAII guard. This method will be \
1301                removed in a future release."
1302    )]
1303    pub async fn begin_transaction(&self) -> Result<()> {
1304        self.begin_transaction_raw().await
1305    }
1306
1307    /// Commits the current transaction (async).
1308    ///
1309    /// **Prefer [`AsyncTransaction::commit`](crate::AsyncTransaction::commit)**
1310    /// on the RAII guard returned by [`transaction()`](Self::transaction).
1311    /// Hidden from generated rustdoc and deprecated; slated for removal.
1312    ///
1313    /// # Errors
1314    ///
1315    /// Returns [`Error::Server`] if the server rejects `COMMIT`.
1316    #[doc(hidden)]
1317    #[deprecated(note = "Use `AsyncTransaction::commit()` on the RAII guard from \
1318                `AsyncConnection::transaction()`. This method will be removed in a future release.")]
1319    pub async fn commit(&self) -> Result<()> {
1320        self.commit_raw().await
1321    }
1322
1323    /// Rolls back the current transaction (async).
1324    ///
1325    /// **Prefer [`AsyncTransaction::rollback`](crate::AsyncTransaction::rollback)**
1326    /// on the RAII guard returned by [`transaction()`](Self::transaction).
1327    /// Hidden from generated rustdoc and deprecated; slated for removal.
1328    ///
1329    /// # Errors
1330    ///
1331    /// Returns [`Error::Server`] if the server rejects `ROLLBACK`.
1332    #[doc(hidden)]
1333    #[deprecated(note = "Use `AsyncTransaction::rollback()` on the RAII guard from \
1334                `AsyncConnection::transaction()`. This method will be removed in a future release.")]
1335    pub async fn rollback(&self) -> Result<()> {
1336        self.rollback_raw().await
1337    }
1338
1339    /// Starts a transaction with an async RAII guard (async).
1340    ///
1341    /// # Errors
1342    ///
1343    /// Returns [`Error::Server`] if the internal `BEGIN` issued by
1344    /// [`AsyncTransaction::new`](crate::AsyncTransaction) fails.
1345    pub async fn transaction(&mut self) -> Result<crate::AsyncTransaction<'_>> {
1346        crate::AsyncTransaction::new(self).await
1347    }
1348}