Skip to main content

hyperdb_api/
async_connection.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Async connection to Hyper database.
5//!
6//! This module provides [`AsyncConnection`], the async version of [`Connection`](crate::Connection).
7//! Use this when you're already in an async runtime (tokio).
8
9use std::any::Any;
10use std::sync::{Arc, Mutex};
11
12use crate::async_result::AsyncRowset;
13use crate::async_transport::{AsyncTcpTransport, AsyncTransport};
14use crate::error::{Error, Result};
15use crate::names::escape_sql_path;
16use crate::query_stats::{QueryStats, QueryStatsProvider};
17use crate::result::{Row, RowValue};
18use crate::CreateMode;
19
20/// An async connection to a Hyper database.
21///
22/// This is the async equivalent of [`Connection`](crate::Connection), designed for use
23/// in tokio-based async applications. All I/O operations are non-blocking.
24///
25/// # Example
26///
27/// ```no_run
28/// use hyperdb_api::{AsyncConnection, CreateMode, Result};
29///
30/// #[tokio::main]
31/// async fn main() -> Result<()> {
32///     let conn = AsyncConnection::connect(
33///         "localhost:7483",
34///         "example.hyper",
35///         CreateMode::CreateIfNotExists,
36///     ).await?;
37///
38///     conn.execute_command("CREATE TABLE test (id INT)").await?;
39///     let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM test").await?;
40///
41///     conn.close().await?;
42///     Ok(())
43/// }
44/// ```
pub struct AsyncConnection {
    /// Underlying transport that carries every command and query.
    transport: AsyncTransport,
    /// Database path supplied when the connection was created, or `None`
    /// when it was opened without a database.
    database: Option<String>,
    /// Optional query-statistics provider; every visible constructor starts
    /// it out as `None`.
    stats_provider: Mutex<Option<Arc<dyn QueryStatsProvider>>>,
    // NOTE(review): presumably the opaque token produced by
    // `stats_before_query` paired with the SQL text of the latest statement —
    // confirm against `stats_store_pending`, which is not visible here.
    pending_stats: Mutex<Option<(Box<dyn Any + Send>, String)>>,
}
51
52impl std::fmt::Debug for AsyncConnection {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        f.debug_struct("AsyncConnection")
55            .field("database", &self.database)
56            .finish_non_exhaustive()
57    }
58}
59
60impl AsyncConnection {
61    /// Returns a fluent [`AsyncConnectionBuilder`](crate::AsyncConnectionBuilder)
62    /// pointed at `endpoint`.
63    #[must_use]
64    pub fn builder(endpoint: &str) -> crate::AsyncConnectionBuilder {
65        crate::AsyncConnectionBuilder::new(endpoint)
66    }
67
68    /// Connects to a Hyper server (async).
69    ///
70    /// Transport is auto-detected from the endpoint:
71    /// - `https://` or `http://` → gRPC transport
72    /// - Otherwise → TCP transport (`PostgreSQL` wire protocol)
73    ///
74    /// # Errors
75    ///
76    /// - Returns [`Error::Io`] / [`Error::Client`] if the handshake with
77    ///   the server fails.
78    /// - Returns [`Error::Client`] if the `CreateMode` SQL (`CREATE`
79    ///   / `DROP` / `ATTACH`) is rejected by the server.
80    pub async fn connect(endpoint: &str, database: &str, mode: CreateMode) -> Result<Self> {
81        let transport = AsyncTransport::connect(endpoint, Some(database)).await?;
82        let conn = AsyncConnection {
83            transport,
84            database: Some(database.to_string()),
85            stats_provider: Mutex::new(None),
86            pending_stats: Mutex::new(None),
87        };
88
89        if conn.transport.supports_writes() {
90            conn.handle_creation_mode(database, mode).await?;
91            conn.attach_and_set_path(database).await?;
92        }
93
94        Ok(conn)
95    }
96
97    /// Connects with authentication (async).
98    ///
99    /// # Errors
100    ///
101    /// - Returns [`Error::Client`] if authentication is rejected.
102    /// - Returns [`Error::Io`] if the endpoint cannot be reached.
103    /// - Returns [`Error::Client`] if the `CreateMode` SQL is rejected.
104    pub async fn connect_with_auth(
105        endpoint: &str,
106        database: &str,
107        mode: CreateMode,
108        user: &str,
109        password: &str,
110    ) -> Result<Self> {
111        let transport = AsyncTransport::connect_tcp_with_auth(endpoint, user, password).await?;
112        let conn = AsyncConnection {
113            transport,
114            database: Some(database.to_string()),
115            stats_provider: Mutex::new(None),
116            pending_stats: Mutex::new(None),
117        };
118
119        conn.handle_creation_mode(database, mode).await?;
120        conn.attach_and_set_path(database).await?;
121
122        Ok(conn)
123    }
124
125    /// Connects to a server without attaching any database (async).
126    ///
127    /// Useful for running `CREATE DATABASE` / `DROP DATABASE` without an
128    /// active attachment.
129    ///
130    /// # Errors
131    ///
132    /// Returns [`Error::Io`] or [`Error::Client`] if the TCP handshake
133    /// with `endpoint` fails.
134    pub async fn without_database(endpoint: &str) -> Result<Self> {
135        let transport = AsyncTransport::connect_tcp(endpoint).await?;
136        Ok(AsyncConnection {
137            transport,
138            database: None,
139            stats_provider: Mutex::new(None),
140            pending_stats: Mutex::new(None),
141        })
142    }
143
144    /// Builds an `AsyncConnection` from a pre-existing `AsyncClient` (TCP only).
145    #[must_use]
146    pub fn from_async_client(
147        client: hyperdb_api_core::client::AsyncClient,
148        database: Option<String>,
149    ) -> Self {
150        AsyncConnection {
151            transport: AsyncTransport::Tcp(AsyncTcpTransport { client }),
152            database,
153            stats_provider: Mutex::new(None),
154            pending_stats: Mutex::new(None),
155        }
156    }
157
158    /// Builds an `AsyncConnection` from a pre-constructed transport.
159    ///
160    /// Used by [`AsyncConnectionBuilder`](crate::AsyncConnectionBuilder) to
161    /// stitch together a gRPC transport after its own config construction.
162    pub(crate) fn from_transport(transport: AsyncTransport, database: Option<String>) -> Self {
163        AsyncConnection {
164            transport,
165            database,
166            stats_provider: Mutex::new(None),
167            pending_stats: Mutex::new(None),
168        }
169    }
170
171    /// Runs the configured `CreateMode` as SQL (crate-public for use by
172    /// [`AsyncConnectionBuilder`](crate::AsyncConnectionBuilder)).
173    pub(crate) async fn handle_creation_mode_public(
174        &self,
175        database: &str,
176        mode: CreateMode,
177    ) -> Result<()> {
178        self.handle_creation_mode(database, mode).await
179    }
180
181    /// Attaches the database and sets `search_path` (crate-public for use
182    /// by [`AsyncConnectionBuilder`](crate::AsyncConnectionBuilder)).
183    pub(crate) async fn attach_and_set_path_public(&self, database: &str) -> Result<()> {
184        self.attach_and_set_path(database).await
185    }
186
187    async fn handle_creation_mode(&self, database: &str, mode: CreateMode) -> Result<()> {
188        let escaped_db = escape_sql_path(database);
189        match mode {
190            CreateMode::Create => {
191                self.execute_command(&format!("CREATE DATABASE {escaped_db}"))
192                    .await?;
193            }
194            CreateMode::CreateIfNotExists => {
195                self.execute_command(&format!("CREATE DATABASE IF NOT EXISTS {escaped_db}"))
196                    .await?;
197            }
198            CreateMode::CreateAndReplace => {
199                self.execute_command(&format!("DROP DATABASE IF EXISTS {escaped_db}"))
200                    .await?;
201                self.execute_command(&format!("CREATE DATABASE {escaped_db}"))
202                    .await?;
203            }
204            CreateMode::DoNotCreate => {}
205        }
206        Ok(())
207    }
208
209    async fn attach_and_set_path(&self, database: &str) -> Result<()> {
210        let escaped_db = escape_sql_path(database);
211        let db_alias = std::path::Path::new(database)
212            .file_stem()
213            .and_then(|s| s.to_str())
214            .unwrap_or("db");
215        let escaped_alias = escape_sql_path(db_alias);
216
217        self.execute_command(&format!("ATTACH DATABASE {escaped_db} AS {escaped_alias}"))
218            .await?;
219
220        self.execute_command(&format!("SET search_path TO {escaped_alias}, public"))
221            .await?;
222        Ok(())
223    }
224
225    /// Returns the transport type name (e.g., "TCP", "gRPC").
226    pub fn transport_type(&self) -> &'static str {
227        self.transport.transport_type().as_str()
228    }
229
230    /// Returns true if this connection supports write operations.
231    pub fn supports_writes(&self) -> bool {
232        self.transport.supports_writes()
233    }
234
235    /// Returns the database path.
236    pub fn database(&self) -> Option<&str> {
237        self.database.as_deref()
238    }
239
240    // =========================================================================
241    // Command Execution
242    // =========================================================================
243
244    /// Executes a SQL command that doesn't return rows (async).
245    ///
246    /// Use for DDL statements (CREATE, DROP, ALTER) and DML statements
247    /// (INSERT, UPDATE, DELETE). Returns the number of affected rows (DML)
248    /// or 0 (DDL).
249    ///
250    /// # Errors
251    ///
252    /// - Returns [`Error::Other`] on gRPC transports that do not yet
253    ///   support write operations.
254    /// - Returns [`Error::Client`] if the SQL fails to parse or execute.
255    /// - Returns [`Error::Io`] on transport-level I/O failures.
256    pub async fn execute_command(&self, sql: &str) -> Result<u64> {
257        let token = self.stats_before_query(sql);
258        let result = self.transport.execute_command(sql).await;
259        self.stats_store_pending(token, sql);
260        result
261    }
262
263    /// Executes multiple SQL statements sequentially (async).
264    ///
265    /// If any statement fails, execution stops and the error is returned
266    /// wrapping the SQL preview for context.
267    ///
268    /// # Errors
269    ///
270    /// Returns an [`Error::Other`] wrapping the first failing statement's
271    /// error; the wrapping message includes the statement's ordinal and
272    /// an 80-character SQL preview.
273    pub async fn execute_batch(&self, statements: &[&str]) -> Result<u64> {
274        let mut total = 0u64;
275        for (i, stmt) in statements.iter().enumerate() {
276            if !stmt.trim().is_empty() {
277                total += self.execute_command(stmt).await.map_err(|e| {
278                    let preview: String = stmt.chars().take(80).collect();
279                    Error::with_cause(
280                        format!(
281                            "execute_batch failed at statement {} of {}: {}",
282                            i + 1,
283                            statements.len(),
284                            preview,
285                        ),
286                        e,
287                    )
288                })?;
289            }
290        }
291        Ok(total)
292    }
293
294    // =========================================================================
295    // Query Execution (Streaming)
296    // =========================================================================
297
298    /// Executes a SQL query and returns a streaming [`AsyncRowset`] (async).
299    ///
300    /// Results are streamed in chunks so memory usage stays constant
301    /// regardless of result set size. See [`AsyncRowset`] for the row-level
302    /// API and collectors.
303    ///
304    /// # Errors
305    ///
306    /// - Returns [`Error::Client`] if the SQL is rejected by the server.
307    /// - Returns [`Error::Io`] on transport-level I/O failures while
308    ///   opening the stream.
309    pub async fn execute_query(&self, query: &str) -> Result<AsyncRowset<'_>> {
310        let token = self.stats_before_query(query);
311        let result = self.transport.execute_query_streaming(query).await;
312        self.stats_store_pending(token, query);
313        result
314    }
315
316    /// Fetches a single row, erroring if the query returns zero rows.
317    ///
318    /// # Errors
319    ///
320    /// - Returns the error from [`execute_query`](Self::execute_query) if
321    ///   the query fails.
322    /// - Returns [`Error::Other`] with message `"Query returned no rows"` if
323    ///   the query produced zero rows.
324    pub async fn fetch_one<Q: AsRef<str>>(&self, query: Q) -> Result<Row> {
325        self.execute_query(query.as_ref())
326            .await?
327            .require_first_row()
328            .await
329    }
330
331    /// Fetches a single row, returning `None` if the query is empty.
332    ///
333    /// # Errors
334    ///
335    /// Returns the error from [`execute_query`](Self::execute_query) if the
336    /// query fails. An empty result set yields `Ok(None)`, not an error.
337    pub async fn fetch_optional<Q: AsRef<str>>(&self, query: Q) -> Result<Option<Row>> {
338        self.execute_query(query.as_ref()).await?.first_row().await
339    }
340
341    /// Fetches all rows from a query.
342    ///
343    /// # Errors
344    ///
345    /// Returns the error from [`execute_query`](Self::execute_query), or a
346    /// transport error produced while draining every chunk.
347    pub async fn fetch_all<Q: AsRef<str>>(&self, query: Q) -> Result<Vec<Row>> {
348        self.execute_query(query.as_ref())
349            .await?
350            .collect_rows()
351            .await
352    }
353
354    /// Fetches a single row and maps it to a struct using [`crate::FromRow`].
355    ///
356    /// # Errors
357    ///
358    /// - Returns the error from [`fetch_one`](Self::fetch_one).
359    /// - Returns whatever [`FromRow::from_row`](crate::FromRow::from_row)
360    ///   produces when the row cannot be mapped.
361    pub async fn fetch_one_as<T: crate::FromRow>(&self, query: &str) -> Result<T> {
362        let row = self.fetch_one(query).await?;
363        T::from_row(&row)
364    }
365
366    /// Fetches all rows and maps them to structs using [`crate::FromRow`].
367    ///
368    /// # Errors
369    ///
370    /// - Returns the error from [`fetch_all`](Self::fetch_all).
371    /// - Returns the first error produced by
372    ///   [`FromRow::from_row`](crate::FromRow::from_row) on any row.
373    pub async fn fetch_all_as<T: crate::FromRow>(&self, query: &str) -> Result<Vec<T>> {
374        let rows = self.fetch_all(query).await?;
375        rows.iter().map(|r| T::from_row(r)).collect()
376    }
377
378    /// Fetches a single non-NULL scalar value. Errors on empty / NULL.
379    ///
380    /// # Errors
381    ///
382    /// - Returns the error from [`execute_query`](Self::execute_query).
383    /// - Returns [`Error::Other`] with message `"Query returned no rows"` if
384    ///   the query is empty.
385    /// - Returns [`Error::Other`] with message `"Scalar query returned NULL"`
386    ///   if the first cell is SQL `NULL`.
387    pub async fn fetch_scalar<T, Q>(&self, query: Q) -> Result<T>
388    where
389        T: RowValue,
390        Q: AsRef<str>,
391    {
392        self.execute_query(query.as_ref())
393            .await?
394            .require_scalar()
395            .await
396    }
397
398    /// Fetches a single scalar value, allowing NULL (returns `None`).
399    ///
400    /// # Errors
401    ///
402    /// Returns the error from [`execute_query`](Self::execute_query). An
403    /// empty result still yields an error; SQL `NULL` in the first cell
404    /// yields `Ok(None)`.
405    pub async fn fetch_optional_scalar<T, Q>(&self, query: Q) -> Result<Option<T>>
406    where
407        T: RowValue,
408        Q: AsRef<str>,
409    {
410        self.execute_query(query.as_ref()).await?.scalar().await
411    }
412
413    /// Returns the count from a `SELECT COUNT(*)` style query, defaulting
414    /// to 0 on NULL.
415    ///
416    /// # Errors
417    ///
418    /// Returns the error from [`execute_query`](Self::execute_query) if the
419    /// query itself fails.
420    pub async fn query_count(&self, query: &str) -> Result<i64> {
421        let opt: Option<i64> = self.fetch_optional_scalar(query).await?;
422        Ok(opt.unwrap_or(0))
423    }
424
425    // =========================================================================
426    // Arrow Queries
427    // =========================================================================
428
429    /// Executes a SELECT query and returns results as Arrow IPC stream bytes (async).
430    ///
431    /// TCP uses `COPY ... TO STDOUT WITH (FORMAT ARROWSTREAM)`; gRPC uses
432    /// the native Arrow transport. Both return the same IPC stream shape.
433    ///
434    /// # Errors
435    ///
436    /// Propagates any [`Error::Client`] from the transport when the query
437    /// fails or the server cannot produce Arrow IPC output.
438    pub async fn execute_query_to_arrow(&self, sql: &str) -> Result<bytes::Bytes> {
439        self.transport.execute_query_to_arrow(sql).await
440    }
441
442    /// Exports an entire table to Arrow IPC stream format (async).
443    ///
444    /// # Errors
445    ///
446    /// See [`execute_query_to_arrow`](Self::execute_query_to_arrow).
447    pub async fn export_table_to_arrow(&self, table_name: &str) -> Result<bytes::Bytes> {
448        self.execute_query_to_arrow(&format!("SELECT * FROM {table_name}"))
449            .await
450    }
451
452    /// Executes a SELECT query and returns parsed Arrow `RecordBatch`es (async).
453    ///
454    /// # Errors
455    ///
456    /// - Returns [`Error::Client`] if the query fails.
457    /// - Returns [`Error::Other`] if the Arrow IPC payload cannot be
458    ///   decoded into record batches.
459    pub async fn execute_query_to_batches(
460        &self,
461        sql: &str,
462    ) -> Result<Vec<arrow::record_batch::RecordBatch>> {
463        let arrow_data = self.execute_query_to_arrow(sql).await?;
464        crate::arrow_result::parse_arrow_ipc(arrow_data)
465    }
466
467    // =========================================================================
468    // Parameterized Queries
469    // =========================================================================
470
471    /// Executes a parameterized query with safely escaped parameters (async).
472    ///
473    /// Mirrors the sync [`Connection::query_params`](crate::Connection::query_params);
474    /// see that method for the design rationale around text-mode escaping
475    /// vs. future native Bind/Execute support.
476    ///
477    /// # Errors
478    ///
479    /// - Returns [`Error::Other`] on gRPC transports (prepared statements
480    ///   are TCP-only).
481    /// - Returns [`Error::Client`] if the server rejects the statement at
482    ///   `Parse`, `Bind`, or `Execute` time.
483    /// - Returns [`Error::Io`] on transport-level I/O failures.
484    pub async fn query_params(
485        &self,
486        query: &str,
487        params: &[&dyn crate::params::ToSqlParam],
488    ) -> Result<AsyncRowset<'_>> {
489        // Route through the extended query protocol. See
490        // [`Connection::query_params`] for the sync equivalent and the
491        // rationale behind the statement-guard pattern.
492        let client = match &self.transport {
493            AsyncTransport::Tcp(tcp) => &tcp.client,
494            AsyncTransport::Grpc(_) => {
495                return Err(Error::new(
496                    "prepared statements are not supported over gRPC transport",
497                ));
498            }
499        };
500        let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
501        let stmt = client.prepare_typed(query, &oids).await?;
502        let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
503        let stream = client
504            .execute_prepared_streaming(&stmt, encoded, crate::result::DEFAULT_BINARY_CHUNK_SIZE)
505            .await?;
506        Ok(AsyncRowset::from_prepared(stream).with_statement_guard(stmt))
507    }
508
509    /// Executes a parameterized command (INSERT / UPDATE / DELETE) with
510    /// binary-encoded parameters via Parse/Bind/Execute (async).
511    ///
512    /// # Errors
513    ///
514    /// - Returns [`Error::Other`] on gRPC transports.
515    /// - Returns [`Error::Client`] if the server rejects the statement at
516    ///   `Parse`, `Bind`, or `Execute` time.
517    /// - Returns [`Error::Io`] on transport-level I/O failures.
518    pub async fn command_params(
519        &self,
520        query: &str,
521        params: &[&dyn crate::params::ToSqlParam],
522    ) -> Result<u64> {
523        let client = match &self.transport {
524            AsyncTransport::Tcp(tcp) => &tcp.client,
525            AsyncTransport::Grpc(_) => {
526                return Err(Error::new(
527                    "prepared statements are not supported over gRPC transport",
528                ));
529            }
530        };
531        let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
532        let stmt = client.prepare_typed(query, &oids).await?;
533        let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
534        Ok(client.execute_prepared_no_result(&stmt, encoded).await?)
535    }
536
537    // =========================================================================
538    // Catalog / Database Management
539    // =========================================================================
540
541    /// Creates a new database file (async).
542    ///
543    /// # Errors
544    ///
545    /// Returns [`Error::Client`] if the server rejects
546    /// `CREATE DATABASE IF NOT EXISTS` (e.g. the path is not writable).
547    pub async fn create_database(&self, path: &str) -> Result<()> {
548        let sql = format!("CREATE DATABASE IF NOT EXISTS {}", escape_sql_path(path));
549        self.execute_command(&sql).await?;
550        Ok(())
551    }
552
553    /// Drops (deletes) a database file (async).
554    ///
555    /// # Errors
556    ///
557    /// Returns [`Error::Client`] if the server rejects
558    /// `DROP DATABASE IF EXISTS` (e.g. the database is still attached).
559    pub async fn drop_database(&self, path: &str) -> Result<()> {
560        let sql = format!("DROP DATABASE IF EXISTS {}", escape_sql_path(path));
561        self.execute_command(&sql).await?;
562        Ok(())
563    }
564
565    /// Attaches a database file to the connection (async).
566    ///
567    /// # Errors
568    ///
569    /// Returns [`Error::Client`] if the server rejects the
570    /// `ATTACH DATABASE` statement (file missing, permission denied,
571    /// alias conflict).
572    pub async fn attach_database(&self, path: &str, alias: Option<&str>) -> Result<()> {
573        let sql = if let Some(alias) = alias {
574            format!(
575                "ATTACH DATABASE {} AS {}",
576                escape_sql_path(path),
577                escape_sql_path(alias)
578            )
579        } else {
580            format!("ATTACH DATABASE {}", escape_sql_path(path))
581        };
582        self.execute_command(&sql).await?;
583        Ok(())
584    }
585
586    /// Detaches a database alias from this connection (async).
587    ///
588    /// # Errors
589    ///
590    /// Returns [`Error::Client`] if the alias is not attached or the
591    /// server cannot flush pending updates.
592    pub async fn detach_database(&self, alias: &str) -> Result<()> {
593        let sql = format!("DETACH DATABASE {}", escape_sql_path(alias));
594        self.execute_command(&sql).await?;
595        Ok(())
596    }
597
598    /// Detaches all databases from this connection (async).
599    ///
600    /// # Errors
601    ///
602    /// Returns [`Error::Client`] if the server rejects
603    /// `DETACH ALL DATABASES`.
604    pub async fn detach_all_databases(&self) -> Result<()> {
605        self.execute_command("DETACH ALL DATABASES").await?;
606        Ok(())
607    }
608
609    /// Copies a database file to a new path (async).
610    ///
611    /// # Errors
612    ///
613    /// Returns [`Error::Client`] if the server rejects the
614    /// `COPY DATABASE` statement — e.g. the source is not attached or the
615    /// destination path is not writable.
616    pub async fn copy_database(&self, source: &str, destination: &str) -> Result<()> {
617        let sql = format!(
618            "COPY DATABASE {} TO {}",
619            escape_sql_path(source),
620            escape_sql_path(destination)
621        );
622        self.execute_command(&sql).await?;
623        Ok(())
624    }
625
626    /// Creates a schema in the database (async).
627    ///
628    /// # Errors
629    ///
630    /// - Returns an error if `schema_name` cannot be converted to a
631    ///   [`SchemaName`](crate::SchemaName).
632    /// - Returns [`Error::Client`] if the server rejects
633    ///   `CREATE SCHEMA IF NOT EXISTS`.
634    pub async fn create_schema<T>(&self, schema_name: T) -> Result<()>
635    where
636        T: TryInto<crate::SchemaName>,
637        crate::Error: From<T::Error>,
638    {
639        let schema: crate::SchemaName = schema_name.try_into()?;
640        let sql = format!("CREATE SCHEMA IF NOT EXISTS {schema}");
641        self.execute_command(&sql).await?;
642        Ok(())
643    }
644
645    /// Checks whether a schema exists (async).
646    ///
647    /// # Errors
648    ///
649    /// - Returns an error if `schema` cannot be converted to a
650    ///   [`SchemaName`](crate::SchemaName).
651    /// - Returns [`Error::Client`] if the catalog lookup query fails.
652    pub async fn has_schema<T>(&self, schema: T) -> Result<bool>
653    where
654        T: TryInto<crate::SchemaName>,
655        crate::Error: From<T::Error>,
656    {
657        let schema: crate::SchemaName = schema.try_into()?;
658        let db_prefix = if let Some(db) = schema.database() {
659            format!("{db}.")
660        } else {
661            String::new()
662        };
663        let sql = format!(
664            "SELECT 1 FROM {}pg_catalog.pg_namespace WHERE nspname = '{}'",
665            db_prefix,
666            schema.unescaped().replace('\'', "''")
667        );
668        Ok(self.fetch_optional(&sql).await?.is_some())
669    }
670
671    /// Checks whether a table exists (async).
672    ///
673    /// # Errors
674    ///
675    /// - Returns an error if `table_name` cannot be converted to a
676    ///   [`TableName`](crate::TableName).
677    /// - Returns [`Error::Client`] if the catalog lookup query fails.
678    pub async fn has_table<T>(&self, table_name: T) -> Result<bool>
679    where
680        T: TryInto<crate::TableName>,
681        crate::Error: From<T::Error>,
682    {
683        let table: crate::TableName = table_name.try_into()?;
684        let schema = table
685            .schema()
686            .map_or("public", super::names::Name::unescaped);
687        let db_prefix = if let Some(db) = table.database() {
688            format!("{db}.")
689        } else {
690            String::new()
691        };
692        let sql = format!(
693            "SELECT 1 FROM {}pg_catalog.pg_tables WHERE schemaname = '{}' AND tablename = '{}'",
694            db_prefix,
695            schema.replace('\'', "''"),
696            table.table().unescaped().replace('\'', "''")
697        );
698        Ok(self.fetch_optional(&sql).await?.is_some())
699    }
700
701    /// Unloads the database from memory but keeps the session alive (async).
702    ///
703    /// # Errors
704    ///
705    /// Returns [`Error::Client`] if the server rejects `UNLOAD DATABASE`
706    /// (e.g. the database is in use by another session).
707    pub async fn unload_database(&self) -> Result<()> {
708        self.execute_command("UNLOAD DATABASE").await?;
709        Ok(())
710    }
711
712    /// Releases the database completely from the session (async).
713    ///
714    /// # Errors
715    ///
716    /// Returns [`Error::Client`] if the server rejects `UNLOAD RELEASE`,
717    /// most commonly because multiple databases are attached to the same
718    /// session.
719    pub async fn unload_release(&self) -> Result<()> {
720        self.execute_command("UNLOAD RELEASE").await?;
721        Ok(())
722    }
723
724    // =========================================================================
725    // Diagnostics / Explain
726    // =========================================================================
727
728    /// Executes EXPLAIN and returns the plan text (async).
729    ///
730    /// # Errors
731    ///
732    /// Returns [`Error::Client`] if `EXPLAIN <query>` fails to parse or plan.
733    pub async fn explain(&self, query: &str) -> Result<String> {
734        let sql = format!("EXPLAIN {query}");
735        let rows = self.fetch_all(&sql).await?;
736        let lines: Vec<String> = rows.iter().filter_map(|r| r.get::<String>(0)).collect();
737        Ok(lines.join("\n"))
738    }
739
740    /// Executes EXPLAIN ANALYZE and returns the plan with timing (async).
741    ///
742    /// # Errors
743    ///
744    /// Returns [`Error::Client`] if `EXPLAIN ANALYZE <query>` fails — this
745    /// includes any runtime error raised by actually executing `query`.
746    pub async fn explain_analyze(&self, query: &str) -> Result<String> {
747        let sql = format!("EXPLAIN ANALYZE {query}");
748        let rows = self.fetch_all(&sql).await?;
749        let lines: Vec<String> = rows.iter().filter_map(|r| r.get::<String>(0)).collect();
750        Ok(lines.join("\n"))
751    }
752
753    // =========================================================================
754    // Connection Introspection / Lifecycle
755    // =========================================================================
756
757    /// Returns true if the connection is alive (passive check).
758    pub fn is_alive(&self) -> bool {
759        match &self.transport {
760            AsyncTransport::Tcp(tcp) => tcp.client.is_alive(),
761            AsyncTransport::Grpc(_) => true,
762        }
763    }
764
765    /// Actively pings the server with `SELECT 1` (async).
766    ///
767    /// # Errors
768    ///
769    /// Returns [`Error::Client`] or [`Error::Io`] if the `SELECT 1`
770    /// round-trip fails — i.e. the connection is no longer usable.
771    pub async fn ping(&self) -> Result<()> {
772        self.execute_command("SELECT 1").await?;
773        Ok(())
774    }
775
776    /// Returns the backend process ID, or 0 for gRPC transports.
777    pub fn process_id(&self) -> i32 {
778        match &self.transport {
779            AsyncTransport::Tcp(tcp) => tcp.client.process_id(),
780            AsyncTransport::Grpc(_) => 0,
781        }
782    }
783
784    /// Returns the secret key used for cancel requests, or 0 for gRPC.
785    pub fn secret_key(&self) -> i32 {
786        match &self.transport {
787            AsyncTransport::Tcp(tcp) => tcp.client.secret_key(),
788            AsyncTransport::Grpc(_) => 0,
789        }
790    }
791
792    /// Returns a server parameter value by name (async).
793    pub async fn parameter_status(&self, name: &str) -> Option<String> {
794        match &self.transport {
795            AsyncTransport::Tcp(tcp) => tcp.client.parameter_status(name).await,
796            AsyncTransport::Grpc(_) => None,
797        }
798    }
799
800    /// Returns the server version as a parsed struct (async).
801    pub async fn server_version(&self) -> Option<crate::ServerVersion> {
802        let version_str = self.parameter_status("server_version").await?;
803        crate::ServerVersion::parse(&version_str)
804    }
805
806    /// Sets the notice receiver callback for this connection.
807    pub fn set_notice_receiver(
808        &mut self,
809        receiver: Option<Box<dyn Fn(hyperdb_api_core::client::Notice) + Send + Sync>>,
810    ) {
811        match &mut self.transport {
812            AsyncTransport::Tcp(tcp) => tcp.client.set_notice_receiver(receiver),
813            AsyncTransport::Grpc(_) => {}
814        }
815    }
816
817    /// Cancels the currently running query (async).
818    ///
819    /// # Errors
820    ///
821    /// - Returns [`Error::Other`] on gRPC transports — cancellation is not
822    ///   yet implemented for gRPC.
823    /// - Returns [`Error::Client`] or [`Error::Io`] if the cancel-request
824    ///   connection to the server fails.
825    pub async fn cancel(&self) -> Result<()> {
826        self.transport.cancel().await
827    }
828
829    /// Closes the connection gracefully, detaching any attached database first (async).
830    ///
831    /// # Errors
832    ///
833    /// - Returns [`Error::Other`] wrapping the transport close failure if
834    ///   the client cannot be shut down cleanly.
835    /// - Returns [`Error::Other`] wrapping the detach failure if the
836    ///   attached database could not be detached but the transport close
837    ///   itself succeeded.
838    pub async fn close(self) -> Result<()> {
839        let detach_err = if let Some(ref db_path) = self.database {
840            let db_alias = std::path::Path::new(db_path)
841                .file_stem()
842                .and_then(|s| s.to_str())
843                .unwrap_or("db");
844            self.execute_command(&format!("DETACH DATABASE {}", escape_sql_path(db_alias)))
845                .await
846                .err()
847        } else {
848            None
849        };
850
851        let close_result = self.transport.close().await;
852
853        if let Err(e) = close_result {
854            return Err(Error::with_cause("Failed to close async connection", e));
855        }
856
857        if let Some(e) = detach_err {
858            return Err(Error::with_cause(
859                "Failed to detach database during close",
860                e,
861            ));
862        }
863
864        Ok(())
865    }
866
867    /// Returns a reference to the underlying async TCP client (`None` for gRPC).
868    ///
869    /// Prefer the high-level `AsyncConnection` methods; this escape hatch
870    /// remains for code that needs direct protocol access (e.g. custom
871    /// COPY loops).
872    pub fn async_tcp_client(&self) -> Option<&hyperdb_api_core::client::AsyncClient> {
873        self.transport.async_tcp_client()
874    }
875
876    /// Crate-internal accessor for the transport. Used by
877    /// [`AsyncPreparedStatement`](crate::AsyncPreparedStatement) to reach
878    /// the underlying `hyperdb_api_core::client::AsyncClient`.
879    pub(crate) fn transport(&self) -> &AsyncTransport {
880        &self.transport
881    }
882
883    /// Prepares a SQL statement (async).
884    ///
885    /// See [`Connection::prepare`](crate::Connection::prepare) for
886    /// semantics. The returned
887    /// [`AsyncPreparedStatement`](crate::AsyncPreparedStatement) can be
888    /// executed many times with different parameter values.
889    ///
890    /// # Errors
891    ///
892    /// See [`prepare_typed`](Self::prepare_typed) — this method delegates
893    /// to it with an empty OID list.
894    pub async fn prepare(&self, query: &str) -> Result<crate::AsyncPreparedStatement<'_>> {
895        self.prepare_typed(query, &[]).await
896    }
897
898    /// Prepares a SQL statement with explicit parameter type OIDs (async).
899    ///
900    /// # Errors
901    ///
902    /// - Returns [`Error::Other`] on gRPC transports (prepared statements
903    ///   are TCP-only).
904    /// - Returns [`Error::Client`] if the server rejects the `Parse`
905    ///   message (SQL syntax error, unknown OID).
906    /// - Returns [`Error::Io`] on transport-level I/O failures.
907    pub async fn prepare_typed(
908        &self,
909        query: &str,
910        param_types: &[crate::Oid],
911    ) -> Result<crate::AsyncPreparedStatement<'_>> {
912        let client = match &self.transport {
913            AsyncTransport::Tcp(tcp) => &tcp.client,
914            AsyncTransport::Grpc(_) => {
915                return Err(Error::new(
916                    "prepared statements are not supported over gRPC transport",
917                ));
918            }
919        };
920        let inner = client.prepare_typed(query, param_types).await?;
921        crate::AsyncPreparedStatement::new(self, inner)
922    }
923
924    /// Owned-handle variant of [`prepare`](Self::prepare). Returns a
925    /// `'static`-lifetime [`AsyncPreparedStatementOwned`](crate::AsyncPreparedStatementOwned)
926    /// that holds an `Arc`-cloned reference to `self`.
927    ///
928    /// Intended for N-API consumers and any other caller that needs
929    /// the prepared statement to outlive the stack frame where the
930    /// connection is held.
931    ///
932    /// # Errors
933    ///
934    /// See [`prepare_typed_arc`](Self::prepare_typed_arc).
935    pub async fn prepare_arc(
936        self: &Arc<Self>,
937        query: &str,
938    ) -> Result<crate::async_prepared::AsyncPreparedStatementOwned> {
939        self.prepare_typed_arc(query, &[]).await
940    }
941
942    /// Owned-handle variant of [`prepare_typed`](Self::prepare_typed).
943    ///
944    /// # Errors
945    ///
946    /// - Returns [`Error::Other`] on gRPC transports.
947    /// - Returns [`Error::Client`] if the server rejects the `Parse`
948    ///   message.
949    /// - Returns [`Error::Io`] on transport-level I/O failures.
950    pub async fn prepare_typed_arc(
951        self: &Arc<Self>,
952        query: &str,
953        param_types: &[crate::Oid],
954    ) -> Result<crate::async_prepared::AsyncPreparedStatementOwned> {
955        let client = match &self.transport {
956            AsyncTransport::Tcp(tcp) => &tcp.client,
957            AsyncTransport::Grpc(_) => {
958                return Err(Error::new(
959                    "prepared statements are not supported over gRPC transport",
960                ));
961            }
962        };
963        let inner = client.prepare_typed(query, param_types).await?;
964        crate::async_prepared::AsyncPreparedStatementOwned::new(Arc::clone(self), inner)
965    }
966
967    // =========================================================================
968    // Query Statistics
969    // =========================================================================
970
971    /// Enables query statistics collection for this connection.
972    pub fn enable_query_stats(&self, provider: impl QueryStatsProvider + 'static) {
973        if let Ok(mut guard) = self.stats_provider.lock() {
974            *guard = Some(Arc::new(provider));
975        }
976    }
977
978    /// Disables query statistics collection.
979    pub fn disable_query_stats(&self) {
980        if let Ok(mut guard) = self.stats_provider.lock() {
981            *guard = None;
982        }
983        if let Ok(mut guard) = self.pending_stats.lock() {
984            *guard = None;
985        }
986    }
987
988    /// Returns the stats for the most recent query (if enabled).
989    pub fn last_query_stats(&self) -> Option<QueryStats> {
990        let provider = self.stats_provider.lock().ok()?.as_ref().cloned()?;
991        let mut guard = self.pending_stats.lock().ok()?;
992        let (token, sql) = guard.take()?;
993        provider.after_query(token, &sql)
994    }
995
996    fn stats_before_query(&self, sql: &str) -> Option<Box<dyn Any + Send>> {
997        self.stats_provider
998            .lock()
999            .ok()?
1000            .as_ref()
1001            .map(|p| p.before_query(sql))
1002    }
1003
1004    fn stats_store_pending(&self, token: Option<Box<dyn Any + Send>>, sql: &str) {
1005        if let Some(token) = token {
1006            if let Ok(mut guard) = self.pending_stats.lock() {
1007                *guard = Some((token, sql.to_string()));
1008            }
1009        }
1010    }
1011}
1012
1013impl AsyncConnection {
1014    // =========================================================================
1015    // Transaction Control
1016    // =========================================================================
1017
1018    /// Begins an explicit transaction (async).
1019    ///
1020    /// # Errors
1021    ///
1022    /// Returns [`Error::Client`] if the server rejects `BEGIN TRANSACTION`
1023    /// (e.g. a transaction is already open on this session).
1024    pub async fn begin_transaction(&self) -> Result<()> {
1025        self.execute_command("BEGIN TRANSACTION").await?;
1026        Ok(())
1027    }
1028
1029    /// Commits the current transaction (async).
1030    ///
1031    /// # Errors
1032    ///
1033    /// Returns [`Error::Client`] if the server rejects `COMMIT` (e.g. no
1034    /// transaction is currently open).
1035    pub async fn commit(&self) -> Result<()> {
1036        self.execute_command("COMMIT").await?;
1037        Ok(())
1038    }
1039
1040    /// Rolls back the current transaction (async).
1041    ///
1042    /// # Errors
1043    ///
1044    /// Returns [`Error::Client`] if the server rejects `ROLLBACK` (e.g. no
1045    /// transaction is currently open).
1046    pub async fn rollback(&self) -> Result<()> {
1047        self.execute_command("ROLLBACK").await?;
1048        Ok(())
1049    }
1050
1051    /// Starts a transaction with an async RAII guard (async).
1052    ///
1053    /// # Errors
1054    ///
1055    /// Returns [`Error::Client`] if the internal `BEGIN` issued by
1056    /// [`AsyncTransaction::new`](crate::AsyncTransaction) fails.
1057    pub async fn transaction(&mut self) -> Result<crate::AsyncTransaction<'_>> {
1058        crate::AsyncTransaction::new(self).await
1059    }
1060}