hyperdb_api/async_connection.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Async connection to Hyper database.
5//!
6//! This module provides [`AsyncConnection`], the async version of [`Connection`](crate::Connection).
7//! Use this when you're already in an async runtime (tokio).
8
9use std::any::Any;
10use std::sync::{Arc, Mutex};
11
12use crate::async_result::AsyncRowset;
13use crate::async_transport::{AsyncTcpTransport, AsyncTransport};
14use crate::error::{Error, Result};
15use crate::names::escape_sql_path;
16use crate::query_stats::{QueryStats, QueryStatsProvider};
17use crate::result::{Row, RowValue};
18use crate::CreateMode;
19
20/// An async connection to a Hyper database.
21///
22/// This is the async equivalent of [`Connection`](crate::Connection), designed for use
23/// in tokio-based async applications. All I/O operations are non-blocking.
24///
25/// # Example
26///
27/// ```no_run
28/// use hyperdb_api::{AsyncConnection, CreateMode, Result};
29///
30/// #[tokio::main]
31/// async fn main() -> Result<()> {
32/// let conn = AsyncConnection::connect(
33/// "localhost:7483",
34/// "example.hyper",
35/// CreateMode::CreateIfNotExists,
36/// ).await?;
37///
38/// conn.execute_command("CREATE TABLE test (id INT)").await?;
39/// let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM test").await?;
40///
41/// conn.close().await?;
42/// Ok(())
43/// }
44/// ```
45pub struct AsyncConnection {
46 transport: AsyncTransport,
47 database: Option<String>,
48 stats_provider: Mutex<Option<Arc<dyn QueryStatsProvider>>>,
49 pending_stats: Mutex<Option<(Box<dyn Any + Send>, String)>>,
50}
51
52impl std::fmt::Debug for AsyncConnection {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 f.debug_struct("AsyncConnection")
55 .field("database", &self.database)
56 .finish_non_exhaustive()
57 }
58}
59
60impl AsyncConnection {
61 /// Returns a fluent [`AsyncConnectionBuilder`](crate::AsyncConnectionBuilder)
62 /// pointed at `endpoint`.
63 #[must_use]
64 pub fn builder(endpoint: &str) -> crate::AsyncConnectionBuilder {
65 crate::AsyncConnectionBuilder::new(endpoint)
66 }
67
68 /// Connects to a Hyper server (async).
69 ///
70 /// Transport is auto-detected from the endpoint:
71 /// - `https://` or `http://` → gRPC transport
72 /// - Otherwise → TCP transport (`PostgreSQL` wire protocol)
73 ///
74 /// # Errors
75 ///
76 /// - Returns [`Error::Io`] / [`Error::Client`] if the handshake with
77 /// the server fails.
78 /// - Returns [`Error::Client`] if the `CreateMode` SQL (`CREATE`
79 /// / `DROP` / `ATTACH`) is rejected by the server.
80 pub async fn connect(endpoint: &str, database: &str, mode: CreateMode) -> Result<Self> {
81 let transport = AsyncTransport::connect(endpoint, Some(database)).await?;
82 let conn = AsyncConnection {
83 transport,
84 database: Some(database.to_string()),
85 stats_provider: Mutex::new(None),
86 pending_stats: Mutex::new(None),
87 };
88
89 if conn.transport.supports_writes() {
90 conn.handle_creation_mode(database, mode).await?;
91 conn.attach_and_set_path(database).await?;
92 }
93
94 Ok(conn)
95 }
96
97 /// Connects with authentication (async).
98 ///
99 /// # Errors
100 ///
101 /// - Returns [`Error::Client`] if authentication is rejected.
102 /// - Returns [`Error::Io`] if the endpoint cannot be reached.
103 /// - Returns [`Error::Client`] if the `CreateMode` SQL is rejected.
104 pub async fn connect_with_auth(
105 endpoint: &str,
106 database: &str,
107 mode: CreateMode,
108 user: &str,
109 password: &str,
110 ) -> Result<Self> {
111 let transport = AsyncTransport::connect_tcp_with_auth(endpoint, user, password).await?;
112 let conn = AsyncConnection {
113 transport,
114 database: Some(database.to_string()),
115 stats_provider: Mutex::new(None),
116 pending_stats: Mutex::new(None),
117 };
118
119 conn.handle_creation_mode(database, mode).await?;
120 conn.attach_and_set_path(database).await?;
121
122 Ok(conn)
123 }
124
125 /// Connects to a server without attaching any database (async).
126 ///
127 /// Useful for running `CREATE DATABASE` / `DROP DATABASE` without an
128 /// active attachment.
129 ///
130 /// # Errors
131 ///
132 /// Returns [`Error::Io`] or [`Error::Client`] if the TCP handshake
133 /// with `endpoint` fails.
134 pub async fn without_database(endpoint: &str) -> Result<Self> {
135 let transport = AsyncTransport::connect_tcp(endpoint).await?;
136 Ok(AsyncConnection {
137 transport,
138 database: None,
139 stats_provider: Mutex::new(None),
140 pending_stats: Mutex::new(None),
141 })
142 }
143
144 /// Builds an `AsyncConnection` from a pre-existing `AsyncClient` (TCP only).
145 #[must_use]
146 pub fn from_async_client(
147 client: hyperdb_api_core::client::AsyncClient,
148 database: Option<String>,
149 ) -> Self {
150 AsyncConnection {
151 transport: AsyncTransport::Tcp(AsyncTcpTransport { client }),
152 database,
153 stats_provider: Mutex::new(None),
154 pending_stats: Mutex::new(None),
155 }
156 }
157
158 /// Builds an `AsyncConnection` from a pre-constructed transport.
159 ///
160 /// Used by [`AsyncConnectionBuilder`](crate::AsyncConnectionBuilder) to
161 /// stitch together a gRPC transport after its own config construction.
162 pub(crate) fn from_transport(transport: AsyncTransport, database: Option<String>) -> Self {
163 AsyncConnection {
164 transport,
165 database,
166 stats_provider: Mutex::new(None),
167 pending_stats: Mutex::new(None),
168 }
169 }
170
171 /// Runs the configured `CreateMode` as SQL (crate-public for use by
172 /// [`AsyncConnectionBuilder`](crate::AsyncConnectionBuilder)).
173 pub(crate) async fn handle_creation_mode_public(
174 &self,
175 database: &str,
176 mode: CreateMode,
177 ) -> Result<()> {
178 self.handle_creation_mode(database, mode).await
179 }
180
181 /// Attaches the database and sets `search_path` (crate-public for use
182 /// by [`AsyncConnectionBuilder`](crate::AsyncConnectionBuilder)).
183 pub(crate) async fn attach_and_set_path_public(&self, database: &str) -> Result<()> {
184 self.attach_and_set_path(database).await
185 }
186
187 async fn handle_creation_mode(&self, database: &str, mode: CreateMode) -> Result<()> {
188 let escaped_db = escape_sql_path(database);
189 match mode {
190 CreateMode::Create => {
191 self.execute_command(&format!("CREATE DATABASE {escaped_db}"))
192 .await?;
193 }
194 CreateMode::CreateIfNotExists => {
195 self.execute_command(&format!("CREATE DATABASE IF NOT EXISTS {escaped_db}"))
196 .await?;
197 }
198 CreateMode::CreateAndReplace => {
199 self.execute_command(&format!("DROP DATABASE IF EXISTS {escaped_db}"))
200 .await?;
201 self.execute_command(&format!("CREATE DATABASE {escaped_db}"))
202 .await?;
203 }
204 CreateMode::DoNotCreate => {}
205 }
206 Ok(())
207 }
208
209 async fn attach_and_set_path(&self, database: &str) -> Result<()> {
210 let escaped_db = escape_sql_path(database);
211 let db_alias = std::path::Path::new(database)
212 .file_stem()
213 .and_then(|s| s.to_str())
214 .unwrap_or("db");
215 let escaped_alias = escape_sql_path(db_alias);
216
217 self.execute_command(&format!("ATTACH DATABASE {escaped_db} AS {escaped_alias}"))
218 .await?;
219
220 self.execute_command(&format!("SET search_path TO {escaped_alias}, public"))
221 .await?;
222 Ok(())
223 }
224
225 /// Returns the transport type name (e.g., "TCP", "gRPC").
226 pub fn transport_type(&self) -> &'static str {
227 self.transport.transport_type().as_str()
228 }
229
230 /// Returns true if this connection supports write operations.
231 pub fn supports_writes(&self) -> bool {
232 self.transport.supports_writes()
233 }
234
235 /// Returns the database path.
236 pub fn database(&self) -> Option<&str> {
237 self.database.as_deref()
238 }
239
240 // =========================================================================
241 // Command Execution
242 // =========================================================================
243
244 /// Executes a SQL command that doesn't return rows (async).
245 ///
246 /// Use for DDL statements (CREATE, DROP, ALTER) and DML statements
247 /// (INSERT, UPDATE, DELETE). Returns the number of affected rows (DML)
248 /// or 0 (DDL).
249 ///
250 /// # Errors
251 ///
252 /// - Returns [`Error::Other`] on gRPC transports that do not yet
253 /// support write operations.
254 /// - Returns [`Error::Client`] if the SQL fails to parse or execute.
255 /// - Returns [`Error::Io`] on transport-level I/O failures.
256 pub async fn execute_command(&self, sql: &str) -> Result<u64> {
257 let token = self.stats_before_query(sql);
258 let result = self.transport.execute_command(sql).await;
259 self.stats_store_pending(token, sql);
260 result
261 }
262
263 /// Executes multiple SQL statements sequentially (async).
264 ///
265 /// If any statement fails, execution stops and the error is returned
266 /// wrapping the SQL preview for context.
267 ///
268 /// # Errors
269 ///
270 /// Returns an [`Error::Other`] wrapping the first failing statement's
271 /// error; the wrapping message includes the statement's ordinal and
272 /// an 80-character SQL preview.
273 pub async fn execute_batch(&self, statements: &[&str]) -> Result<u64> {
274 let mut total = 0u64;
275 for (i, stmt) in statements.iter().enumerate() {
276 if !stmt.trim().is_empty() {
277 total += self.execute_command(stmt).await.map_err(|e| {
278 let preview: String = stmt.chars().take(80).collect();
279 Error::with_cause(
280 format!(
281 "execute_batch failed at statement {} of {}: {}",
282 i + 1,
283 statements.len(),
284 preview,
285 ),
286 e,
287 )
288 })?;
289 }
290 }
291 Ok(total)
292 }
293
294 // =========================================================================
295 // Query Execution (Streaming)
296 // =========================================================================
297
298 /// Executes a SQL query and returns a streaming [`AsyncRowset`] (async).
299 ///
300 /// Results are streamed in chunks so memory usage stays constant
301 /// regardless of result set size. See [`AsyncRowset`] for the row-level
302 /// API and collectors.
303 ///
304 /// # Errors
305 ///
306 /// - Returns [`Error::Client`] if the SQL is rejected by the server.
307 /// - Returns [`Error::Io`] on transport-level I/O failures while
308 /// opening the stream.
309 pub async fn execute_query(&self, query: &str) -> Result<AsyncRowset<'_>> {
310 let token = self.stats_before_query(query);
311 let result = self.transport.execute_query_streaming(query).await;
312 self.stats_store_pending(token, query);
313 result
314 }
315
316 /// Fetches a single row, erroring if the query returns zero rows.
317 ///
318 /// # Errors
319 ///
320 /// - Returns the error from [`execute_query`](Self::execute_query) if
321 /// the query fails.
322 /// - Returns [`Error::Other`] with message `"Query returned no rows"` if
323 /// the query produced zero rows.
324 pub async fn fetch_one<Q: AsRef<str>>(&self, query: Q) -> Result<Row> {
325 self.execute_query(query.as_ref())
326 .await?
327 .require_first_row()
328 .await
329 }
330
331 /// Fetches a single row, returning `None` if the query is empty.
332 ///
333 /// # Errors
334 ///
335 /// Returns the error from [`execute_query`](Self::execute_query) if the
336 /// query fails. An empty result set yields `Ok(None)`, not an error.
337 pub async fn fetch_optional<Q: AsRef<str>>(&self, query: Q) -> Result<Option<Row>> {
338 self.execute_query(query.as_ref()).await?.first_row().await
339 }
340
341 /// Fetches all rows from a query.
342 ///
343 /// # Errors
344 ///
345 /// Returns the error from [`execute_query`](Self::execute_query), or a
346 /// transport error produced while draining every chunk.
347 pub async fn fetch_all<Q: AsRef<str>>(&self, query: Q) -> Result<Vec<Row>> {
348 self.execute_query(query.as_ref())
349 .await?
350 .collect_rows()
351 .await
352 }
353
354 /// Fetches a single row and maps it to a struct using [`crate::FromRow`].
355 ///
356 /// # Errors
357 ///
358 /// - Returns the error from [`fetch_one`](Self::fetch_one).
359 /// - Returns whatever [`FromRow::from_row`](crate::FromRow::from_row)
360 /// produces when the row cannot be mapped.
361 pub async fn fetch_one_as<T: crate::FromRow>(&self, query: &str) -> Result<T> {
362 let row = self.fetch_one(query).await?;
363 T::from_row(&row)
364 }
365
366 /// Fetches all rows and maps them to structs using [`crate::FromRow`].
367 ///
368 /// # Errors
369 ///
370 /// - Returns the error from [`fetch_all`](Self::fetch_all).
371 /// - Returns the first error produced by
372 /// [`FromRow::from_row`](crate::FromRow::from_row) on any row.
373 pub async fn fetch_all_as<T: crate::FromRow>(&self, query: &str) -> Result<Vec<T>> {
374 let rows = self.fetch_all(query).await?;
375 rows.iter().map(|r| T::from_row(r)).collect()
376 }
377
378 /// Fetches a single non-NULL scalar value. Errors on empty / NULL.
379 ///
380 /// # Errors
381 ///
382 /// - Returns the error from [`execute_query`](Self::execute_query).
383 /// - Returns [`Error::Other`] with message `"Query returned no rows"` if
384 /// the query is empty.
385 /// - Returns [`Error::Other`] with message `"Scalar query returned NULL"`
386 /// if the first cell is SQL `NULL`.
387 pub async fn fetch_scalar<T, Q>(&self, query: Q) -> Result<T>
388 where
389 T: RowValue,
390 Q: AsRef<str>,
391 {
392 self.execute_query(query.as_ref())
393 .await?
394 .require_scalar()
395 .await
396 }
397
398 /// Fetches a single scalar value, allowing NULL (returns `None`).
399 ///
400 /// # Errors
401 ///
402 /// Returns the error from [`execute_query`](Self::execute_query). An
403 /// empty result still yields an error; SQL `NULL` in the first cell
404 /// yields `Ok(None)`.
405 pub async fn fetch_optional_scalar<T, Q>(&self, query: Q) -> Result<Option<T>>
406 where
407 T: RowValue,
408 Q: AsRef<str>,
409 {
410 self.execute_query(query.as_ref()).await?.scalar().await
411 }
412
413 /// Returns the count from a `SELECT COUNT(*)` style query, defaulting
414 /// to 0 on NULL.
415 ///
416 /// # Errors
417 ///
418 /// Returns the error from [`execute_query`](Self::execute_query) if the
419 /// query itself fails.
420 pub async fn query_count(&self, query: &str) -> Result<i64> {
421 let opt: Option<i64> = self.fetch_optional_scalar(query).await?;
422 Ok(opt.unwrap_or(0))
423 }
424
425 // =========================================================================
426 // Arrow Queries
427 // =========================================================================
428
429 /// Executes a SELECT query and returns results as Arrow IPC stream bytes (async).
430 ///
431 /// TCP uses `COPY ... TO STDOUT WITH (FORMAT ARROWSTREAM)`; gRPC uses
432 /// the native Arrow transport. Both return the same IPC stream shape.
433 ///
434 /// # Errors
435 ///
436 /// Propagates any [`Error::Client`] from the transport when the query
437 /// fails or the server cannot produce Arrow IPC output.
438 pub async fn execute_query_to_arrow(&self, sql: &str) -> Result<bytes::Bytes> {
439 self.transport.execute_query_to_arrow(sql).await
440 }
441
442 /// Exports an entire table to Arrow IPC stream format (async).
443 ///
444 /// # Errors
445 ///
446 /// See [`execute_query_to_arrow`](Self::execute_query_to_arrow).
447 pub async fn export_table_to_arrow(&self, table_name: &str) -> Result<bytes::Bytes> {
448 self.execute_query_to_arrow(&format!("SELECT * FROM {table_name}"))
449 .await
450 }
451
452 /// Executes a SELECT query and returns parsed Arrow `RecordBatch`es (async).
453 ///
454 /// # Errors
455 ///
456 /// - Returns [`Error::Client`] if the query fails.
457 /// - Returns [`Error::Other`] if the Arrow IPC payload cannot be
458 /// decoded into record batches.
459 pub async fn execute_query_to_batches(
460 &self,
461 sql: &str,
462 ) -> Result<Vec<arrow::record_batch::RecordBatch>> {
463 let arrow_data = self.execute_query_to_arrow(sql).await?;
464 crate::arrow_result::parse_arrow_ipc(arrow_data)
465 }
466
467 // =========================================================================
468 // Parameterized Queries
469 // =========================================================================
470
471 /// Executes a parameterized query with safely escaped parameters (async).
472 ///
473 /// Mirrors the sync [`Connection::query_params`](crate::Connection::query_params);
474 /// see that method for the design rationale around text-mode escaping
475 /// vs. future native Bind/Execute support.
476 ///
477 /// # Errors
478 ///
479 /// - Returns [`Error::Other`] on gRPC transports (prepared statements
480 /// are TCP-only).
481 /// - Returns [`Error::Client`] if the server rejects the statement at
482 /// `Parse`, `Bind`, or `Execute` time.
483 /// - Returns [`Error::Io`] on transport-level I/O failures.
484 pub async fn query_params(
485 &self,
486 query: &str,
487 params: &[&dyn crate::params::ToSqlParam],
488 ) -> Result<AsyncRowset<'_>> {
489 // Route through the extended query protocol. See
490 // [`Connection::query_params`] for the sync equivalent and the
491 // rationale behind the statement-guard pattern.
492 let client = match &self.transport {
493 AsyncTransport::Tcp(tcp) => &tcp.client,
494 AsyncTransport::Grpc(_) => {
495 return Err(Error::new(
496 "prepared statements are not supported over gRPC transport",
497 ));
498 }
499 };
500 let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
501 let stmt = client.prepare_typed(query, &oids).await?;
502 let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
503 let stream = client
504 .execute_prepared_streaming(&stmt, encoded, crate::result::DEFAULT_BINARY_CHUNK_SIZE)
505 .await?;
506 Ok(AsyncRowset::from_prepared(stream).with_statement_guard(stmt))
507 }
508
509 /// Executes a parameterized command (INSERT / UPDATE / DELETE) with
510 /// binary-encoded parameters via Parse/Bind/Execute (async).
511 ///
512 /// # Errors
513 ///
514 /// - Returns [`Error::Other`] on gRPC transports.
515 /// - Returns [`Error::Client`] if the server rejects the statement at
516 /// `Parse`, `Bind`, or `Execute` time.
517 /// - Returns [`Error::Io`] on transport-level I/O failures.
518 pub async fn command_params(
519 &self,
520 query: &str,
521 params: &[&dyn crate::params::ToSqlParam],
522 ) -> Result<u64> {
523 let client = match &self.transport {
524 AsyncTransport::Tcp(tcp) => &tcp.client,
525 AsyncTransport::Grpc(_) => {
526 return Err(Error::new(
527 "prepared statements are not supported over gRPC transport",
528 ));
529 }
530 };
531 let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
532 let stmt = client.prepare_typed(query, &oids).await?;
533 let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
534 Ok(client.execute_prepared_no_result(&stmt, encoded).await?)
535 }
536
537 // =========================================================================
538 // Catalog / Database Management
539 // =========================================================================
540
541 /// Creates a new database file (async).
542 ///
543 /// # Errors
544 ///
545 /// Returns [`Error::Client`] if the server rejects
546 /// `CREATE DATABASE IF NOT EXISTS` (e.g. the path is not writable).
547 pub async fn create_database(&self, path: &str) -> Result<()> {
548 let sql = format!("CREATE DATABASE IF NOT EXISTS {}", escape_sql_path(path));
549 self.execute_command(&sql).await?;
550 Ok(())
551 }
552
553 /// Drops (deletes) a database file (async).
554 ///
555 /// # Errors
556 ///
557 /// Returns [`Error::Client`] if the server rejects
558 /// `DROP DATABASE IF EXISTS` (e.g. the database is still attached).
559 pub async fn drop_database(&self, path: &str) -> Result<()> {
560 let sql = format!("DROP DATABASE IF EXISTS {}", escape_sql_path(path));
561 self.execute_command(&sql).await?;
562 Ok(())
563 }
564
565 /// Attaches a database file to the connection (async).
566 ///
567 /// # Errors
568 ///
569 /// Returns [`Error::Client`] if the server rejects the
570 /// `ATTACH DATABASE` statement (file missing, permission denied,
571 /// alias conflict).
572 pub async fn attach_database(&self, path: &str, alias: Option<&str>) -> Result<()> {
573 let sql = if let Some(alias) = alias {
574 format!(
575 "ATTACH DATABASE {} AS {}",
576 escape_sql_path(path),
577 escape_sql_path(alias)
578 )
579 } else {
580 format!("ATTACH DATABASE {}", escape_sql_path(path))
581 };
582 self.execute_command(&sql).await?;
583 Ok(())
584 }
585
586 /// Detaches a database alias from this connection (async).
587 ///
588 /// # Errors
589 ///
590 /// Returns [`Error::Client`] if the alias is not attached or the
591 /// server cannot flush pending updates.
592 pub async fn detach_database(&self, alias: &str) -> Result<()> {
593 let sql = format!("DETACH DATABASE {}", escape_sql_path(alias));
594 self.execute_command(&sql).await?;
595 Ok(())
596 }
597
598 /// Detaches all databases from this connection (async).
599 ///
600 /// # Errors
601 ///
602 /// Returns [`Error::Client`] if the server rejects
603 /// `DETACH ALL DATABASES`.
604 pub async fn detach_all_databases(&self) -> Result<()> {
605 self.execute_command("DETACH ALL DATABASES").await?;
606 Ok(())
607 }
608
609 /// Copies a database file to a new path (async).
610 ///
611 /// # Errors
612 ///
613 /// Returns [`Error::Client`] if the server rejects the
614 /// `COPY DATABASE` statement — e.g. the source is not attached or the
615 /// destination path is not writable.
616 pub async fn copy_database(&self, source: &str, destination: &str) -> Result<()> {
617 let sql = format!(
618 "COPY DATABASE {} TO {}",
619 escape_sql_path(source),
620 escape_sql_path(destination)
621 );
622 self.execute_command(&sql).await?;
623 Ok(())
624 }
625
626 /// Creates a schema in the database (async).
627 ///
628 /// # Errors
629 ///
630 /// - Returns an error if `schema_name` cannot be converted to a
631 /// [`SchemaName`](crate::SchemaName).
632 /// - Returns [`Error::Client`] if the server rejects
633 /// `CREATE SCHEMA IF NOT EXISTS`.
634 pub async fn create_schema<T>(&self, schema_name: T) -> Result<()>
635 where
636 T: TryInto<crate::SchemaName>,
637 crate::Error: From<T::Error>,
638 {
639 let schema: crate::SchemaName = schema_name.try_into()?;
640 let sql = format!("CREATE SCHEMA IF NOT EXISTS {schema}");
641 self.execute_command(&sql).await?;
642 Ok(())
643 }
644
645 /// Checks whether a schema exists (async).
646 ///
647 /// # Errors
648 ///
649 /// - Returns an error if `schema` cannot be converted to a
650 /// [`SchemaName`](crate::SchemaName).
651 /// - Returns [`Error::Client`] if the catalog lookup query fails.
652 pub async fn has_schema<T>(&self, schema: T) -> Result<bool>
653 where
654 T: TryInto<crate::SchemaName>,
655 crate::Error: From<T::Error>,
656 {
657 let schema: crate::SchemaName = schema.try_into()?;
658 let db_prefix = if let Some(db) = schema.database() {
659 format!("{db}.")
660 } else {
661 String::new()
662 };
663 let sql = format!(
664 "SELECT 1 FROM {}pg_catalog.pg_namespace WHERE nspname = '{}'",
665 db_prefix,
666 schema.unescaped().replace('\'', "''")
667 );
668 Ok(self.fetch_optional(&sql).await?.is_some())
669 }
670
671 /// Checks whether a table exists (async).
672 ///
673 /// # Errors
674 ///
675 /// - Returns an error if `table_name` cannot be converted to a
676 /// [`TableName`](crate::TableName).
677 /// - Returns [`Error::Client`] if the catalog lookup query fails.
678 pub async fn has_table<T>(&self, table_name: T) -> Result<bool>
679 where
680 T: TryInto<crate::TableName>,
681 crate::Error: From<T::Error>,
682 {
683 let table: crate::TableName = table_name.try_into()?;
684 let schema = table
685 .schema()
686 .map_or("public", super::names::Name::unescaped);
687 let db_prefix = if let Some(db) = table.database() {
688 format!("{db}.")
689 } else {
690 String::new()
691 };
692 let sql = format!(
693 "SELECT 1 FROM {}pg_catalog.pg_tables WHERE schemaname = '{}' AND tablename = '{}'",
694 db_prefix,
695 schema.replace('\'', "''"),
696 table.table().unescaped().replace('\'', "''")
697 );
698 Ok(self.fetch_optional(&sql).await?.is_some())
699 }
700
701 /// Unloads the database from memory but keeps the session alive (async).
702 ///
703 /// # Errors
704 ///
705 /// Returns [`Error::Client`] if the server rejects `UNLOAD DATABASE`
706 /// (e.g. the database is in use by another session).
707 pub async fn unload_database(&self) -> Result<()> {
708 self.execute_command("UNLOAD DATABASE").await?;
709 Ok(())
710 }
711
712 /// Releases the database completely from the session (async).
713 ///
714 /// # Errors
715 ///
716 /// Returns [`Error::Client`] if the server rejects `UNLOAD RELEASE`,
717 /// most commonly because multiple databases are attached to the same
718 /// session.
719 pub async fn unload_release(&self) -> Result<()> {
720 self.execute_command("UNLOAD RELEASE").await?;
721 Ok(())
722 }
723
724 // =========================================================================
725 // Diagnostics / Explain
726 // =========================================================================
727
728 /// Executes EXPLAIN and returns the plan text (async).
729 ///
730 /// # Errors
731 ///
732 /// Returns [`Error::Client`] if `EXPLAIN <query>` fails to parse or plan.
733 pub async fn explain(&self, query: &str) -> Result<String> {
734 let sql = format!("EXPLAIN {query}");
735 let rows = self.fetch_all(&sql).await?;
736 let lines: Vec<String> = rows.iter().filter_map(|r| r.get::<String>(0)).collect();
737 Ok(lines.join("\n"))
738 }
739
740 /// Executes EXPLAIN ANALYZE and returns the plan with timing (async).
741 ///
742 /// # Errors
743 ///
744 /// Returns [`Error::Client`] if `EXPLAIN ANALYZE <query>` fails — this
745 /// includes any runtime error raised by actually executing `query`.
746 pub async fn explain_analyze(&self, query: &str) -> Result<String> {
747 let sql = format!("EXPLAIN ANALYZE {query}");
748 let rows = self.fetch_all(&sql).await?;
749 let lines: Vec<String> = rows.iter().filter_map(|r| r.get::<String>(0)).collect();
750 Ok(lines.join("\n"))
751 }
752
753 // =========================================================================
754 // Connection Introspection / Lifecycle
755 // =========================================================================
756
757 /// Returns true if the connection is alive (passive check).
758 pub fn is_alive(&self) -> bool {
759 match &self.transport {
760 AsyncTransport::Tcp(tcp) => tcp.client.is_alive(),
761 AsyncTransport::Grpc(_) => true,
762 }
763 }
764
765 /// Actively pings the server with `SELECT 1` (async).
766 ///
767 /// # Errors
768 ///
769 /// Returns [`Error::Client`] or [`Error::Io`] if the `SELECT 1`
770 /// round-trip fails — i.e. the connection is no longer usable.
771 pub async fn ping(&self) -> Result<()> {
772 self.execute_command("SELECT 1").await?;
773 Ok(())
774 }
775
776 /// Returns the backend process ID, or 0 for gRPC transports.
777 pub fn process_id(&self) -> i32 {
778 match &self.transport {
779 AsyncTransport::Tcp(tcp) => tcp.client.process_id(),
780 AsyncTransport::Grpc(_) => 0,
781 }
782 }
783
784 /// Returns the secret key used for cancel requests, or 0 for gRPC.
785 pub fn secret_key(&self) -> i32 {
786 match &self.transport {
787 AsyncTransport::Tcp(tcp) => tcp.client.secret_key(),
788 AsyncTransport::Grpc(_) => 0,
789 }
790 }
791
792 /// Returns a server parameter value by name (async).
793 pub async fn parameter_status(&self, name: &str) -> Option<String> {
794 match &self.transport {
795 AsyncTransport::Tcp(tcp) => tcp.client.parameter_status(name).await,
796 AsyncTransport::Grpc(_) => None,
797 }
798 }
799
800 /// Returns the server version as a parsed struct (async).
801 pub async fn server_version(&self) -> Option<crate::ServerVersion> {
802 let version_str = self.parameter_status("server_version").await?;
803 crate::ServerVersion::parse(&version_str)
804 }
805
806 /// Sets the notice receiver callback for this connection.
807 pub fn set_notice_receiver(
808 &mut self,
809 receiver: Option<Box<dyn Fn(hyperdb_api_core::client::Notice) + Send + Sync>>,
810 ) {
811 match &mut self.transport {
812 AsyncTransport::Tcp(tcp) => tcp.client.set_notice_receiver(receiver),
813 AsyncTransport::Grpc(_) => {}
814 }
815 }
816
817 /// Cancels the currently running query (async).
818 ///
819 /// # Errors
820 ///
821 /// - Returns [`Error::Other`] on gRPC transports — cancellation is not
822 /// yet implemented for gRPC.
823 /// - Returns [`Error::Client`] or [`Error::Io`] if the cancel-request
824 /// connection to the server fails.
825 pub async fn cancel(&self) -> Result<()> {
826 self.transport.cancel().await
827 }
828
829 /// Closes the connection gracefully, detaching any attached database first (async).
830 ///
831 /// # Errors
832 ///
833 /// - Returns [`Error::Other`] wrapping the transport close failure if
834 /// the client cannot be shut down cleanly.
835 /// - Returns [`Error::Other`] wrapping the detach failure if the
836 /// attached database could not be detached but the transport close
837 /// itself succeeded.
838 pub async fn close(self) -> Result<()> {
839 let detach_err = if let Some(ref db_path) = self.database {
840 let db_alias = std::path::Path::new(db_path)
841 .file_stem()
842 .and_then(|s| s.to_str())
843 .unwrap_or("db");
844 self.execute_command(&format!("DETACH DATABASE {}", escape_sql_path(db_alias)))
845 .await
846 .err()
847 } else {
848 None
849 };
850
851 let close_result = self.transport.close().await;
852
853 if let Err(e) = close_result {
854 return Err(Error::with_cause("Failed to close async connection", e));
855 }
856
857 if let Some(e) = detach_err {
858 return Err(Error::with_cause(
859 "Failed to detach database during close",
860 e,
861 ));
862 }
863
864 Ok(())
865 }
866
867 /// Returns a reference to the underlying async TCP client (`None` for gRPC).
868 ///
869 /// Prefer the high-level `AsyncConnection` methods; this escape hatch
870 /// remains for code that needs direct protocol access (e.g. custom
871 /// COPY loops).
872 pub fn async_tcp_client(&self) -> Option<&hyperdb_api_core::client::AsyncClient> {
873 self.transport.async_tcp_client()
874 }
875
876 /// Crate-internal accessor for the transport. Used by
877 /// [`AsyncPreparedStatement`](crate::AsyncPreparedStatement) to reach
878 /// the underlying `hyperdb_api_core::client::AsyncClient`.
879 pub(crate) fn transport(&self) -> &AsyncTransport {
880 &self.transport
881 }
882
883 /// Prepares a SQL statement (async).
884 ///
885 /// See [`Connection::prepare`](crate::Connection::prepare) for
886 /// semantics. The returned
887 /// [`AsyncPreparedStatement`](crate::AsyncPreparedStatement) can be
888 /// executed many times with different parameter values.
889 ///
890 /// # Errors
891 ///
892 /// See [`prepare_typed`](Self::prepare_typed) — this method delegates
893 /// to it with an empty OID list.
894 pub async fn prepare(&self, query: &str) -> Result<crate::AsyncPreparedStatement<'_>> {
895 self.prepare_typed(query, &[]).await
896 }
897
898 /// Prepares a SQL statement with explicit parameter type OIDs (async).
899 ///
900 /// # Errors
901 ///
902 /// - Returns [`Error::Other`] on gRPC transports (prepared statements
903 /// are TCP-only).
904 /// - Returns [`Error::Client`] if the server rejects the `Parse`
905 /// message (SQL syntax error, unknown OID).
906 /// - Returns [`Error::Io`] on transport-level I/O failures.
907 pub async fn prepare_typed(
908 &self,
909 query: &str,
910 param_types: &[crate::Oid],
911 ) -> Result<crate::AsyncPreparedStatement<'_>> {
912 let client = match &self.transport {
913 AsyncTransport::Tcp(tcp) => &tcp.client,
914 AsyncTransport::Grpc(_) => {
915 return Err(Error::new(
916 "prepared statements are not supported over gRPC transport",
917 ));
918 }
919 };
920 let inner = client.prepare_typed(query, param_types).await?;
921 crate::AsyncPreparedStatement::new(self, inner)
922 }
923
924 /// Owned-handle variant of [`prepare`](Self::prepare). Returns a
925 /// `'static`-lifetime [`AsyncPreparedStatementOwned`](crate::AsyncPreparedStatementOwned)
926 /// that holds an `Arc`-cloned reference to `self`.
927 ///
928 /// Intended for N-API consumers and any other caller that needs
929 /// the prepared statement to outlive the stack frame where the
930 /// connection is held.
931 ///
932 /// # Errors
933 ///
934 /// See [`prepare_typed_arc`](Self::prepare_typed_arc).
935 pub async fn prepare_arc(
936 self: &Arc<Self>,
937 query: &str,
938 ) -> Result<crate::async_prepared::AsyncPreparedStatementOwned> {
939 self.prepare_typed_arc(query, &[]).await
940 }
941
942 /// Owned-handle variant of [`prepare_typed`](Self::prepare_typed).
943 ///
944 /// # Errors
945 ///
946 /// - Returns [`Error::Other`] on gRPC transports.
947 /// - Returns [`Error::Client`] if the server rejects the `Parse`
948 /// message.
949 /// - Returns [`Error::Io`] on transport-level I/O failures.
950 pub async fn prepare_typed_arc(
951 self: &Arc<Self>,
952 query: &str,
953 param_types: &[crate::Oid],
954 ) -> Result<crate::async_prepared::AsyncPreparedStatementOwned> {
955 let client = match &self.transport {
956 AsyncTransport::Tcp(tcp) => &tcp.client,
957 AsyncTransport::Grpc(_) => {
958 return Err(Error::new(
959 "prepared statements are not supported over gRPC transport",
960 ));
961 }
962 };
963 let inner = client.prepare_typed(query, param_types).await?;
964 crate::async_prepared::AsyncPreparedStatementOwned::new(Arc::clone(self), inner)
965 }
966
967 // =========================================================================
968 // Query Statistics
969 // =========================================================================
970
971 /// Enables query statistics collection for this connection.
972 pub fn enable_query_stats(&self, provider: impl QueryStatsProvider + 'static) {
973 if let Ok(mut guard) = self.stats_provider.lock() {
974 *guard = Some(Arc::new(provider));
975 }
976 }
977
978 /// Disables query statistics collection.
979 pub fn disable_query_stats(&self) {
980 if let Ok(mut guard) = self.stats_provider.lock() {
981 *guard = None;
982 }
983 if let Ok(mut guard) = self.pending_stats.lock() {
984 *guard = None;
985 }
986 }
987
988 /// Returns the stats for the most recent query (if enabled).
989 pub fn last_query_stats(&self) -> Option<QueryStats> {
990 let provider = self.stats_provider.lock().ok()?.as_ref().cloned()?;
991 let mut guard = self.pending_stats.lock().ok()?;
992 let (token, sql) = guard.take()?;
993 provider.after_query(token, &sql)
994 }
995
996 fn stats_before_query(&self, sql: &str) -> Option<Box<dyn Any + Send>> {
997 self.stats_provider
998 .lock()
999 .ok()?
1000 .as_ref()
1001 .map(|p| p.before_query(sql))
1002 }
1003
1004 fn stats_store_pending(&self, token: Option<Box<dyn Any + Send>>, sql: &str) {
1005 if let Some(token) = token {
1006 if let Ok(mut guard) = self.pending_stats.lock() {
1007 *guard = Some((token, sql.to_string()));
1008 }
1009 }
1010 }
1011}
1012
1013impl AsyncConnection {
1014 // =========================================================================
1015 // Transaction Control
1016 // =========================================================================
1017
1018 /// Begins an explicit transaction (async).
1019 ///
1020 /// # Errors
1021 ///
1022 /// Returns [`Error::Client`] if the server rejects `BEGIN TRANSACTION`
1023 /// (e.g. a transaction is already open on this session).
1024 pub async fn begin_transaction(&self) -> Result<()> {
1025 self.execute_command("BEGIN TRANSACTION").await?;
1026 Ok(())
1027 }
1028
1029 /// Commits the current transaction (async).
1030 ///
1031 /// # Errors
1032 ///
1033 /// Returns [`Error::Client`] if the server rejects `COMMIT` (e.g. no
1034 /// transaction is currently open).
1035 pub async fn commit(&self) -> Result<()> {
1036 self.execute_command("COMMIT").await?;
1037 Ok(())
1038 }
1039
1040 /// Rolls back the current transaction (async).
1041 ///
1042 /// # Errors
1043 ///
1044 /// Returns [`Error::Client`] if the server rejects `ROLLBACK` (e.g. no
1045 /// transaction is currently open).
1046 pub async fn rollback(&self) -> Result<()> {
1047 self.execute_command("ROLLBACK").await?;
1048 Ok(())
1049 }
1050
1051 /// Starts a transaction with an async RAII guard (async).
1052 ///
1053 /// # Errors
1054 ///
1055 /// Returns [`Error::Client`] if the internal `BEGIN` issued by
1056 /// [`AsyncTransaction::new`](crate::AsyncTransaction) fails.
1057 pub async fn transaction(&mut self) -> Result<crate::AsyncTransaction<'_>> {
1058 crate::AsyncTransaction::new(self).await
1059 }
1060}