Skip to main content

hyperdb_api/
connection.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Database connection management.
5//!
6//! The [`Connection`] type provides a unified interface for connecting to Hyper
7//! databases via either TCP (`PostgreSQL` wire protocol) or gRPC transport.
8//! The transport is automatically detected from the endpoint URL:
9//!
10//! - `https://` or `http://` → gRPC transport
11//! - Otherwise → TCP transport (e.g., `localhost:7483`)
12
13use std::path::Path;
14
15use hyperdb_api_core::client::Client;
16
17use crate::error::{Error, Result};
18use crate::names::escape_sql_path;
19use crate::process::HyperProcess;
20use crate::result::{Row, Rowset, DEFAULT_BINARY_CHUNK_SIZE};
21use crate::transport::Transport;
22
23use std::any::Any;
24use std::sync::{Arc, Mutex};
25
26use crate::query_stats::{QueryStats, QueryStatsProvider};
27
28/// Trait for types that can be extracted from a scalar query result.
29///
30/// This trait enables the generic [`Connection::execute_scalar_query`] method,
31/// similar to C++'s `executeScalarQuery<T>()` template.
32///
33/// # Implementing Custom Types
34///
35/// You can implement this trait for custom types to use them with `execute_scalar_query`:
36///
37/// ```no_run
38/// # use hyperdb_api::{Row, ScalarValue};
39/// # struct MyType;
40/// # impl MyType { fn parse(s: &str) -> Self { MyType } }
41/// impl ScalarValue for MyType {
42///     fn from_row(row: &Row, col: usize) -> Option<Self> {
43///         row.get_string(col).map(|s| MyType::parse(&s))
44///     }
45/// }
46/// ```
47pub trait ScalarValue: Sized {
48    /// Extracts a value of this type from a row at the given column.
49    fn from_row(row: &Row, col: usize) -> Option<Self>;
50}
51
52impl ScalarValue for i64 {
53    fn from_row(row: &Row, col: usize) -> Option<Self> {
54        row.get_i64(col)
55    }
56}
57
58impl ScalarValue for i32 {
59    fn from_row(row: &Row, col: usize) -> Option<Self> {
60        row.get_i32(col)
61    }
62}
63
64impl ScalarValue for i16 {
65    fn from_row(row: &Row, col: usize) -> Option<Self> {
66        row.get_i16(col)
67    }
68}
69
70impl ScalarValue for f64 {
71    fn from_row(row: &Row, col: usize) -> Option<Self> {
72        row.get_f64(col)
73    }
74}
75
76impl ScalarValue for bool {
77    fn from_row(row: &Row, col: usize) -> Option<Self> {
78        row.get_bool(col)
79    }
80}
81
82impl ScalarValue for String {
83    fn from_row(row: &Row, col: usize) -> Option<Self> {
84        row.get_string(col)
85    }
86}
87
88/// Database creation mode when connecting.
89#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
90pub enum CreateMode {
91    /// Do not create the database. Method will fail if database doesn't exist.
92    #[default]
93    DoNotCreate,
94    /// Create the database. Method will fail if the database already exists.
95    Create,
96    /// Create the database if it doesn't exist.
97    CreateIfNotExists,
98    /// Create the database. If it already exists, drop the old one first.
99    CreateAndReplace,
100}
101
102/// A connection to a Hyper database.
103///
104/// This struct represents an active connection to a Hyper server and optionally
105/// an attached database. The connection is automatically closed when dropped.
106///
107/// # Transport Auto-Detection
108///
109/// The transport is automatically detected from the endpoint URL:
110/// - `https://` or `http://` → gRPC transport (read-only until server supports writes)
111/// - Otherwise → TCP transport (full read/write support)
112///
113/// # CSV / Text Import & Export
114///
115/// For CSV, TSV, and other delimited-text formats, see the [`copy`](crate::copy)
116/// module which provides [`export_csv()`](Self::export_csv),
117/// [`import_csv()`](Self::import_csv), and related methods on this struct.
118///
119/// # Example
120///
121/// ```no_run
122/// use hyperdb_api::{Connection, CreateMode, Result};
123///
124/// fn main() -> Result<()> {
125///     // TCP connection (full read/write)
126///     let conn = Connection::connect("localhost:7483", "example.hyper", CreateMode::CreateIfNotExists)?;
127///
128///     // Execute SQL commands
129///     conn.execute_command("CREATE TABLE test (id INT, name TEXT)")?;
130///     conn.execute_command("INSERT INTO test VALUES (1, 'Hello')")?;
131///
132///     Ok(())
133/// }
134/// ```
135///
136/// ```no_run
137/// # use hyperdb_api::{Connection, CreateMode, Result};
138/// # fn example() -> Result<()> {
139/// // gRPC connection (read-only, auto-detected from URL)
140/// let conn = Connection::connect(
141///     "https://hyper-server.example.com:443",
142///     "example.hyper",
143///     CreateMode::DoNotCreate,  // Must be DoNotCreate for gRPC
144/// )?;
145/// # Ok(())
146/// # }
147/// ```
148pub struct Connection {
149    transport: Transport,
150    database: Option<String>,
151    stats_provider: Option<Arc<dyn QueryStatsProvider>>,
152    /// Pending stats token + SQL from the most recent query, resolved lazily.
153    pending_stats: Mutex<Option<(Box<dyn Any + Send>, String)>>,
154}
155
156impl std::fmt::Debug for Connection {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        f.debug_struct("Connection")
159            .field("database", &self.database)
160            .finish_non_exhaustive()
161    }
162}
163
164impl Connection {
165    /// Creates a new connection to a Hyper instance with a database.
166    ///
167    /// This is the primary way to connect to a running [`HyperProcess`].
168    ///
169    /// # Arguments
170    ///
171    /// * `instance` - The Hyper server instance to connect to.
172    /// * `database_path` - Path to the database file.
173    /// * `create_mode` - How to handle database creation.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if the connection could not be established.
178    ///
179    /// # Example
180    ///
181    /// ```no_run
182    /// use hyperdb_api::{HyperProcess, Connection, CreateMode, Result};
183    ///
184    /// fn main() -> Result<()> {
185    ///     let hyper = HyperProcess::new(None, None)?;
186    ///     let conn = Connection::new(&hyper, "database.hyper", CreateMode::CreateIfNotExists)?;
187    ///     Ok(())
188    /// }
189    /// ```
190    pub fn new(
191        instance: &HyperProcess,
192        database_path: impl AsRef<Path>,
193        create_mode: CreateMode,
194    ) -> Result<Self> {
195        // Prefer using the connection_endpoint which properly handles UDS/Named Pipes
196        if let Some(conn_endpoint) = instance.connection_endpoint() {
197            return Self::connect_with_endpoint(
198                conn_endpoint,
199                &database_path.as_ref().to_string_lossy(),
200                create_mode,
201            );
202        }
203
204        // Fall back to string endpoint (TCP)
205        let endpoint = instance.require_endpoint()?;
206        Self::connect(
207            endpoint,
208            &database_path.as_ref().to_string_lossy(),
209            create_mode,
210        )
211    }
212
213    /// Connects using a `ConnectionEndpoint` (supports TCP, UDS, and Named Pipes).
214    fn connect_with_endpoint(
215        endpoint: &hyperdb_api_core::client::ConnectionEndpoint,
216        database_path: &str,
217        create_mode: CreateMode,
218    ) -> Result<Self> {
219        let db_path_str = Some(database_path.to_string());
220
221        let config = hyperdb_api_core::client::Config::new().with_user("tableau_internal_user");
222
223        let client = hyperdb_api_core::client::Client::connect_endpoint(endpoint, &config)?;
224
225        let conn = Connection::from_client(client, db_path_str.clone());
226
227        // Handle database creation
228        if let Some(db_path) = db_path_str {
229            conn.handle_creation_mode(&db_path, create_mode)?;
230            conn.attach_and_set_path(&db_path)?;
231        }
232
233        Ok(conn)
234    }
235
236    /// Connects to a Hyper server and optionally attaches a database.
237    ///
238    /// # Arguments
239    ///
240    /// * `endpoint` - The server endpoint (host:port).
241    /// * `database_path` - Path to the database file.
242    /// * `create_mode` - How to handle database creation.
243    ///
244    /// # Errors
245    ///
246    /// Returns an error if the connection could not be established.
247    pub fn connect(endpoint: &str, database_path: &str, create_mode: CreateMode) -> Result<Self> {
248        crate::ConnectionBuilder::new(endpoint)
249            .database(database_path)
250            .create_mode(create_mode)
251            .build()
252    }
253
254    /// Returns a connection builder for advanced configuration.
255    ///
256    /// This is useful when you need to set authentication, timeouts, or
257    /// other advanced options before connecting.
258    #[must_use]
259    pub fn builder(endpoint: &str) -> crate::ConnectionBuilder {
260        crate::ConnectionBuilder::new(endpoint)
261    }
262
263    /// Creates a Connection from a low-level Client (internal use, TCP only).
264    pub(crate) fn from_client(client: Client, database: Option<String>) -> Self {
265        Connection {
266            transport: Transport::Tcp(Box::new(crate::transport::TcpTransport { client })),
267            database,
268            stats_provider: None,
269            pending_stats: Mutex::new(None),
270        }
271    }
272
273    /// Creates a Connection from a Transport (internal use).
274    #[allow(
275        dead_code,
276        reason = "used by ConnectionBuilder for the gRPC path; not reached under non-gRPC feature builds"
277    )]
278    pub(crate) fn from_transport(transport: Transport, database: Option<String>) -> Self {
279        Connection {
280            transport,
281            database,
282            stats_provider: None,
283            pending_stats: Mutex::new(None),
284        }
285    }
286
287    /// Returns the transport type name (e.g., "TCP", "gRPC", "Unix Socket").
288    pub fn transport_type(&self) -> &'static str {
289        self.transport.transport_type().as_str()
290    }
291
292    /// Returns true if this connection supports write operations.
293    ///
294    /// Currently, only TCP connections support writes. gRPC connections are
295    /// read-only until the server supports write operations over gRPC.
296    pub fn supports_writes(&self) -> bool {
297        self.transport.supports_writes()
298    }
299
300    /// Handles database creation logic (internal use).
301    pub(crate) fn handle_creation_mode(
302        &self,
303        database_path: &str,
304        create_mode: CreateMode,
305    ) -> Result<()> {
306        match create_mode {
307            CreateMode::DoNotCreate => {}
308            CreateMode::Create => {
309                self.execute_command(&format!(
310                    "CREATE DATABASE {}",
311                    escape_sql_path(database_path)
312                ))?;
313            }
314            CreateMode::CreateIfNotExists => {
315                if let Err(e) = self.execute_command(&format!(
316                    "CREATE DATABASE IF NOT EXISTS {}",
317                    escape_sql_path(database_path)
318                )) {
319                    if !is_already_exists_error(&e) {
320                        return Err(Error::internal(format!(
321                            "Failed to create database '{database_path}': {e}"
322                        )));
323                    }
324                }
325            }
326            CreateMode::CreateAndReplace => {
327                let _ = self.execute_command(&format!(
328                    "DROP DATABASE IF EXISTS {}",
329                    escape_sql_path(database_path)
330                ));
331                self.execute_command(&format!(
332                    "CREATE DATABASE {}",
333                    escape_sql_path(database_path)
334                ))?;
335            }
336        }
337        Ok(())
338    }
339
340    /// Attaches and sets the database path (internal use).
341    pub(crate) fn attach_and_set_path(&self, database_path: &str) -> Result<()> {
342        let db_alias = std::path::Path::new(database_path)
343            .file_stem()
344            .and_then(|s| s.to_str())
345            .unwrap_or("db");
346
347        self.execute_command(&format!(
348            "ATTACH DATABASE {} AS {}",
349            escape_sql_path(database_path),
350            escape_sql_path(db_alias)
351        ))?;
352
353        self.execute_command(&format!(
354            "SET search_path TO {}, public",
355            escape_sql_path(db_alias)
356        ))?;
357
358        Ok(())
359    }
360
361    /// Connects to a Hyper server with authentication.
362    ///
363    /// # Arguments
364    ///
365    /// * `endpoint` - The server endpoint (host:port).
366    /// * `database_path` - Path to the database file.
367    /// * `create_mode` - How to handle database creation.
368    /// * `user` - Username for authentication.
369    /// * `password` - Password for authentication.
370    ///
371    /// # Errors
372    ///
373    /// Returns an error if the connection or authentication fails.
374    pub fn connect_with_auth(
375        endpoint: &str,
376        database_path: &str,
377        create_mode: CreateMode,
378        user: &str,
379        password: &str,
380    ) -> Result<Self> {
381        crate::ConnectionBuilder::new(endpoint)
382            .database(database_path)
383            .create_mode(create_mode)
384            .user(user.to_string())
385            .password(password)
386            .build()
387    }
388
389    /// Creates a connection to a Hyper server without attaching a database.
390    ///
391    /// # Errors
392    ///
393    /// Returns [`Error::Connection`] if the TCP or gRPC handshake fails, and
394    /// [`Error::Io`] if the endpoint cannot be reached.
395    pub fn without_database(endpoint: &str) -> Result<Self> {
396        crate::ConnectionBuilder::new(endpoint).build()
397    }
398
399    /// Executes a SQL command that doesn't return results.
400    ///
401    /// Use this for DDL statements (CREATE, ALTER, DROP) and DML statements
402    /// (INSERT, UPDATE, DELETE).
403    ///
404    /// # Arguments
405    ///
406    /// * `command` - The SQL command to execute.
407    ///
408    /// # Returns
409    ///
410    /// The number of affected rows, or 0 if not applicable.
411    ///
412    /// # Errors
413    ///
414    /// Returns an error if:
415    /// - The connection is using gRPC transport (write operations not yet supported)
416    /// - The command fails to execute
417    pub fn execute_command(&self, command: &str) -> Result<u64> {
418        let token = self.stats_before_query(command);
419
420        let result = self.transport.execute_command(command);
421
422        // For commands, the query is fully executed synchronously, so we
423        // can store the pending token immediately for lazy resolution.
424        self.stats_store_pending(token, command);
425
426        result
427    }
428
429    /// Executes a SQL query and returns a streaming result set.
430    ///
431    /// Results are streamed in chunks (default 64K rows), keeping memory usage
432    /// constant regardless of result set size. This makes it safe for any
433    /// result size, from a single row to billions of rows.
434    ///
435    /// # Example
436    ///
437    /// ```no_run
438    /// # use hyperdb_api::{Connection, Result};
439    /// # fn example(conn: &Connection) -> Result<()> {
440    /// let mut result = conn.execute_query("SELECT id, value FROM measurements")?;
441    /// while let Some(chunk) = result.next_chunk()? {
442    ///     for row in &chunk {
443    ///         // Generic typed access (like C++ row.get<T>())
444    ///         let id: Option<i32> = row.get(0);
445    ///         let value: Option<f64> = row.get(1);
446    ///
447    ///         // Or direct accessors
448    ///         let id = row.get_i32(0);
449    ///         let value = row.get_f64(1);
450    ///     }
451    /// }
452    /// # Ok(())
453    /// # }
454    /// ```
455    ///
456    /// # Memory Behavior
457    ///
458    /// - Only one chunk is held in memory at a time (~few MB for 64K rows)
459    /// - Safe for result sets of any size (millions/billions of rows)
460    /// - Memory usage is `O(chunk_size)`, not `O(total_rows)`
461    ///
462    /// # Errors
463    ///
464    /// - Returns [`Error::Server`] wrapping a `hyperdb_api_core::client::Error` if the
465    ///   SQL fails to parse, execute, or if the server reports an error
466    ///   while streaming.
467    /// - Returns [`Error::Io`] on transport-level I/O failures.
468    pub fn execute_query(&self, query: &str) -> Result<Rowset<'_>> {
469        let token = self.stats_before_query(query);
470
471        let result = match &self.transport {
472            Transport::Tcp(tcp) => {
473                let stream = tcp
474                    .client
475                    .query_streaming(query, DEFAULT_BINARY_CHUNK_SIZE)?;
476                Ok(Rowset::new(stream))
477            }
478            Transport::Grpc(grpc) => {
479                // gRPC streaming: pull chunks lazily so peak memory is
480                // bounded by one gRPC message (tonic default 64 MB), not
481                // by the full result size. Matches TCP's
482                // constant-memory streaming shape.
483                //
484                // The transport module already creates a fresh gRPC client
485                // per query (gRPC client needs &mut self to execute), so we
486                // do the same here: connect, start the stream, wrap as a
487                // `ChunkSource`. The stream keeps the channel and runtime
488                // alive via refcounted handles inside `GrpcChunkStreamSync`.
489                let mut client =
490                    hyperdb_api_core::client::grpc::GrpcClientSync::connect(grpc.config.clone())?;
491                let stream = client.execute_query_stream(query)?;
492                let source = Box::new(crate::grpc_connection::GrpcChunkStreamSource::new(stream));
493                let arrow_rowset = crate::arrow_result::ArrowRowset::from_stream(source)?;
494                Ok(Rowset::from_arrow(arrow_rowset))
495            }
496        };
497
498        // Store the pending token — Hyper logs the execution stats after the
499        // result is consumed (streamed), so we defer resolution until
500        // last_query_stats() is called.
501        self.stats_store_pending(token, query);
502
503        result
504    }
505
506    // =========================================================================
507    // Arrow Format Queries
508    // =========================================================================
509
510    /// Executes a SELECT query and returns results as Arrow IPC stream bytes.
511    ///
512    /// # Example
513    ///
514    /// ```no_run
515    /// use hyperdb_api::{Connection, CreateMode, Result};
516    ///
517    /// fn main() -> Result<()> {
518    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
519    ///
520    ///     // Create and populate a table
521    ///     conn.execute_command("CREATE TABLE data (id INT, value DOUBLE PRECISION)")?;
522    ///     conn.execute_command("INSERT INTO data VALUES (1, 1.5), (2, 2.5)")?;
523    ///
524    ///     // Get results as Arrow IPC stream
525    ///     let arrow_data = conn.execute_query_to_arrow("SELECT * FROM data")?;
526    ///     println!("Got {} bytes of Arrow IPC data", arrow_data.len());
527    ///
528    ///     Ok(())
529    /// }
530    /// ```
531    ///
532    /// # Errors
533    ///
534    /// Propagates any [`Error::Server`] from the TCP or gRPC transport when
535    /// the query fails or the server cannot produce Arrow IPC output.
536    pub fn execute_query_to_arrow(&self, select_query: &str) -> Result<bytes::Bytes> {
537        self.transport.execute_query_to_arrow(select_query)
538    }
539
540    /// Exports an entire table to Arrow IPC stream format.
541    ///
542    /// This is a convenience method equivalent to
543    /// `execute_query_to_arrow("SELECT * FROM table_name")`.
544    ///
545    /// # Arguments
546    ///
547    /// * `table_name` - The table name
548    ///
549    /// # Returns
550    ///
551    /// Raw Arrow IPC stream bytes containing all rows from the table.
552    ///
553    /// # Example
554    ///
555    /// ```no_run
556    /// use hyperdb_api::{Connection, CreateMode, Result};
557    ///
558    /// fn main() -> Result<()> {
559    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
560    ///     let arrow_data = conn.export_table_to_arrow("my_table")?;
561    ///     Ok(())
562    /// }
563    /// ```
564    ///
565    /// # Errors
566    ///
567    /// Returns whatever [`execute_query_to_arrow`](Self::execute_query_to_arrow)
568    /// would return for `SELECT * FROM <table_name>` — typically
569    /// [`Error::Server`] if the table does not exist or the query is rejected.
570    pub fn export_table_to_arrow(&self, table_name: &str) -> Result<bytes::Bytes> {
571        self.execute_query_to_arrow(&format!("SELECT * FROM {table_name}"))
572    }
573
574    /// Executes a SELECT query and returns results as Arrow `RecordBatch`es.
575    ///
576    /// This is the recommended method for Arrow-native workflows (`DataFusion`,
577    /// Polars, etc.) where you want direct `RecordBatch` access without going
578    /// through the `Row` abstraction.
579    ///
580    /// # Example
581    ///
582    /// ```no_run
583    /// use hyperdb_api::{Connection, CreateMode, Result};
584    /// use arrow::record_batch::RecordBatch;
585    ///
586    /// fn main() -> Result<()> {
587    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
588    ///
589    ///     let batches: Vec<RecordBatch> = conn.execute_query_to_batches("SELECT * FROM data")?;
590    ///     for batch in &batches {
591    ///         println!("batch: {} rows x {} cols", batch.num_rows(), batch.num_columns());
592    ///     }
593    ///     Ok(())
594    /// }
595    /// ```
596    ///
597    /// # Errors
598    ///
599    /// - Returns [`Error::Server`] if the query itself fails.
600    /// - Returns [`Error::Conversion`] if the Arrow IPC payload returned by the
601    ///   server is malformed and cannot be decoded into record batches.
602    pub fn execute_query_to_batches(
603        &self,
604        select_query: &str,
605    ) -> Result<Vec<arrow::record_batch::RecordBatch>> {
606        let arrow_data = self.execute_query_to_arrow(select_query)?;
607        crate::arrow_result::parse_arrow_ipc(arrow_data)
608    }
609
610    /// Fetches a single row from a query.
611    ///
612    /// Returns an error if the query returns no rows.
613    ///
614    /// # Example
615    ///
616    /// ```no_run
617    /// use hyperdb_api::{Connection, CreateMode, Result};
618    ///
619    /// fn main() -> Result<()> {
620    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
621    ///     let row = conn.fetch_one("SELECT * FROM users WHERE id = 1")?;
622    ///     let id: Option<i32> = row.get(0);
623    ///     let name: Option<String> = row.get(1);
624    ///     Ok(())
625    /// }
626    /// ```
627    ///
628    /// # Errors
629    ///
630    /// - Returns the error from [`execute_query`](Self::execute_query) if
631    ///   the query itself fails.
632    /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
633    ///   the query produced zero rows.
634    pub fn fetch_one<Q>(&self, query: Q) -> Result<crate::Row>
635    where
636        Q: AsRef<str>,
637    {
638        let query = query.as_ref();
639        let result = self.execute_query(query)?;
640        result.require_first_row()
641    }
642
643    /// Fetches an optional single row from a query.
644    ///
645    /// Returns `None` if the query returns no rows.
646    ///
647    /// # Example
648    ///
649    /// ```no_run
650    /// use hyperdb_api::{Connection, CreateMode, Result};
651    ///
652    /// fn main() -> Result<()> {
653    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
654    ///     if let Some(row) = conn.fetch_optional("SELECT * FROM users WHERE id = 999")? {
655    ///         let name: Option<String> = row.get(1);
656    ///         println!("Found user: {:?}", name);
657    ///     }
658    ///     Ok(())
659    /// }
660    /// ```
661    ///
662    /// # Errors
663    ///
664    /// Returns the error from [`execute_query`](Self::execute_query) if the
665    /// query itself fails. An empty result set is not an error — it yields
666    /// `Ok(None)`.
667    pub fn fetch_optional<Q>(&self, query: Q) -> Result<Option<crate::Row>>
668    where
669        Q: AsRef<str>,
670    {
671        let query = query.as_ref();
672        let result = self.execute_query(query)?;
673        result.first_row()
674    }
675
676    /// Fetches all rows from a query.
677    ///
678    /// # Example
679    ///
680    /// ```no_run
681    /// use hyperdb_api::{Connection, CreateMode, Result};
682    ///
683    /// fn main() -> Result<()> {
684    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
685    ///     let rows = conn.fetch_all("SELECT * FROM users WHERE active = true ORDER BY name")?;
686    ///     for row in rows {
687    ///         let id: Option<i32> = row.get(0);
688    ///         let name: Option<String> = row.get(1);
689    ///         println!("User {}: {:?}", id.unwrap_or(-1), name);
690    ///     }
691    ///     Ok(())
692    /// }
693    /// ```
694    ///
695    /// # Errors
696    ///
697    /// Returns the error from [`execute_query`](Self::execute_query), or a
698    /// transport error produced while draining every chunk of the streamed
699    /// result set.
700    pub fn fetch_all<Q>(&self, query: Q) -> Result<Vec<crate::Row>>
701    where
702        Q: AsRef<str>,
703    {
704        let query = query.as_ref();
705        let result = self.execute_query(query)?;
706        result.collect_rows()
707    }
708
709    /// Fetches a single row and maps it to a struct using [`FromRow`](crate::FromRow).
710    ///
711    /// Returns an error if the query returns no rows or if mapping fails.
712    ///
713    /// # Example
714    ///
715    /// ```no_run
716    /// use hyperdb_api::{Connection, CreateMode, FromRow, RowAccessor, Result};
717    ///
718    /// struct User { id: i32, name: String }
719    ///
720    /// impl FromRow for User {
721    ///     fn from_row(row: RowAccessor<'_>) -> Result<Self> {
722    ///         Ok(User {
723    ///             id: row.get("id")?,
724    ///             name: row.get_opt("name")?.unwrap_or_default(),
725    ///         })
726    ///     }
727    /// }
728    ///
729    /// fn main() -> Result<()> {
730    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
731    ///     let user: User = conn.fetch_one_as("SELECT id, name FROM users WHERE id = 1")?;
732    ///     Ok(())
733    /// }
734    /// ```
735    ///
736    /// # Errors
737    ///
738    /// - Returns the error from [`fetch_one`](Self::fetch_one) if the query
739    ///   fails or returns no rows.
740    /// - Returns whatever error [`FromRow::from_row`](crate::FromRow::from_row)
741    ///   produces when the row cannot be mapped into `T`.
742    pub fn fetch_one_as<T: crate::FromRow>(&self, query: &str) -> Result<T> {
743        let row = self.fetch_one(query)?;
744        let indices = row
745            .schema()
746            .map(crate::row_accessor::RowAccessor::build_indices)
747            .unwrap_or_default();
748        T::from_row(crate::RowAccessor::new(&row, &indices))
749    }
750
751    /// Fetches all rows and maps them to structs using [`FromRow`](crate::FromRow).
752    ///
753    /// # Example
754    ///
755    /// ```no_run
756    /// # use hyperdb_api::{Connection, FromRow, RowAccessor, Result};
757    /// # struct User { id: i32, name: String }
758    /// # impl FromRow for User {
759    /// #     fn from_row(row: RowAccessor<'_>) -> Result<Self> {
760    /// #         Ok(User { id: row.get("id")?, name: row.get_opt("name")?.unwrap_or_default() })
761    /// #     }
762    /// # }
763    /// # fn example(conn: &Connection) -> Result<()> {
764    /// let users: Vec<User> = conn.fetch_all_as("SELECT id, name FROM users")?;
765    /// # Ok(())
766    /// # }
767    /// ```
768    ///
769    /// # Errors
770    ///
771    /// - Returns the error from [`fetch_all`](Self::fetch_all) if the query
772    ///   fails.
773    /// - Returns the first error produced by
774    ///   [`FromRow::from_row`](crate::FromRow::from_row) on any of the rows.
775    pub fn fetch_all_as<T: crate::FromRow>(&self, query: &str) -> Result<Vec<T>> {
776        let rows = self.fetch_all(query)?;
777        // Build the column-name → index lookup once from the first
778        // row's schema; reuse for every row. All rows in a result set
779        // share the same `Arc<ResultSchema>`, so this is safe.
780        let indices = rows
781            .first()
782            .and_then(crate::result::Row::schema)
783            .map(crate::row_accessor::RowAccessor::build_indices)
784            .unwrap_or_default();
785        rows.iter()
786            .map(|r| T::from_row(crate::RowAccessor::new(r, &indices)))
787            .collect()
788    }
789
790    /// Fetches a single scalar value from a query.
791    ///
792    /// Returns an error if the query returns no rows or NULL.
793    ///
794    /// # Example
795    ///
796    /// ```no_run
797    /// use hyperdb_api::{Connection, CreateMode, Result};
798    ///
799    /// fn main() -> Result<()> {
800    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
801    ///     let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM users")?;
802    ///     println!("User count: {}", count);
803    ///     Ok(())
804    /// }
805    /// ```
806    ///
807    /// # Errors
808    ///
809    /// - Returns the error from [`execute_query`](Self::execute_query) if
810    ///   the query itself fails.
811    /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
812    ///   the query produced zero rows.
813    /// - Returns [`Error::Conversion`] with message `"Scalar query returned NULL"`
814    ///   if the single cell is SQL `NULL`.
815    pub fn fetch_scalar<T, Q>(&self, query: Q) -> Result<T>
816    where
817        T: crate::connection::ScalarValue + crate::result::RowValue,
818        Q: AsRef<str>,
819    {
820        let query = query.as_ref();
821        let result = self.execute_query(query)?;
822        result.require_scalar()
823    }
824
825    /// Fetches an optional scalar value from a query.
826    ///
827    /// Returns `None` if the query returns no rows or NULL.
828    ///
829    /// # Example
830    ///
831    /// ```no_run
832    /// use hyperdb_api::{Connection, CreateMode, Result};
833    ///
834    /// fn main() -> Result<()> {
835    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
836    ///     let max_id: Option<i32> = conn.fetch_optional_scalar("SELECT MAX(id) FROM users")?;
837    ///     println!("Max ID: {:?}", max_id);
838    ///     Ok(())
839    /// }
840    /// ```
841    ///
842    /// # Errors
843    ///
844    /// - Returns the error from [`execute_query`](Self::execute_query) if
845    ///   the query itself fails.
846    /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
847    ///   the query produced zero rows. (An empty result is treated as an
848    ///   error here because we need at least one row to inspect; SQL `NULL`
849    ///   in the single cell yields `Ok(None)`.)
850    pub fn fetch_optional_scalar<T, Q>(&self, query: Q) -> Result<Option<T>>
851    where
852        T: crate::connection::ScalarValue + crate::result::RowValue,
853        Q: AsRef<str>,
854    {
855        let query = query.as_ref();
856        let result = self.execute_query(query)?;
857        result.scalar()
858    }
859
860    /// Executes a scalar query and returns a single value of type `T`.
861    ///
862    /// Alias for [`fetch_optional_scalar`](Self::fetch_optional_scalar) for C++ API compatibility.
863    ///
864    /// # Errors
865    ///
866    /// See [`fetch_optional_scalar`](Self::fetch_optional_scalar).
867    #[inline]
868    pub fn execute_scalar_query<T>(&self, query: &str) -> Result<Option<T>>
869    where
870        T: ScalarValue + crate::result::RowValue,
871    {
872        self.fetch_optional_scalar(query)
873    }
874
875    /// Queries for a count value, defaulting to 0 if NULL.
876    ///
877    /// This is optimized for COUNT queries which typically return 0
878    /// instead of NULL when there are no matching rows.
879    ///
880    /// # Example
881    ///
882    /// ```no_run
883    /// use hyperdb_api::{Connection, CreateMode, Result};
884    ///
885    /// fn main() -> Result<()> {
886    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
887    ///     let count = conn.query_count("SELECT COUNT(*) FROM users WHERE active = true")?;
888    ///     println!("Active users: {}", count);
889    ///     Ok(())
890    /// }
891    /// ```
892    ///
893    /// # Errors
894    ///
895    /// Returns the error from [`execute_query`](Self::execute_query) if the
896    /// query fails or produces no rows. SQL `NULL` is mapped to `0`, not an
897    /// error.
898    pub fn query_count(&self, query: &str) -> Result<i64> {
899        self.fetch_optional_scalar::<i64, _>(query)
900            .map(|opt| opt.unwrap_or(0))
901    }
902
903    // =========================================================================
904    // Parameterized Queries (SQL Injection Safe)
905    // =========================================================================
906
907    /// Executes a parameterized query, returning streaming results.
908    ///
909    /// This is safe to use with untrusted user input: parameters travel
910    /// through the extended query protocol (Parse/Bind/Execute) as
911    /// binary `HyperBinary` values and are never interpolated into the
912    /// SQL string. For repeated executions of the same SQL with different
913    /// values, prefer the explicit [`prepare`](Self::prepare) API — it
914    /// returns a reusable [`PreparedStatement`](crate::PreparedStatement)
915    /// that skips the Parse round-trip on every call.
916    ///
917    /// Under the hood, `query_params` is a one-shot
918    /// prepare+execute+close: it prepares an unnamed statement, binds
919    /// the parameters, starts streaming, and closes the statement when
920    /// the returned [`Rowset`] is dropped.
921    ///
922    /// # Arguments
923    ///
924    /// * `query` - The SQL query with parameter placeholders (`$1`, `$2`, etc.)
925    /// * `params` - Parameter values matching the placeholders
926    ///
927    /// # SQL Injection Prevention
928    ///
929    /// ```no_run
930    /// use hyperdb_api::{Connection, CreateMode, Result};
931    ///
932    /// fn search_users(conn: &Connection, user_input: &str) -> Result<()> {
933    ///     // DANGEROUS - vulnerable to SQL injection:
934    ///     // let query = format!("SELECT * FROM users WHERE name = '{}'", user_input);
935    ///
936    ///     // SAFE - parameterized query:
937    ///     let mut result = conn.query_params(
938    ///         "SELECT * FROM users WHERE name = $1",
939    ///         &[&user_input],
940    ///     )?;
941    ///
942    ///     while let Some(chunk) = result.next_chunk()? {
943    ///         for row in &chunk {
944    ///             let id: Option<i32> = row.get(0);
945    ///             let name: Option<String> = row.get(1);
946    ///             println!("Found: {:?} - {:?}", id, name);
947    ///         }
948    ///     }
949    ///     Ok(())
950    /// }
951    /// ```
952    ///
953    /// # Multiple Parameters
954    ///
955    /// ```no_run
956    /// use hyperdb_api::{Connection, CreateMode, Result};
957    ///
958    /// fn main() -> Result<()> {
959    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
960    ///
961    ///     // Multiple parameters of different types
962    ///     let result = conn.query_params(
963    ///         "SELECT * FROM orders WHERE customer_id = $1 AND total > $2",
964    ///         &[&42i32, &100.0f64],
965    ///     )?;
966    ///     Ok(())
967    /// }
968    /// ```
969    ///
970    /// # Errors
971    ///
972    /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport
973    ///   (prepared statements are TCP-only).
974    /// - Returns [`Error::Server`] if the server rejects the statement at
975    ///   `Parse`, `Bind`, or `Execute` time, including on type-mismatch
976    ///   between `params` and the inferred OIDs.
977    /// - Returns [`Error::Io`] on transport-level I/O failures.
978    pub fn query_params(
979        &self,
980        query: &str,
981        params: &[&dyn crate::params::ToSqlParam],
982    ) -> Result<Rowset<'_>> {
983        // Implementation note: routes through the extended query protocol
984        // via Parse/Bind/Execute so parameters travel in HyperBinary
985        // format — no SQL escaping, full SQL-injection safety regardless of
986        // parameter content. The statement handle is stashed inside the
987        // returned Rowset so its Drop-time close_statement fires *after*
988        // the rowset releases its connection lock (otherwise the close
989        // would deadlock on the still-held mutex).
990        let client = match &self.transport {
991            Transport::Tcp(tcp) => &tcp.client,
992            Transport::Grpc(_) => {
993                return Err(Error::feature_not_supported(
994                    "prepared statements are not supported over gRPC transport",
995                ));
996            }
997        };
998        let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
999        let stmt = client.prepare_typed(query, &oids)?;
1000        let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
1001        let stream =
1002            client.execute_streaming(&stmt, encoded, crate::result::DEFAULT_BINARY_CHUNK_SIZE)?;
1003        Ok(Rowset::from_prepared(stream).with_statement_guard(stmt))
1004    }
1005
1006    /// Executes a parameterized command that doesn't return rows.
1007    ///
1008    /// Use this for INSERT, UPDATE, DELETE, or DDL statements with parameters.
1009    /// Returns the number of affected rows.
1010    ///
1011    /// See [`query_params`](Self::query_params) for details on parameter
1012    /// handling and SQL injection prevention.
1013    ///
1014    /// # Example
1015    ///
1016    /// ```no_run
1017    /// use hyperdb_api::{Connection, CreateMode, Result};
1018    ///
1019    /// fn delete_user(conn: &Connection, user_id: i32) -> Result<u64> {
1020    ///     // Safe from SQL injection
1021    ///     conn.command_params("DELETE FROM users WHERE id = $1", &[&user_id])
1022    /// }
1023    /// ```
1024    ///
1025    /// # Errors
1026    ///
1027    /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport.
1028    /// - Returns [`Error::Server`] if the server rejects the statement at
1029    ///   `Parse`, `Bind`, or `Execute` time.
1030    /// - Returns [`Error::Io`] on transport-level I/O failures.
1031    pub fn command_params(
1032        &self,
1033        query: &str,
1034        params: &[&dyn crate::params::ToSqlParam],
1035    ) -> Result<u64> {
1036        // One-shot prepare+execute with explicit OIDs — see `query_params`
1037        // for why we collect OIDs from each parameter.
1038        let client = match &self.transport {
1039            Transport::Tcp(tcp) => &tcp.client,
1040            Transport::Grpc(_) => {
1041                return Err(Error::feature_not_supported(
1042                    "prepared statements are not supported over gRPC transport",
1043                ));
1044            }
1045        };
1046        let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
1047        let stmt = client.prepare_typed(query, &oids)?;
1048        let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
1049        Ok(client.execute_no_result(&stmt, encoded)?)
1050    }
1051
1052    /// Executes multiple SQL statements in a single call.
1053    ///
1054    /// Each statement is executed sequentially. If any statement fails,
1055    /// execution stops and the error is returned. Returns the total number
1056    /// of affected rows across all statements.
1057    ///
1058    /// This is more efficient than calling `execute_command` in a loop
1059    /// because it reduces round-trips for DDL scripts and multi-statement setup.
1060    ///
1061    /// # Example
1062    ///
1063    /// ```no_run
1064    /// use hyperdb_api::{Connection, CreateMode, Result};
1065    ///
1066    /// fn main() -> Result<()> {
1067    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1068    ///     let total = conn.execute_batch(&[
1069    ///         "CREATE TABLE users (id INT, name TEXT)",
1070    ///         "INSERT INTO users VALUES (1, 'Alice')",
1071    ///         "INSERT INTO users VALUES (2, 'Bob')",
1072    ///     ])?;
1073    ///     println!("Total affected: {}", total);
1074    ///     Ok(())
1075    /// }
1076    /// ```
1077    ///
1078    /// # Errors
1079    ///
1080    /// Returns a wrapped [`Error::Internal`] on the first statement that fails;
1081    /// its `source` is the original [`Error::Server`] from
1082    /// [`execute_command`](Self::execute_command). The error message
1083    /// includes the failing statement's ordinal and an 80-character preview
1084    /// of its SQL.
1085    pub fn execute_batch(&self, statements: &[&str]) -> Result<u64> {
1086        let mut total = 0u64;
1087        for (i, stmt) in statements.iter().enumerate() {
1088            if !stmt.trim().is_empty() {
1089                total += self.execute_command(stmt).map_err(|e| {
1090                    let preview: String = stmt.chars().take(80).collect();
1091                    Error::internal(format!(
1092                        "execute_batch failed at statement {} of {}: {}: {}",
1093                        i + 1,
1094                        statements.len(),
1095                        preview,
1096                        e,
1097                    ))
1098                })?;
1099            }
1100        }
1101        Ok(total)
1102    }
1103
1104    /// Returns the attached database path, if any.
1105    pub fn database(&self) -> Option<&str> {
1106        self.database.as_deref()
1107    }
1108
1109    /// Creates a new database file.
1110    ///
1111    /// # Example
1112    ///
1113    /// ```no_run
1114    /// use hyperdb_api::{Connection, Result};
1115    ///
1116    /// fn main() -> Result<()> {
1117    ///     let conn = Connection::without_database("localhost:7483")?;
1118    ///     conn.create_database("new_database.hyper")?;
1119    ///     Ok(())
1120    /// }
1121    /// ```
1122    ///
1123    /// # Errors
1124    ///
1125    /// Returns [`Error::Server`] if the server rejects the
1126    /// `CREATE DATABASE IF NOT EXISTS` statement (e.g. the path is not
1127    /// writable on the server).
1128    pub fn create_database(&self, path: &str) -> Result<()> {
1129        let sql = format!("CREATE DATABASE IF NOT EXISTS {}", escape_sql_path(path));
1130        self.execute_command(&sql)?;
1131        Ok(())
1132    }
1133
1134    /// Drops (deletes) a database file.
1135    ///
1136    /// # Example
1137    ///
1138    /// ```no_run
1139    /// use hyperdb_api::{Connection, Result};
1140    ///
1141    /// fn main() -> Result<()> {
1142    ///     let conn = Connection::without_database("localhost:7483")?;
1143    ///     conn.drop_database("old_database.hyper")?;
1144    ///     Ok(())
1145    /// }
1146    /// ```
1147    ///
1148    /// # Errors
1149    ///
1150    /// Returns [`Error::Server`] if the server rejects the
1151    /// `DROP DATABASE IF EXISTS` statement (e.g. the database is still
1152    /// attached or permissions deny deletion).
1153    pub fn drop_database(&self, path: &str) -> Result<()> {
1154        let sql = format!("DROP DATABASE IF EXISTS {}", escape_sql_path(path));
1155        self.execute_command(&sql)?;
1156        Ok(())
1157    }
1158
1159    /// Attaches a database file to the connection.
1160    ///
1161    /// Once attached, the database can be queried and modified.
1162    /// The database is identified by its alias (or by its path if no alias is provided).
1163    ///
1164    /// # Arguments
1165    ///
1166    /// * `path` - The path to the database file to attach.
1167    /// * `alias` - Optional alias for the database. If `None`, the database is
1168    ///   attached without an explicit alias (typically using its filename).
1169    ///
1170    /// # Errors
1171    ///
1172    /// Returns an error if the database file doesn't exist or if attachment fails.
1173    ///
1174    /// # Example
1175    ///
1176    /// ```no_run
1177    /// use hyperdb_api::{Connection, Result};
1178    ///
1179    /// fn main() -> Result<()> {
1180    ///     let conn = Connection::without_database("localhost:7483")?;
1181    ///
1182    ///     // Attach with an alias
1183    ///     conn.attach_database("data.hyper", Some("mydata"))?;
1184    ///
1185    ///     // Attach without an alias
1186    ///     conn.attach_database("other.hyper", None)?;
1187    ///     Ok(())
1188    /// }
1189    /// ```
1190    pub fn attach_database(&self, path: &str, alias: Option<&str>) -> Result<()> {
1191        let sql = if let Some(alias) = alias {
1192            format!(
1193                "ATTACH DATABASE {} AS {}",
1194                escape_sql_path(path),
1195                escape_sql_path(alias)
1196            )
1197        } else {
1198            format!("ATTACH DATABASE {}", escape_sql_path(path))
1199        };
1200        self.execute_command(&sql)?;
1201        Ok(())
1202    }
1203
1204    /// Detaches a database from this connection.
1205    ///
1206    /// After detaching, the database file is released and can be accessed
1207    /// externally (e.g., copied, moved, etc.). All pending updates are
1208    /// written to disk before detaching.
1209    ///
1210    /// # Arguments
1211    ///
1212    /// * `alias` - The alias of the database to detach.
1213    ///
1214    /// # Errors
1215    ///
1216    /// Returns an error if the database is not attached or if detachment fails.
1217    ///
1218    /// # Example
1219    ///
1220    /// ```no_run
1221    /// use hyperdb_api::{Connection, Result};
1222    ///
1223    /// fn main() -> Result<()> {
1224    ///     let conn = Connection::without_database("localhost:7483")?;
1225    ///     conn.attach_database("data.hyper", Some("mydata"))?;
1226    ///     // ... work with the database ...
1227    ///     conn.detach_database("mydata")?;
1228    ///     Ok(())
1229    /// }
1230    /// ```
1231    pub fn detach_database(&self, alias: &str) -> Result<()> {
1232        let sql = format!("DETACH DATABASE {}", escape_sql_path(alias));
1233        self.execute_command(&sql)?;
1234        Ok(())
1235    }
1236
1237    /// Detaches all databases from this connection.
1238    ///
1239    /// This is useful for cleanup before closing a connection or when
1240    /// you need to release all database files.
1241    ///
1242    /// # Errors
1243    ///
1244    /// Returns [`Error::Server`] if the server rejects the
1245    /// `DETACH ALL DATABASES` statement (e.g. a database is still in use by
1246    /// another session).
1247    pub fn detach_all_databases(&self) -> Result<()> {
1248        self.execute_command("DETACH ALL DATABASES")?;
1249        Ok(())
1250    }
1251
1252    /// Creates a schema in the database.
1253    ///
1254    /// # Errors
1255    ///
1256    /// - Returns an error if `schema_name` cannot be converted into a
1257    ///   [`SchemaName`](crate::SchemaName) (invalid identifier).
1258    /// - Returns [`Error::Server`] if the server rejects the
1259    ///   `CREATE SCHEMA` statement (e.g. the schema already exists).
1260    pub fn create_schema<T>(&self, schema_name: T) -> Result<()>
1261    where
1262        T: TryInto<crate::SchemaName>,
1263        crate::Error: From<T::Error>,
1264    {
1265        crate::catalog::Catalog::new(self).create_schema(schema_name)
1266    }
1267
1268    /// Checks whether a schema exists.
1269    ///
1270    /// # Arguments
1271    ///
1272    /// * `schema` - The schema name (can include database qualifier).
1273    ///
1274    /// # Example
1275    ///
1276    /// ```no_run
1277    /// use hyperdb_api::{Connection, Result};
1278    ///
1279    /// fn main() -> Result<()> {
1280    ///     let conn = Connection::without_database("localhost:7483")?;
1281    ///     if conn.has_schema("public")? {
1282    ///         println!("Schema 'public' exists");
1283    ///     }
1284    ///     Ok(())
1285    /// }
1286    /// ```
1287    ///
1288    /// # Errors
1289    ///
1290    /// - Returns an error if `schema` cannot be converted into a
1291    ///   [`SchemaName`](crate::SchemaName).
1292    /// - Returns [`Error::Server`] if the catalog lookup query fails.
1293    pub fn has_schema<T>(&self, schema: T) -> Result<bool>
1294    where
1295        T: TryInto<crate::SchemaName>,
1296        crate::Error: From<T::Error>,
1297    {
1298        use crate::catalog::Catalog;
1299        Catalog::new(self).has_schema(schema)
1300    }
1301
1302    /// Checks whether a table exists.
1303    ///
1304    /// # Arguments
1305    ///
1306    /// * `table_name` - The table name (can include database and schema qualifiers).
1307    ///
1308    /// # Example
1309    ///
1310    /// ```no_run
1311    /// use hyperdb_api::{Connection, Result};
1312    ///
1313    /// fn main() -> Result<()> {
1314    ///     let conn = Connection::without_database("localhost:7483")?;
1315    ///     if conn.has_table("public.users")? {
1316    ///         println!("Table 'users' exists");
1317    ///     }
1318    ///     Ok(())
1319    /// }
1320    /// ```
1321    ///
1322    /// # Errors
1323    ///
1324    /// - Returns an error if `table_name` cannot be converted into a
1325    ///   [`TableName`](crate::TableName).
1326    /// - Returns [`Error::Server`] if the catalog lookup query fails.
1327    pub fn has_table<T>(&self, table_name: T) -> Result<bool>
1328    where
1329        T: TryInto<crate::TableName>,
1330        crate::Error: From<T::Error>,
1331    {
1332        use crate::catalog::Catalog;
1333        Catalog::new(self).has_table(table_name)
1334    }
1335
1336    /// Returns the server version as a parsed struct.
1337    ///
1338    /// Returns `None` if the version cannot be determined (e.g., gRPC connection).
1339    ///
1340    /// # Example
1341    ///
1342    /// ```no_run
1343    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, ServerVersion, Result};
1344    ///
1345    /// fn main() -> Result<()> {
1346    ///     let hyper = HyperProcess::new(None, None)?;
1347    ///     let conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1348    ///     if let Some(version) = conn.server_version() {
1349    ///         println!("Hyper {}", version);
1350    ///         if version >= ServerVersion::new(0, 1, 0) {
1351    ///             println!("Has feature X");
1352    ///         }
1353    ///     }
1354    ///     Ok(())
1355    /// }
1356    /// ```
1357    pub fn server_version(&self) -> Option<crate::ServerVersion> {
1358        let version_str = self.parameter_status("server_version")?;
1359        crate::ServerVersion::parse(&version_str)
1360    }
1361
1362    /// Copies a database file to a new path.
1363    ///
1364    /// The source database must be attached to this connection.
1365    ///
1366    /// # Example
1367    ///
1368    /// ```no_run
1369    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1370    ///
1371    /// fn main() -> Result<()> {
1372    ///     let hyper = HyperProcess::new(None, None)?;
1373    ///     let conn = Connection::new(&hyper, "source.hyper", CreateMode::DoNotCreate)?;
1374    ///     conn.copy_database("source.hyper", "backup.hyper")?;
1375    ///     Ok(())
1376    /// }
1377    /// ```
1378    ///
1379    /// # Errors
1380    ///
1381    /// Returns [`Error::Server`] if the server rejects the
1382    /// `COPY DATABASE` statement — e.g. the source is not attached, the
1383    /// destination path is not writable, or it already exists.
1384    pub fn copy_database(&self, source: &str, destination: &str) -> Result<()> {
1385        let sql = format!(
1386            "COPY DATABASE {} TO {}",
1387            escape_sql_path(source),
1388            escape_sql_path(destination)
1389        );
1390        self.execute_command(&sql)?;
1391        Ok(())
1392    }
1393
1394    /// Executes EXPLAIN on a query and returns the plan as a string.
1395    ///
1396    /// # Example
1397    ///
1398    /// ```no_run
1399    /// use hyperdb_api::{Connection, CreateMode, Result};
1400    ///
1401    /// fn main() -> Result<()> {
1402    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1403    ///     let plan = conn.explain("SELECT * FROM users WHERE id = 1")?;
1404    ///     println!("{}", plan);
1405    ///     Ok(())
1406    /// }
1407    /// ```
1408    ///
1409    /// # Errors
1410    ///
1411    /// Returns [`Error::Server`] if `EXPLAIN <query>` fails to parse or
1412    /// plan, or if the streamed result cannot be consumed.
1413    pub fn explain(&self, query: &str) -> Result<String> {
1414        let explain_sql = format!("EXPLAIN {query}");
1415        let result = self.execute_query(&explain_sql)?;
1416        let mut lines = Vec::new();
1417        for row in result.rows() {
1418            let row = row?;
1419            if let Some(line) = row.get::<String>(0) {
1420                lines.push(line);
1421            }
1422        }
1423        Ok(lines.join("\n"))
1424    }
1425
1426    /// Executes EXPLAIN ANALYZE on a query and returns the plan with timing info.
1427    ///
1428    /// **Note:** This actually executes the query to collect timing information.
1429    ///
1430    /// # Errors
1431    ///
1432    /// Returns [`Error::Server`] if `EXPLAIN ANALYZE <query>` fails — this
1433    /// includes any runtime error raised by actually executing `query`.
1434    pub fn explain_analyze(&self, query: &str) -> Result<String> {
1435        let explain_sql = format!("EXPLAIN ANALYZE {query}");
1436        let result = self.execute_query(&explain_sql)?;
1437        let mut lines = Vec::new();
1438        for row in result.rows() {
1439            let row = row?;
1440            if let Some(line) = row.get::<String>(0) {
1441                lines.push(line);
1442            }
1443        }
1444        Ok(lines.join("\n"))
1445    }
1446
1447    /// Returns a reference to the underlying TCP client.
1448    ///
1449    /// # Panics
1450    ///
1451    /// This method returns `None` if the connection is using gRPC transport.
1452    pub fn tcp_client(&self) -> Option<&Client> {
1453        match &self.transport {
1454            Transport::Tcp(tcp) => Some(&tcp.client),
1455            Transport::Grpc(_) => None,
1456        }
1457    }
1458
1459    /// Crate-internal accessor for the transport. Used by
1460    /// [`PreparedStatement`](crate::PreparedStatement) to reach the
1461    /// underlying `hyperdb_api_core::client::Client`.
1462    pub(crate) fn transport(&self) -> &Transport {
1463        &self.transport
1464    }
1465
1466    /// Prepares a SQL statement with automatic parameter type inference.
1467    ///
1468    /// The returned [`PreparedStatement`](crate::PreparedStatement) can
1469    /// be executed many times with different parameter values; the
1470    /// server caches the parsed plan. This is the preferred way to
1471    /// execute a statement repeatedly inside a loop.
1472    ///
1473    /// For explicit parameter types (necessary when `$N` placeholders
1474    /// would otherwise be ambiguous), use
1475    /// [`prepare_typed`](Self::prepare_typed).
1476    ///
1477    /// # Example
1478    ///
1479    /// ```no_run
1480    /// # use hyperdb_api::{Connection, CreateMode, Result};
1481    /// # fn example(conn: &Connection) -> Result<()> {
1482    /// let stmt = conn.prepare("SELECT name FROM users WHERE id = $1")?;
1483    /// for id in [1_i32, 2, 3] {
1484    ///     let name: String = stmt.fetch_scalar(&[&id])?;
1485    ///     println!("{id}: {name}");
1486    /// }
1487    /// # Ok(())
1488    /// # }
1489    /// ```
1490    ///
1491    /// # Errors
1492    ///
1493    /// See [`prepare_typed`](Self::prepare_typed) — this method delegates
1494    /// to it with an empty OID list.
1495    pub fn prepare(&self, query: &str) -> Result<crate::PreparedStatement<'_>> {
1496        self.prepare_typed(query, &[])
1497    }
1498
1499    /// Prepares a SQL statement with explicit parameter type OIDs.
1500    ///
1501    /// Use this when the server cannot infer parameter types from the
1502    /// SQL alone (e.g. a bare `$1` in a `WHERE v > $1` clause with no
1503    /// other context). Constants for common types live in
1504    /// [`hyperdb_api_core::types::oids`].
1505    ///
1506    /// # Errors
1507    ///
1508    /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport
1509    ///   (prepared statements are TCP-only).
1510    /// - Returns [`Error::Server`] if the server rejects the `Parse`
1511    ///   message, e.g. SQL syntax error or unknown OID.
1512    /// - Returns [`Error::Io`] on transport-level I/O failures.
1513    pub fn prepare_typed(
1514        &self,
1515        query: &str,
1516        param_types: &[crate::Oid],
1517    ) -> Result<crate::PreparedStatement<'_>> {
1518        let client = match &self.transport {
1519            Transport::Tcp(tcp) => &tcp.client,
1520            Transport::Grpc(_) => {
1521                return Err(Error::feature_not_supported(
1522                    "prepared statements are not supported over gRPC transport",
1523                ));
1524            }
1525        };
1526        let inner = client.prepare_typed(query, param_types)?;
1527        crate::PreparedStatement::new(self, inner)
1528    }
1529
1530    /// Returns true if the connection is alive (passive check).
1531    ///
1532    /// This is a lightweight check that does not send any data to the server.
1533    /// For an active health check, use [`ping`](Self::ping).
1534    pub fn is_alive(&self) -> bool {
1535        match &self.transport {
1536            Transport::Tcp(tcp) => tcp.client.is_alive(),
1537            Transport::Grpc(_) => true, // gRPC connections are stateless
1538        }
1539    }
1540
1541    /// Actively checks that the connection is healthy by executing a trivial query.
1542    ///
1543    /// Unlike [`is_alive`](Self::is_alive) which only checks local state,
1544    /// this method sends `SELECT 1` to the server and verifies a response.
1545    ///
1546    /// # Example
1547    ///
1548    /// ```no_run
1549    /// # use hyperdb_api::{Connection, CreateMode, Result};
1550    /// # fn example(conn: &Connection) -> Result<()> {
1551    /// if conn.ping().is_ok() {
1552    ///     println!("Connection is healthy");
1553    /// }
1554    /// # Ok(())
1555    /// # }
1556    /// ```
1557    ///
1558    /// # Errors
1559    ///
1560    /// Returns [`Error::Server`] or [`Error::Io`] if the `SELECT 1`
1561    /// round-trip fails — i.e. the connection is no longer usable.
1562    pub fn ping(&self) -> Result<()> {
1563        self.execute_command("SELECT 1")?;
1564        Ok(())
1565    }
1566
1567    /// Returns the process ID of the backend server connection.
1568    ///
1569    /// Returns 0 for gRPC connections (not applicable).
1570    pub fn process_id(&self) -> i32 {
1571        match &self.transport {
1572            Transport::Tcp(tcp) => tcp.client.process_id(),
1573            Transport::Grpc(_) => 0,
1574        }
1575    }
1576
1577    /// Returns the secret key for the backend server connection.
1578    ///
1579    /// This is used for cancellation requests.
1580    /// Returns 0 for gRPC connections (not applicable).
1581    pub fn secret_key(&self) -> i32 {
1582        match &self.transport {
1583            Transport::Tcp(tcp) => tcp.client.secret_key(),
1584            Transport::Grpc(_) => 0,
1585        }
1586    }
1587
1588    /// Returns a server parameter value by name.
1589    ///
1590    /// Server parameters are sent by the server during connection startup.
1591    /// Common parameters include:
1592    /// - `server_version` - The server version string
1593    /// - `server_encoding` - The server's character encoding
1594    /// - `client_encoding` - The client's character encoding
1595    /// - `DateStyle` - Date display format
1596    /// - `TimeZone` - Server timezone
1597    /// - `session_identifier` - Session ID for connection migration (if routing enabled)
1598    ///
1599    /// Returns `None` if the parameter is not known.
1600    ///
1601    /// # Example
1602    ///
1603    /// ```no_run
1604    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1605    ///
1606    /// fn main() -> Result<()> {
1607    ///     let hyper = HyperProcess::new(None, None)?;
1608    ///     let conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1609    ///
1610    ///     if let Some(version) = conn.parameter_status("server_version") {
1611    ///         println!("Connected to Hyper version: {}", version);
1612    ///     }
1613    ///     Ok(())
1614    /// }
1615    /// ```
1616    pub fn parameter_status(&self, name: &str) -> Option<String> {
1617        match &self.transport {
1618            Transport::Tcp(tcp) => tcp.client.parameter_status(name),
1619            Transport::Grpc(_) => None, // gRPC doesn't have server parameters
1620        }
1621    }
1622
1623    /// Sets the notice receiver for this connection.
1624    ///
1625    /// Server notices and warnings are passed to this callback instead of being
1626    /// logged. Pass `None` to restore default logging behavior.
1627    pub fn set_notice_receiver(
1628        &mut self,
1629        receiver: Option<hyperdb_api_core::client::NoticeReceiver>,
1630    ) {
1631        match &mut self.transport {
1632            Transport::Tcp(tcp) => tcp.client.set_notice_receiver(receiver),
1633            Transport::Grpc(_) => {} // gRPC doesn't support notice receivers
1634        }
1635    }
1636
1637    /// Cancels the currently executing query (thread-safe).
1638    ///
1639    /// # Errors
1640    ///
1641    /// - Returns [`Error::FeatureNotSupported`] on gRPC connections — cancellation is not
1642    ///   yet implemented for gRPC transport.
1643    /// - Returns [`Error::Connection`] or [`Error::Io`] if the separate
1644    ///   cancel-request connection to the server fails.
1645    pub fn cancel(&self) -> Result<()> {
1646        match &self.transport {
1647            Transport::Tcp(tcp) => tcp.client.cancel().map_err(Error::from),
1648            Transport::Grpc(_) => Err(Error::feature_not_supported(
1649                "Query cancellation is not yet supported for gRPC connections.",
1650            )),
1651        }
1652    }
1653
1654    /// Closes the connection, detaching all databases first.
1655    ///
1656    /// # Errors
1657    ///
1658    /// - Returns [`Error::Internal`] wrapping the underlying close failure
1659    ///   (its `source` is the transport error) if the client cannot be
1660    ///   shut down cleanly.
1661    /// - Returns [`Error::Internal`] wrapping the detach failure if the
1662    ///   attached database could not be detached but close itself
1663    ///   succeeded.
1664    pub fn close(self) -> Result<()> {
1665        // Detach the attached database to ensure files are flushed and released.
1666        // Always attempt close, even if detach fails.
1667        let detach_err = if let Some(ref db_path) = self.database {
1668            let db_alias = std::path::Path::new(db_path)
1669                .file_stem()
1670                .and_then(|s| s.to_str())
1671                .unwrap_or("db");
1672            self.execute_command(&format!("DETACH DATABASE {}", escape_sql_path(db_alias)))
1673                .err()
1674        } else {
1675            None
1676        };
1677
1678        // Always attempt to close the client to release the connection.
1679        let close_result = match self.transport {
1680            Transport::Tcp(tcp) => tcp.client.close(),
1681            Transport::Grpc(_) => Ok(()), // gRPC connections are stateless
1682        };
1683
1684        if let Err(e) = close_result {
1685            return Err(Error::internal(format!("Failed to close connection: {e}")));
1686        }
1687
1688        if let Some(e) = detach_err {
1689            // Detach failed but close succeeded; surface the detach error.
1690            return Err(Error::internal(format!(
1691                "Failed to detach database during close: {e}"
1692            )));
1693        }
1694
1695        Ok(())
1696    }
1697
1698    /// Unloads the database from memory while keeping the connection active.
1699    ///
1700    /// This executes the `UNLOAD DATABASE` command, which releases the database
1701    /// from memory but keeps the session and connection open. The database can
1702    /// be accessed again by subsequent queries that will automatically reload it.
1703    ///
1704    /// This is useful for releasing memory locks when switching between databases
1705    /// or when working with multiple database files.
1706    ///
1707    /// # Example
1708    ///
1709    /// ```no_run
1710    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1711    ///
1712    /// fn main() -> Result<()> {
1713    ///     let hyper = HyperProcess::new(None, None)?;
1714    ///     let conn = Connection::new(&hyper, "test.hyper", CreateMode::Create)?;
1715    ///     
1716    ///     // Do some work with the database
1717    ///     conn.execute_command("CREATE TABLE test (id INT)")?;
1718    ///     
1719    ///     // Unload from memory (but keep connection)
1720    ///     conn.unload_database()?;
1721    ///     
1722    ///     // Database can still be accessed (will be reloaded automatically)
1723    ///     let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM test")?;
1724    ///     println!("Count: {}", count);
1725    ///     
1726    ///     Ok(())
1727    /// }
1728    /// ```
1729    ///
1730    /// # Errors
1731    ///
1732    /// Returns [`Error::Server`] if the server rejects the `UNLOAD DATABASE`
1733    /// command (e.g. the database is still in use by another session).
1734    pub fn unload_database(&self) -> Result<()> {
1735        self.execute_command("UNLOAD DATABASE")?;
1736        Ok(())
1737    }
1738
1739    /// Releases the database completely from the session.
1740    ///
1741    /// This executes the `UNLOAD RELEASE` command, which completely releases
1742    /// the database from the session. After this call, the database cannot
1743    /// be accessed until a new connection is established.
1744    ///
1745    /// This is useful for completely freeing database resources when you're
1746    /// done with a database and want to ensure no locks are held.
1747    ///
1748    /// **Note:** This should only be used when the session has exactly one
1749    /// database attached. Hyper does not support `UNLOAD RELEASE` with
1750    /// multiple databases attached to the same session.
1751    ///
1752    /// # Example
1753    ///
1754    /// ```no_run
1755    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1756    ///
1757    /// fn main() -> Result<()> {
1758    ///     let hyper = HyperProcess::new(None, None)?;
1759    ///     let conn = Connection::new(&hyper, "test.hyper", CreateMode::Create)?;
1760    ///     
1761    ///     // Do some work with the database
1762    ///     conn.execute_command("CREATE TABLE test (id INT)")?;
1763    ///     
1764    ///     // Release database completely from session
1765    ///     conn.unload_release()?;
1766    ///     
1767    ///     // Database cannot be accessed after this point without new connection
1768    ///     // conn.execute_command("SELECT * FROM test")?; // This would fail
1769    ///     
1770    ///     Ok(())
1771    /// }
1772    /// ```
1773    ///
1774    /// # Errors
1775    ///
1776    /// Returns [`Error::Server`] if the server rejects `UNLOAD RELEASE`, most
1777    /// commonly because multiple databases are attached to the same session
1778    /// (Hyper only supports `UNLOAD RELEASE` with exactly one attached DB).
1779    pub fn unload_release(&self) -> Result<()> {
1780        self.execute_command("UNLOAD RELEASE")?;
1781        Ok(())
1782    }
1783
1784    // =========================================================================
1785    // Query Statistics
1786    // =========================================================================
1787
1788    /// Enables query statistics collection for this connection.
1789    ///
1790    /// After enabling, each `execute_command()` or `execute_query()` call will
1791    /// capture detailed performance metrics from Hyper. Retrieve them via
1792    /// [`last_query_stats()`](Self::last_query_stats).
1793    ///
1794    /// The provider determines how stats are collected. Use
1795    /// [`LogFileStatsProvider`](crate::LogFileStatsProvider) to parse Hyper's log file (requires local
1796    /// `hyperd.log`), or implement a custom [`QueryStatsProvider`](crate::QueryStatsProvider).
1797    ///
1798    /// # Example
1799    ///
1800    /// ```no_run
1801    /// # use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1802    /// # fn main() -> Result<()> {
1803    /// # let hyper = HyperProcess::new(None, None)?;
1804    /// # let mut conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1805    /// use hyperdb_api::LogFileStatsProvider;
1806    ///
1807    /// // Auto-detect log path from HyperProcess
1808    /// conn.enable_query_stats(LogFileStatsProvider::from_process(&hyper));
1809    ///
1810    /// // Or specify an explicit log path
1811    /// // conn.enable_query_stats(LogFileStatsProvider::new("/path/to/hyperd.log"));
1812    /// # Ok(())
1813    /// # }
1814    /// ```
1815    pub fn enable_query_stats(&mut self, provider: impl QueryStatsProvider + 'static) {
1816        self.stats_provider = Some(Arc::new(provider));
1817    }
1818
1819    /// Disables query statistics collection.
1820    ///
1821    /// After calling this, `last_query_stats()` will return `None`.
1822    pub fn disable_query_stats(&mut self) {
1823        self.stats_provider = None;
1824        if let Ok(mut guard) = self.pending_stats.lock() {
1825            *guard = None;
1826        }
1827    }
1828
1829    /// Returns the query statistics from the most recent query execution.
1830    ///
1831    /// Stats are resolved **lazily** — the log file is read when this method
1832    /// is called, not when the query executes. This is important for streaming
1833    /// queries (`execute_query`), where Hyper writes the execution stats only
1834    /// after the result set is fully consumed.
1835    ///
1836    /// **Call this after consuming the result set** (e.g., after `collect_rows()`,
1837    /// iterating all chunks, or dropping the `Rowset`).
1838    ///
1839    /// Returns `None` if:
1840    /// - Query stats collection is not enabled
1841    /// - No query has been executed yet
1842    /// - Stats could not be found for the last query (e.g., log entry not matched)
1843    ///
1844    /// # Example
1845    ///
1846    /// ```no_run
1847    /// # use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1848    /// # fn main() -> Result<()> {
1849    /// # let hyper = HyperProcess::new(None, None)?;
1850    /// # let mut conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1851    /// # use hyperdb_api::LogFileStatsProvider;
1852    /// # conn.enable_query_stats(LogFileStatsProvider::from_process(&hyper));
1853    /// conn.execute_command("CREATE TABLE t (id INT)")?;
1854    ///
1855    /// if let Some(stats) = conn.last_query_stats() {
1856    ///     println!("Total: {}s", stats.elapsed_s);
1857    ///     if let Some(ref pre) = stats.pre_execution {
1858    ///         println!("  Parse: {:?}s", pre.parsing_time_s);
1859    ///         println!("  Compile: {:?}s", pre.compilation_time_s);
1860    ///     }
1861    ///     if let Some(ref exec) = stats.execution {
1862    ///         println!("  Execute: {:?}s", exec.elapsed_s);
1863    ///         println!("  Peak mem: {:?} MB", exec.peak_memory_mb);
1864    ///     }
1865    /// }
1866    /// # Ok(())
1867    /// # }
1868    /// ```
1869    pub fn last_query_stats(&self) -> Option<QueryStats> {
1870        let provider = self.stats_provider.as_ref()?;
1871        let mut guard = self.pending_stats.lock().ok()?;
1872        let (token, sql) = guard.take()?;
1873        provider.after_query(token, &sql)
1874    }
1875
1876    /// Internal: call provider's `before_query` if stats are enabled.
1877    fn stats_before_query(&self, sql: &str) -> Option<Box<dyn Any + Send>> {
1878        self.stats_provider.as_ref().map(|p| p.before_query(sql))
1879    }
1880
1881    /// Internal: store the pending token+sql for lazy resolution.
1882    fn stats_store_pending(&self, token: Option<Box<dyn Any + Send>>, sql: &str) {
1883        if let Some(token) = token {
1884            if let Ok(mut guard) = self.pending_stats.lock() {
1885                *guard = Some((token, sql.to_string()));
1886            }
1887        }
1888    }
1889}
1890
1891impl Connection {
1892    // =========================================================================
1893    // Transaction Control
1894    // =========================================================================
1895
1896    // -------------------------------------------------------------------
1897    // Raw transaction control (internal)
1898    // -------------------------------------------------------------------
1899    //
1900    // The `*_raw` methods below are `pub(crate)` and form the canonical
1901    // implementation of session-level transaction control. The RAII
1902    // guard at `crate::Transaction` and any internal helper that
1903    // genuinely needs `&self` (rather than the guard's `&mut self`)
1904    // delegate to these.
1905    //
1906    // The matching `pub` methods (`begin_transaction`, `commit`,
1907    // `rollback`) are thin `#[doc(hidden)] #[deprecated]` wrappers
1908    // retained only so any pre-existing downstream caller sees a
1909    // compiler warning rather than a hard break. They will be deleted
1910    // in a future release; the `_raw` methods stay.
1911
1912    /// Issues `BEGIN TRANSACTION`. Crate-internal use only.
1913    pub(crate) fn begin_transaction_raw(&self) -> Result<()> {
1914        self.execute_command("BEGIN TRANSACTION")?;
1915        Ok(())
1916    }
1917
1918    /// Issues `COMMIT`. Crate-internal use only.
1919    pub(crate) fn commit_raw(&self) -> Result<()> {
1920        self.execute_command("COMMIT")?;
1921        Ok(())
1922    }
1923
1924    /// Issues `ROLLBACK`. Crate-internal use only.
1925    pub(crate) fn rollback_raw(&self) -> Result<()> {
1926        self.execute_command("ROLLBACK")?;
1927        Ok(())
1928    }
1929
1930    /// Begins an explicit transaction.
1931    ///
1932    /// **Prefer [`transaction()`](Self::transaction)** — the RAII guard
1933    /// auto-rolls back on drop and cannot leak a half-open transaction
1934    /// across error paths. This method is hidden from generated
1935    /// rustdoc and marked deprecated; it will be removed in a future
1936    /// release.
1937    ///
1938    /// # Errors
1939    ///
1940    /// Returns [`Error::Server`] if the server rejects `BEGIN TRANSACTION`
1941    /// (e.g. a transaction is already open on this session).
1942    #[doc(hidden)]
1943    #[deprecated(
1944        note = "Use `Connection::transaction()` for an RAII guard. This method will be removed \
1945                in a future release."
1946    )]
1947    pub fn begin_transaction(&self) -> Result<()> {
1948        self.begin_transaction_raw()
1949    }
1950
1951    /// Commits the current transaction.
1952    ///
1953    /// **Prefer [`Transaction::commit`](crate::Transaction::commit)** on
1954    /// the RAII guard returned by [`transaction()`](Self::transaction).
1955    /// Hidden from generated rustdoc and deprecated; slated for removal.
1956    ///
1957    /// # Errors
1958    ///
1959    /// Returns [`Error::Server`] if the server rejects `COMMIT`.
1960    #[doc(hidden)]
1961    #[deprecated(
1962        note = "Use `Transaction::commit()` on the RAII guard from `Connection::transaction()`. \
1963                This method will be removed in a future release."
1964    )]
1965    pub fn commit(&self) -> Result<()> {
1966        self.commit_raw()
1967    }
1968
1969    /// Rolls back the current transaction.
1970    ///
1971    /// **Prefer [`Transaction::rollback`](crate::Transaction::rollback)**
1972    /// on the RAII guard returned by [`transaction()`](Self::transaction).
1973    /// Hidden from generated rustdoc and deprecated; slated for removal.
1974    ///
1975    /// # Errors
1976    ///
1977    /// Returns [`Error::Server`] if the server rejects `ROLLBACK`.
1978    #[doc(hidden)]
1979    #[deprecated(
1980        note = "Use `Transaction::rollback()` on the RAII guard from `Connection::transaction()`. \
1981                This method will be removed in a future release."
1982    )]
1983    pub fn rollback(&self) -> Result<()> {
1984        self.rollback_raw()
1985    }
1986
1987    /// Starts a transaction and returns an RAII guard that auto-rolls back on drop.
1988    ///
1989    /// The returned [`Transaction`](crate::Transaction) exclusively borrows this connection,
1990    /// preventing any other use of the connection while the transaction is active.
1991    /// This is enforced at compile time by Rust's borrow checker. The guard provides
1992    /// `commit()` and `rollback()` methods. If dropped without calling either, the
1993    /// transaction is automatically rolled back.
1994    ///
1995    /// # Example
1996    ///
1997    /// ```no_run
1998    /// # use hyperdb_api::{Connection, CreateMode, Result};
1999    /// # fn main() -> Result<()> {
2000    /// # let mut conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
2001    /// let txn = conn.transaction()?;
2002    /// txn.execute_command("INSERT INTO users VALUES (1, 'Alice')")?;
2003    /// txn.commit()?; // or drop `txn` to auto-rollback
2004    /// # Ok(())
2005    /// # }
2006    /// ```
2007    ///
2008    /// # Errors
2009    ///
2010    /// Returns [`Error::Server`] if the server rejects the `BEGIN`
2011    /// statement issued internally by
2012    /// [`Transaction::new`](crate::Transaction).
2013    pub fn transaction(&mut self) -> Result<crate::Transaction<'_>> {
2014        crate::Transaction::new(self)
2015    }
2016}
2017
2018/// Checks if an error indicates an "already exists" condition based on SQLSTATE codes.
2019///
2020/// This function uses `PostgreSQL` SQLSTATE codes to reliably detect duplicate object errors
2021/// regardless of server locale or message formatting. The codes checked are:
2022/// - `42P04`: Database already exists
2023/// - `42710`: Duplicate object
2024/// - `42P06`: Duplicate schema
2025/// - `42P07`: Duplicate table
2026///
2027/// See: <https://www.postgresql.org/docs/current/errcodes-appendix.html>
2028fn is_already_exists_error(err: &Error) -> bool {
2029    err.sqlstate()
2030        .is_some_and(|code| matches!(code, "42P04" | "42710" | "42P06" | "42P07"))
2031}