Skip to main content

hyperdb_api/
connection.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Database connection management.
5//!
6//! The [`Connection`] type provides a unified interface for connecting to Hyper
7//! databases via either TCP (`PostgreSQL` wire protocol) or gRPC transport.
8//! The transport is automatically detected from the endpoint URL:
9//!
10//! - `https://` or `http://` → gRPC transport
11//! - Otherwise → TCP transport (e.g., `localhost:7483`)
12
13use std::path::Path;
14
15use hyperdb_api_core::client::Client;
16
17use crate::error::{Error, Result};
18use crate::names::escape_sql_path;
19use crate::process::HyperProcess;
20use crate::result::{Row, Rowset, DEFAULT_BINARY_CHUNK_SIZE};
21use crate::transport::Transport;
22
23use std::any::Any;
24use std::sync::{Arc, Mutex};
25
26use crate::query_stats::{QueryStats, QueryStatsProvider};
27
28/// Trait for types that can be extracted from a scalar query result.
29///
30/// This trait enables the generic [`Connection::execute_scalar_query`] method,
31/// similar to C++'s `executeScalarQuery<T>()` template.
32///
33/// # Implementing Custom Types
34///
35/// You can implement this trait for custom types to use them with `execute_scalar_query`:
36///
37/// ```no_run
38/// # use hyperdb_api::{Row, ScalarValue};
39/// # struct MyType;
40/// # impl MyType { fn parse(s: &str) -> Self { MyType } }
41/// impl ScalarValue for MyType {
42///     fn from_row(row: &Row, col: usize) -> Option<Self> {
43///         row.get_string(col).map(|s| MyType::parse(&s))
44///     }
45/// }
46/// ```
47pub trait ScalarValue: Sized {
48    /// Extracts a value of this type from a row at the given column.
49    fn from_row(row: &Row, col: usize) -> Option<Self>;
50}
51
52impl ScalarValue for i64 {
53    fn from_row(row: &Row, col: usize) -> Option<Self> {
54        row.get_i64(col)
55    }
56}
57
58impl ScalarValue for i32 {
59    fn from_row(row: &Row, col: usize) -> Option<Self> {
60        row.get_i32(col)
61    }
62}
63
64impl ScalarValue for i16 {
65    fn from_row(row: &Row, col: usize) -> Option<Self> {
66        row.get_i16(col)
67    }
68}
69
70impl ScalarValue for f64 {
71    fn from_row(row: &Row, col: usize) -> Option<Self> {
72        row.get_f64(col)
73    }
74}
75
76impl ScalarValue for bool {
77    fn from_row(row: &Row, col: usize) -> Option<Self> {
78        row.get_bool(col)
79    }
80}
81
82impl ScalarValue for String {
83    fn from_row(row: &Row, col: usize) -> Option<Self> {
84        row.get_string(col)
85    }
86}
87
88/// Database creation mode when connecting.
89#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
90pub enum CreateMode {
91    /// Do not create the database. Method will fail if database doesn't exist.
92    #[default]
93    DoNotCreate,
94    /// Create the database. Method will fail if the database already exists.
95    Create,
96    /// Create the database if it doesn't exist.
97    CreateIfNotExists,
98    /// Create the database. If it already exists, drop the old one first.
99    CreateAndReplace,
100}
101
102/// A connection to a Hyper database.
103///
104/// This struct represents an active connection to a Hyper server and optionally
105/// an attached database. The connection is automatically closed when dropped.
106///
107/// # Transport Auto-Detection
108///
109/// The transport is automatically detected from the endpoint URL:
110/// - `https://` or `http://` → gRPC transport (read-only until server supports writes)
111/// - Otherwise → TCP transport (full read/write support)
112///
113/// # CSV / Text Import & Export
114///
115/// For CSV, TSV, and other delimited-text formats, see the [`copy`](crate::copy)
116/// module which provides [`export_csv()`](Self::export_csv),
117/// [`import_csv()`](Self::import_csv), and related methods on this struct.
118///
119/// # Example
120///
121/// ```no_run
122/// use hyperdb_api::{Connection, CreateMode, Result};
123///
124/// fn main() -> Result<()> {
125///     // TCP connection (full read/write)
126///     let conn = Connection::connect("localhost:7483", "example.hyper", CreateMode::CreateIfNotExists)?;
127///
128///     // Execute SQL commands
129///     conn.execute_command("CREATE TABLE test (id INT, name TEXT)")?;
130///     conn.execute_command("INSERT INTO test VALUES (1, 'Hello')")?;
131///
132///     Ok(())
133/// }
134/// ```
135///
136/// ```no_run
137/// # use hyperdb_api::{Connection, CreateMode, Result};
138/// # fn example() -> Result<()> {
139/// // gRPC connection (read-only, auto-detected from URL)
140/// let conn = Connection::connect(
141///     "https://hyper-server.example.com:443",
142///     "example.hyper",
143///     CreateMode::DoNotCreate,  // Must be DoNotCreate for gRPC
144/// )?;
145/// # Ok(())
146/// # }
147/// ```
148pub struct Connection {
149    transport: Transport,
150    database: Option<String>,
151    stats_provider: Option<Arc<dyn QueryStatsProvider>>,
152    /// Pending stats token + SQL from the most recent query, resolved lazily.
153    pending_stats: Mutex<Option<(Box<dyn Any + Send>, String)>>,
154}
155
156impl std::fmt::Debug for Connection {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        f.debug_struct("Connection")
159            .field("database", &self.database)
160            .finish_non_exhaustive()
161    }
162}
163
164impl Connection {
165    /// Creates a new connection to a Hyper instance with a database.
166    ///
167    /// This is the primary way to connect to a running [`HyperProcess`].
168    ///
169    /// # Arguments
170    ///
171    /// * `instance` - The Hyper server instance to connect to.
172    /// * `database_path` - Path to the database file.
173    /// * `create_mode` - How to handle database creation.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if the connection could not be established.
178    ///
179    /// # Example
180    ///
181    /// ```no_run
182    /// use hyperdb_api::{HyperProcess, Connection, CreateMode, Result};
183    ///
184    /// fn main() -> Result<()> {
185    ///     let hyper = HyperProcess::new(None, None)?;
186    ///     let conn = Connection::new(&hyper, "database.hyper", CreateMode::CreateIfNotExists)?;
187    ///     Ok(())
188    /// }
189    /// ```
190    pub fn new(
191        instance: &HyperProcess,
192        database_path: impl AsRef<Path>,
193        create_mode: CreateMode,
194    ) -> Result<Self> {
195        // Prefer using the connection_endpoint which properly handles UDS/Named Pipes
196        if let Some(conn_endpoint) = instance.connection_endpoint() {
197            return Self::connect_with_endpoint(
198                conn_endpoint,
199                &database_path.as_ref().to_string_lossy(),
200                create_mode,
201            );
202        }
203
204        // Fall back to string endpoint (TCP)
205        let endpoint = instance.require_endpoint()?;
206        Self::connect(
207            endpoint,
208            &database_path.as_ref().to_string_lossy(),
209            create_mode,
210        )
211    }
212
213    /// Connects using a `ConnectionEndpoint` (supports TCP, UDS, and Named Pipes).
214    fn connect_with_endpoint(
215        endpoint: &hyperdb_api_core::client::ConnectionEndpoint,
216        database_path: &str,
217        create_mode: CreateMode,
218    ) -> Result<Self> {
219        let db_path_str = Some(database_path.to_string());
220
221        let config = hyperdb_api_core::client::Config::new().with_user("tableau_internal_user");
222
223        let client = hyperdb_api_core::client::Client::connect_endpoint(endpoint, &config)?;
224
225        let conn = Connection::from_client(client, db_path_str.clone());
226
227        // Handle database creation
228        if let Some(db_path) = db_path_str {
229            conn.handle_creation_mode(&db_path, create_mode)?;
230            conn.attach_and_set_path(&db_path)?;
231        }
232
233        Ok(conn)
234    }
235
236    /// Connects to a Hyper server and optionally attaches a database.
237    ///
238    /// # Arguments
239    ///
240    /// * `endpoint` - The server endpoint (host:port).
241    /// * `database_path` - Path to the database file.
242    /// * `create_mode` - How to handle database creation.
243    ///
244    /// # Errors
245    ///
246    /// Returns an error if the connection could not be established.
247    pub fn connect(endpoint: &str, database_path: &str, create_mode: CreateMode) -> Result<Self> {
248        crate::ConnectionBuilder::new(endpoint)
249            .database(database_path)
250            .create_mode(create_mode)
251            .build()
252    }
253
254    /// Returns a connection builder for advanced configuration.
255    ///
256    /// This is useful when you need to set authentication, timeouts, or
257    /// other advanced options before connecting.
258    #[must_use]
259    pub fn builder(endpoint: &str) -> crate::ConnectionBuilder {
260        crate::ConnectionBuilder::new(endpoint)
261    }
262
263    /// Creates a Connection from a low-level Client (internal use, TCP only).
264    pub(crate) fn from_client(client: Client, database: Option<String>) -> Self {
265        Connection {
266            transport: Transport::Tcp(Box::new(crate::transport::TcpTransport { client })),
267            database,
268            stats_provider: None,
269            pending_stats: Mutex::new(None),
270        }
271    }
272
273    /// Creates a Connection from a Transport (internal use).
274    #[allow(
275        dead_code,
276        reason = "used by ConnectionBuilder for the gRPC path; not reached under non-gRPC feature builds"
277    )]
278    pub(crate) fn from_transport(transport: Transport, database: Option<String>) -> Self {
279        Connection {
280            transport,
281            database,
282            stats_provider: None,
283            pending_stats: Mutex::new(None),
284        }
285    }
286
287    /// Returns the transport type name (e.g., "TCP", "gRPC", "Unix Socket").
288    pub fn transport_type(&self) -> &'static str {
289        self.transport.transport_type().as_str()
290    }
291
292    /// Returns true if this connection supports write operations.
293    ///
294    /// Currently, only TCP connections support writes. gRPC connections are
295    /// read-only until the server supports write operations over gRPC.
296    pub fn supports_writes(&self) -> bool {
297        self.transport.supports_writes()
298    }
299
300    /// Handles database creation logic (internal use).
301    pub(crate) fn handle_creation_mode(
302        &self,
303        database_path: &str,
304        create_mode: CreateMode,
305    ) -> Result<()> {
306        match create_mode {
307            CreateMode::DoNotCreate => {}
308            CreateMode::Create => {
309                self.execute_command(&format!(
310                    "CREATE DATABASE {}",
311                    escape_sql_path(database_path)
312                ))?;
313            }
314            CreateMode::CreateIfNotExists => {
315                if let Err(e) = self.execute_command(&format!(
316                    "CREATE DATABASE IF NOT EXISTS {}",
317                    escape_sql_path(database_path)
318                )) {
319                    if !is_already_exists_error(&e) {
320                        return Err(Error::with_cause(
321                            format!("Failed to create database '{database_path}': {e}"),
322                            e,
323                        ));
324                    }
325                }
326            }
327            CreateMode::CreateAndReplace => {
328                let _ = self.execute_command(&format!(
329                    "DROP DATABASE IF EXISTS {}",
330                    escape_sql_path(database_path)
331                ));
332                self.execute_command(&format!(
333                    "CREATE DATABASE {}",
334                    escape_sql_path(database_path)
335                ))?;
336            }
337        }
338        Ok(())
339    }
340
341    /// Attaches and sets the database path (internal use).
342    pub(crate) fn attach_and_set_path(&self, database_path: &str) -> Result<()> {
343        let db_alias = std::path::Path::new(database_path)
344            .file_stem()
345            .and_then(|s| s.to_str())
346            .unwrap_or("db");
347
348        self.execute_command(&format!(
349            "ATTACH DATABASE {} AS {}",
350            escape_sql_path(database_path),
351            escape_sql_path(db_alias)
352        ))?;
353
354        self.execute_command(&format!(
355            "SET search_path TO {}, public",
356            escape_sql_path(db_alias)
357        ))?;
358
359        Ok(())
360    }
361
362    /// Connects to a Hyper server with authentication.
363    ///
364    /// # Arguments
365    ///
366    /// * `endpoint` - The server endpoint (host:port).
367    /// * `database_path` - Path to the database file.
368    /// * `create_mode` - How to handle database creation.
369    /// * `user` - Username for authentication.
370    /// * `password` - Password for authentication.
371    ///
372    /// # Errors
373    ///
374    /// Returns an error if the connection or authentication fails.
375    pub fn connect_with_auth(
376        endpoint: &str,
377        database_path: &str,
378        create_mode: CreateMode,
379        user: &str,
380        password: &str,
381    ) -> Result<Self> {
382        crate::ConnectionBuilder::new(endpoint)
383            .database(database_path)
384            .create_mode(create_mode)
385            .user(user.to_string())
386            .password(password)
387            .build()
388    }
389
390    /// Creates a connection to a Hyper server without attaching a database.
391    ///
392    /// # Errors
393    ///
394    /// Returns [`Error::Client`] if the TCP or gRPC handshake fails, and
395    /// [`Error::Io`] if the endpoint cannot be reached.
396    pub fn without_database(endpoint: &str) -> Result<Self> {
397        crate::ConnectionBuilder::new(endpoint).build()
398    }
399
400    /// Executes a SQL command that doesn't return results.
401    ///
402    /// Use this for DDL statements (CREATE, ALTER, DROP) and DML statements
403    /// (INSERT, UPDATE, DELETE).
404    ///
405    /// # Arguments
406    ///
407    /// * `command` - The SQL command to execute.
408    ///
409    /// # Returns
410    ///
411    /// The number of affected rows, or 0 if not applicable.
412    ///
413    /// # Errors
414    ///
415    /// Returns an error if:
416    /// - The connection is using gRPC transport (write operations not yet supported)
417    /// - The command fails to execute
418    pub fn execute_command(&self, command: &str) -> Result<u64> {
419        let token = self.stats_before_query(command);
420
421        let result = self.transport.execute_command(command);
422
423        // For commands, the query is fully executed synchronously, so we
424        // can store the pending token immediately for lazy resolution.
425        self.stats_store_pending(token, command);
426
427        result
428    }
429
430    /// Executes a SQL query and returns a streaming result set.
431    ///
432    /// Results are streamed in chunks (default 64K rows), keeping memory usage
433    /// constant regardless of result set size. This makes it safe for any
434    /// result size, from a single row to billions of rows.
435    ///
436    /// # Example
437    ///
438    /// ```no_run
439    /// # use hyperdb_api::{Connection, Result};
440    /// # fn example(conn: &Connection) -> Result<()> {
441    /// let mut result = conn.execute_query("SELECT id, value FROM measurements")?;
442    /// while let Some(chunk) = result.next_chunk()? {
443    ///     for row in &chunk {
444    ///         // Generic typed access (like C++ row.get<T>())
445    ///         let id: Option<i32> = row.get(0);
446    ///         let value: Option<f64> = row.get(1);
447    ///
448    ///         // Or direct accessors
449    ///         let id = row.get_i32(0);
450    ///         let value = row.get_f64(1);
451    ///     }
452    /// }
453    /// # Ok(())
454    /// # }
455    /// ```
456    ///
457    /// # Memory Behavior
458    ///
459    /// - Only one chunk is held in memory at a time (~few MB for 64K rows)
460    /// - Safe for result sets of any size (millions/billions of rows)
461    /// - Memory usage is `O(chunk_size)`, not `O(total_rows)`
462    ///
463    /// # Errors
464    ///
465    /// - Returns [`Error::Client`] wrapping a `hyperdb_api_core::client::Error` if the
466    ///   SQL fails to parse, execute, or if the server reports an error
467    ///   while streaming.
468    /// - Returns [`Error::Io`] on transport-level I/O failures.
469    pub fn execute_query(&self, query: &str) -> Result<Rowset<'_>> {
470        let token = self.stats_before_query(query);
471
472        let result = match &self.transport {
473            Transport::Tcp(tcp) => {
474                let stream = tcp
475                    .client
476                    .query_streaming(query, DEFAULT_BINARY_CHUNK_SIZE)?;
477                Ok(Rowset::new(stream))
478            }
479            Transport::Grpc(grpc) => {
480                // gRPC streaming: pull chunks lazily so peak memory is
481                // bounded by one gRPC message (tonic default 64 MB), not
482                // by the full result size. Matches TCP's
483                // constant-memory streaming shape.
484                //
485                // The transport module already creates a fresh gRPC client
486                // per query (gRPC client needs &mut self to execute), so we
487                // do the same here: connect, start the stream, wrap as a
488                // `ChunkSource`. The stream keeps the channel and runtime
489                // alive via refcounted handles inside `GrpcChunkStreamSync`.
490                let mut client =
491                    hyperdb_api_core::client::grpc::GrpcClientSync::connect(grpc.config.clone())?;
492                let stream = client.execute_query_stream(query)?;
493                let source = Box::new(crate::grpc_connection::GrpcChunkStreamSource::new(stream));
494                let arrow_rowset = crate::arrow_result::ArrowRowset::from_stream(source)?;
495                Ok(Rowset::from_arrow(arrow_rowset))
496            }
497        };
498
499        // Store the pending token — Hyper logs the execution stats after the
500        // result is consumed (streamed), so we defer resolution until
501        // last_query_stats() is called.
502        self.stats_store_pending(token, query);
503
504        result
505    }
506
507    // =========================================================================
508    // Arrow Format Queries
509    // =========================================================================
510
511    /// Executes a SELECT query and returns results as Arrow IPC stream bytes.
512    ///
513    /// # Example
514    ///
515    /// ```no_run
516    /// use hyperdb_api::{Connection, CreateMode, Result};
517    ///
518    /// fn main() -> Result<()> {
519    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
520    ///
521    ///     // Create and populate a table
522    ///     conn.execute_command("CREATE TABLE data (id INT, value DOUBLE PRECISION)")?;
523    ///     conn.execute_command("INSERT INTO data VALUES (1, 1.5), (2, 2.5)")?;
524    ///
525    ///     // Get results as Arrow IPC stream
526    ///     let arrow_data = conn.execute_query_to_arrow("SELECT * FROM data")?;
527    ///     println!("Got {} bytes of Arrow IPC data", arrow_data.len());
528    ///
529    ///     Ok(())
530    /// }
531    /// ```
532    ///
533    /// # Errors
534    ///
535    /// Propagates any [`Error::Client`] from the TCP or gRPC transport when
536    /// the query fails or the server cannot produce Arrow IPC output.
537    pub fn execute_query_to_arrow(&self, select_query: &str) -> Result<bytes::Bytes> {
538        self.transport.execute_query_to_arrow(select_query)
539    }
540
541    /// Exports an entire table to Arrow IPC stream format.
542    ///
543    /// This is a convenience method equivalent to
544    /// `execute_query_to_arrow("SELECT * FROM table_name")`.
545    ///
546    /// # Arguments
547    ///
548    /// * `table_name` - The table name
549    ///
550    /// # Returns
551    ///
552    /// Raw Arrow IPC stream bytes containing all rows from the table.
553    ///
554    /// # Example
555    ///
556    /// ```no_run
557    /// use hyperdb_api::{Connection, CreateMode, Result};
558    ///
559    /// fn main() -> Result<()> {
560    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
561    ///     let arrow_data = conn.export_table_to_arrow("my_table")?;
562    ///     Ok(())
563    /// }
564    /// ```
565    ///
566    /// # Errors
567    ///
568    /// Returns whatever [`execute_query_to_arrow`](Self::execute_query_to_arrow)
569    /// would return for `SELECT * FROM <table_name>` — typically
570    /// [`Error::Client`] if the table does not exist or the query is rejected.
571    pub fn export_table_to_arrow(&self, table_name: &str) -> Result<bytes::Bytes> {
572        self.execute_query_to_arrow(&format!("SELECT * FROM {table_name}"))
573    }
574
575    /// Executes a SELECT query and returns results as Arrow `RecordBatch`es.
576    ///
577    /// This is the recommended method for Arrow-native workflows (`DataFusion`,
578    /// Polars, etc.) where you want direct `RecordBatch` access without going
579    /// through the `Row` abstraction.
580    ///
581    /// # Example
582    ///
583    /// ```no_run
584    /// use hyperdb_api::{Connection, CreateMode, Result};
585    /// use arrow::record_batch::RecordBatch;
586    ///
587    /// fn main() -> Result<()> {
588    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
589    ///
590    ///     let batches: Vec<RecordBatch> = conn.execute_query_to_batches("SELECT * FROM data")?;
591    ///     for batch in &batches {
592    ///         println!("batch: {} rows x {} cols", batch.num_rows(), batch.num_columns());
593    ///     }
594    ///     Ok(())
595    /// }
596    /// ```
597    ///
598    /// # Errors
599    ///
600    /// - Returns [`Error::Client`] if the query itself fails.
601    /// - Returns [`Error::Other`] if the Arrow IPC payload returned by the
602    ///   server is malformed and cannot be decoded into record batches.
603    pub fn execute_query_to_batches(
604        &self,
605        select_query: &str,
606    ) -> Result<Vec<arrow::record_batch::RecordBatch>> {
607        let arrow_data = self.execute_query_to_arrow(select_query)?;
608        crate::arrow_result::parse_arrow_ipc(arrow_data)
609    }
610
611    /// Fetches a single row from a query.
612    ///
613    /// Returns an error if the query returns no rows.
614    ///
615    /// # Example
616    ///
617    /// ```no_run
618    /// use hyperdb_api::{Connection, CreateMode, Result};
619    ///
620    /// fn main() -> Result<()> {
621    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
622    ///     let row = conn.fetch_one("SELECT * FROM users WHERE id = 1")?;
623    ///     let id: Option<i32> = row.get(0);
624    ///     let name: Option<String> = row.get(1);
625    ///     Ok(())
626    /// }
627    /// ```
628    ///
629    /// # Errors
630    ///
631    /// - Returns the error from [`execute_query`](Self::execute_query) if
632    ///   the query itself fails.
633    /// - Returns [`Error::Other`] with message `"Query returned no rows"` if
634    ///   the query produced zero rows.
635    pub fn fetch_one<Q>(&self, query: Q) -> Result<crate::Row>
636    where
637        Q: AsRef<str>,
638    {
639        let query = query.as_ref();
640        let result = self.execute_query(query)?;
641        result.require_first_row()
642    }
643
644    /// Fetches an optional single row from a query.
645    ///
646    /// Returns `None` if the query returns no rows.
647    ///
648    /// # Example
649    ///
650    /// ```no_run
651    /// use hyperdb_api::{Connection, CreateMode, Result};
652    ///
653    /// fn main() -> Result<()> {
654    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
655    ///     if let Some(row) = conn.fetch_optional("SELECT * FROM users WHERE id = 999")? {
656    ///         let name: Option<String> = row.get(1);
657    ///         println!("Found user: {:?}", name);
658    ///     }
659    ///     Ok(())
660    /// }
661    /// ```
662    ///
663    /// # Errors
664    ///
665    /// Returns the error from [`execute_query`](Self::execute_query) if the
666    /// query itself fails. An empty result set is not an error — it yields
667    /// `Ok(None)`.
668    pub fn fetch_optional<Q>(&self, query: Q) -> Result<Option<crate::Row>>
669    where
670        Q: AsRef<str>,
671    {
672        let query = query.as_ref();
673        let result = self.execute_query(query)?;
674        result.first_row()
675    }
676
677    /// Fetches all rows from a query.
678    ///
679    /// # Example
680    ///
681    /// ```no_run
682    /// use hyperdb_api::{Connection, CreateMode, Result};
683    ///
684    /// fn main() -> Result<()> {
685    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
686    ///     let rows = conn.fetch_all("SELECT * FROM users WHERE active = true ORDER BY name")?;
687    ///     for row in rows {
688    ///         let id: Option<i32> = row.get(0);
689    ///         let name: Option<String> = row.get(1);
690    ///         println!("User {}: {:?}", id.unwrap_or(-1), name);
691    ///     }
692    ///     Ok(())
693    /// }
694    /// ```
695    ///
696    /// # Errors
697    ///
698    /// Returns the error from [`execute_query`](Self::execute_query), or a
699    /// transport error produced while draining every chunk of the streamed
700    /// result set.
701    pub fn fetch_all<Q>(&self, query: Q) -> Result<Vec<crate::Row>>
702    where
703        Q: AsRef<str>,
704    {
705        let query = query.as_ref();
706        let result = self.execute_query(query)?;
707        result.collect_rows()
708    }
709
710    /// Fetches a single row and maps it to a struct using [`FromRow`](crate::FromRow).
711    ///
712    /// Returns an error if the query returns no rows or if mapping fails.
713    ///
714    /// # Example
715    ///
716    /// ```no_run
717    /// use hyperdb_api::{Connection, CreateMode, Row, FromRow, Result};
718    ///
719    /// struct User { id: i32, name: String }
720    ///
721    /// impl FromRow for User {
722    ///     fn from_row(row: &Row) -> Result<Self> {
723    ///         Ok(User {
724    ///             id: row.get::<i32>(0).ok_or_else(|| hyperdb_api::Error::new("NULL id"))?,
725    ///             name: row.get::<String>(1).unwrap_or_default(),
726    ///         })
727    ///     }
728    /// }
729    ///
730    /// fn main() -> Result<()> {
731    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
732    ///     let user: User = conn.fetch_one_as("SELECT id, name FROM users WHERE id = 1")?;
733    ///     Ok(())
734    /// }
735    /// ```
736    ///
737    /// # Errors
738    ///
739    /// - Returns the error from [`fetch_one`](Self::fetch_one) if the query
740    ///   fails or returns no rows.
741    /// - Returns whatever error [`FromRow::from_row`](crate::FromRow::from_row)
742    ///   produces when the row cannot be mapped into `T`.
743    pub fn fetch_one_as<T: crate::FromRow>(&self, query: &str) -> Result<T> {
744        let row = self.fetch_one(query)?;
745        T::from_row(&row)
746    }
747
748    /// Fetches all rows and maps them to structs using [`FromRow`](crate::FromRow).
749    ///
750    /// # Example
751    ///
752    /// ```no_run
753    /// # use hyperdb_api::{Connection, CreateMode, Row, FromRow, Result};
754    /// # struct User { id: i32, name: String }
755    /// # impl FromRow for User {
756    /// #     fn from_row(row: &Row) -> Result<Self> {
757    /// #         Ok(User { id: row.get::<i32>(0).unwrap_or(0), name: row.get::<String>(1).unwrap_or_default() })
758    /// #     }
759    /// # }
760    /// # fn example(conn: &Connection) -> Result<()> {
761    /// let users: Vec<User> = conn.fetch_all_as("SELECT id, name FROM users")?;
762    /// # Ok(())
763    /// # }
764    /// ```
765    ///
766    /// # Errors
767    ///
768    /// - Returns the error from [`fetch_all`](Self::fetch_all) if the query
769    ///   fails.
770    /// - Returns the first error produced by
771    ///   [`FromRow::from_row`](crate::FromRow::from_row) on any of the rows.
772    pub fn fetch_all_as<T: crate::FromRow>(&self, query: &str) -> Result<Vec<T>> {
773        let rows = self.fetch_all(query)?;
774        rows.iter().map(|r| T::from_row(r)).collect()
775    }
776
777    /// Fetches a single scalar value from a query.
778    ///
779    /// Returns an error if the query returns no rows or NULL.
780    ///
781    /// # Example
782    ///
783    /// ```no_run
784    /// use hyperdb_api::{Connection, CreateMode, Result};
785    ///
786    /// fn main() -> Result<()> {
787    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
788    ///     let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM users")?;
789    ///     println!("User count: {}", count);
790    ///     Ok(())
791    /// }
792    /// ```
793    ///
794    /// # Errors
795    ///
796    /// - Returns the error from [`execute_query`](Self::execute_query) if
797    ///   the query itself fails.
798    /// - Returns [`Error::Other`] with message `"Query returned no rows"` if
799    ///   the query produced zero rows.
800    /// - Returns [`Error::Other`] with message `"Scalar query returned NULL"`
801    ///   if the single cell is SQL `NULL`.
802    pub fn fetch_scalar<T, Q>(&self, query: Q) -> Result<T>
803    where
804        T: crate::connection::ScalarValue + crate::result::RowValue,
805        Q: AsRef<str>,
806    {
807        let query = query.as_ref();
808        let result = self.execute_query(query)?;
809        result.require_scalar()
810    }
811
812    /// Fetches an optional scalar value from a query.
813    ///
814    /// Returns `None` if the query returns no rows or NULL.
815    ///
816    /// # Example
817    ///
818    /// ```no_run
819    /// use hyperdb_api::{Connection, CreateMode, Result};
820    ///
821    /// fn main() -> Result<()> {
822    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
823    ///     let max_id: Option<i32> = conn.fetch_optional_scalar("SELECT MAX(id) FROM users")?;
824    ///     println!("Max ID: {:?}", max_id);
825    ///     Ok(())
826    /// }
827    /// ```
828    ///
829    /// # Errors
830    ///
831    /// - Returns the error from [`execute_query`](Self::execute_query) if
832    ///   the query itself fails.
833    /// - Returns [`Error::Other`] with message `"Query returned no rows"` if
834    ///   the query produced zero rows. (An empty result is treated as an
835    ///   error here because we need at least one row to inspect; SQL `NULL`
836    ///   in the single cell yields `Ok(None)`.)
837    pub fn fetch_optional_scalar<T, Q>(&self, query: Q) -> Result<Option<T>>
838    where
839        T: crate::connection::ScalarValue + crate::result::RowValue,
840        Q: AsRef<str>,
841    {
842        let query = query.as_ref();
843        let result = self.execute_query(query)?;
844        result.scalar()
845    }
846
847    /// Executes a scalar query and returns a single value of type `T`.
848    ///
849    /// Alias for [`fetch_optional_scalar`](Self::fetch_optional_scalar) for C++ API compatibility.
850    ///
851    /// # Errors
852    ///
853    /// See [`fetch_optional_scalar`](Self::fetch_optional_scalar).
854    #[inline]
855    pub fn execute_scalar_query<T>(&self, query: &str) -> Result<Option<T>>
856    where
857        T: ScalarValue + crate::result::RowValue,
858    {
859        self.fetch_optional_scalar(query)
860    }
861
862    /// Queries for a count value, defaulting to 0 if NULL.
863    ///
864    /// This is optimized for COUNT queries which typically return 0
865    /// instead of NULL when there are no matching rows.
866    ///
867    /// # Example
868    ///
869    /// ```no_run
870    /// use hyperdb_api::{Connection, CreateMode, Result};
871    ///
872    /// fn main() -> Result<()> {
873    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
874    ///     let count = conn.query_count("SELECT COUNT(*) FROM users WHERE active = true")?;
875    ///     println!("Active users: {}", count);
876    ///     Ok(())
877    /// }
878    /// ```
879    ///
880    /// # Errors
881    ///
882    /// Returns the error from [`execute_query`](Self::execute_query) if the
883    /// query fails or produces no rows. SQL `NULL` is mapped to `0`, not an
884    /// error.
885    pub fn query_count(&self, query: &str) -> Result<i64> {
886        self.fetch_optional_scalar::<i64, _>(query)
887            .map(|opt| opt.unwrap_or(0))
888    }
889
890    // =========================================================================
891    // Parameterized Queries (SQL Injection Safe)
892    // =========================================================================
893
894    /// Executes a parameterized query, returning streaming results.
895    ///
896    /// This is safe to use with untrusted user input: parameters travel
897    /// through the extended query protocol (Parse/Bind/Execute) as
898    /// binary `HyperBinary` values and are never interpolated into the
899    /// SQL string. For repeated executions of the same SQL with different
900    /// values, prefer the explicit [`prepare`](Self::prepare) API — it
901    /// returns a reusable [`PreparedStatement`](crate::PreparedStatement)
902    /// that skips the Parse round-trip on every call.
903    ///
904    /// Under the hood, `query_params` is a one-shot
905    /// prepare+execute+close: it prepares an unnamed statement, binds
906    /// the parameters, starts streaming, and closes the statement when
907    /// the returned [`Rowset`] is dropped.
908    ///
909    /// # Arguments
910    ///
911    /// * `query` - The SQL query with parameter placeholders (`$1`, `$2`, etc.)
912    /// * `params` - Parameter values matching the placeholders
913    ///
914    /// # SQL Injection Prevention
915    ///
916    /// ```no_run
917    /// use hyperdb_api::{Connection, CreateMode, Result};
918    ///
919    /// fn search_users(conn: &Connection, user_input: &str) -> Result<()> {
920    ///     // DANGEROUS - vulnerable to SQL injection:
921    ///     // let query = format!("SELECT * FROM users WHERE name = '{}'", user_input);
922    ///
923    ///     // SAFE - parameterized query:
924    ///     let mut result = conn.query_params(
925    ///         "SELECT * FROM users WHERE name = $1",
926    ///         &[&user_input],
927    ///     )?;
928    ///
929    ///     while let Some(chunk) = result.next_chunk()? {
930    ///         for row in &chunk {
931    ///             let id: Option<i32> = row.get(0);
932    ///             let name: Option<String> = row.get(1);
933    ///             println!("Found: {:?} - {:?}", id, name);
934    ///         }
935    ///     }
936    ///     Ok(())
937    /// }
938    /// ```
939    ///
940    /// # Multiple Parameters
941    ///
942    /// ```no_run
943    /// use hyperdb_api::{Connection, CreateMode, Result};
944    ///
945    /// fn main() -> Result<()> {
946    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
947    ///
948    ///     // Multiple parameters of different types
949    ///     let result = conn.query_params(
950    ///         "SELECT * FROM orders WHERE customer_id = $1 AND total > $2",
951    ///         &[&42i32, &100.0f64],
952    ///     )?;
953    ///     Ok(())
954    /// }
955    /// ```
956    ///
957    /// # Errors
958    ///
959    /// - Returns [`Error::Other`] if the connection is using gRPC transport
960    ///   (prepared statements are TCP-only).
961    /// - Returns [`Error::Client`] if the server rejects the statement at
962    ///   `Parse`, `Bind`, or `Execute` time, including on type-mismatch
963    ///   between `params` and the inferred OIDs.
964    /// - Returns [`Error::Io`] on transport-level I/O failures.
965    pub fn query_params(
966        &self,
967        query: &str,
968        params: &[&dyn crate::params::ToSqlParam],
969    ) -> Result<Rowset<'_>> {
970        // Implementation note: routes through the extended query protocol
971        // via Parse/Bind/Execute so parameters travel in HyperBinary
972        // format — no SQL escaping, full SQL-injection safety regardless of
973        // parameter content. The statement handle is stashed inside the
974        // returned Rowset so its Drop-time close_statement fires *after*
975        // the rowset releases its connection lock (otherwise the close
976        // would deadlock on the still-held mutex).
977        let client = match &self.transport {
978            Transport::Tcp(tcp) => &tcp.client,
979            Transport::Grpc(_) => {
980                return Err(Error::new(
981                    "prepared statements are not supported over gRPC transport",
982                ));
983            }
984        };
985        let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
986        let stmt = client.prepare_typed(query, &oids)?;
987        let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
988        let stream =
989            client.execute_streaming(&stmt, encoded, crate::result::DEFAULT_BINARY_CHUNK_SIZE)?;
990        Ok(Rowset::from_prepared(stream).with_statement_guard(stmt))
991    }
992
993    /// Executes a parameterized command that doesn't return rows.
994    ///
995    /// Use this for INSERT, UPDATE, DELETE, or DDL statements with parameters.
996    /// Returns the number of affected rows.
997    ///
998    /// See [`query_params`](Self::query_params) for details on parameter
999    /// handling and SQL injection prevention.
1000    ///
1001    /// # Example
1002    ///
1003    /// ```no_run
1004    /// use hyperdb_api::{Connection, CreateMode, Result};
1005    ///
1006    /// fn delete_user(conn: &Connection, user_id: i32) -> Result<u64> {
1007    ///     // Safe from SQL injection
1008    ///     conn.command_params("DELETE FROM users WHERE id = $1", &[&user_id])
1009    /// }
1010    /// ```
1011    ///
1012    /// # Errors
1013    ///
1014    /// - Returns [`Error::Other`] if the connection is using gRPC transport.
1015    /// - Returns [`Error::Client`] if the server rejects the statement at
1016    ///   `Parse`, `Bind`, or `Execute` time.
1017    /// - Returns [`Error::Io`] on transport-level I/O failures.
1018    pub fn command_params(
1019        &self,
1020        query: &str,
1021        params: &[&dyn crate::params::ToSqlParam],
1022    ) -> Result<u64> {
1023        // One-shot prepare+execute with explicit OIDs — see `query_params`
1024        // for why we collect OIDs from each parameter.
1025        let client = match &self.transport {
1026            Transport::Tcp(tcp) => &tcp.client,
1027            Transport::Grpc(_) => {
1028                return Err(Error::new(
1029                    "prepared statements are not supported over gRPC transport",
1030                ));
1031            }
1032        };
1033        let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
1034        let stmt = client.prepare_typed(query, &oids)?;
1035        let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
1036        Ok(client.execute_no_result(&stmt, encoded)?)
1037    }
1038
1039    /// Executes multiple SQL statements in a single call.
1040    ///
1041    /// Each statement is executed sequentially. If any statement fails,
1042    /// execution stops and the error is returned. Returns the total number
1043    /// of affected rows across all statements.
1044    ///
1045    /// This is more efficient than calling `execute_command` in a loop
1046    /// because it reduces round-trips for DDL scripts and multi-statement setup.
1047    ///
1048    /// # Example
1049    ///
1050    /// ```no_run
1051    /// use hyperdb_api::{Connection, CreateMode, Result};
1052    ///
1053    /// fn main() -> Result<()> {
1054    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1055    ///     let total = conn.execute_batch(&[
1056    ///         "CREATE TABLE users (id INT, name TEXT)",
1057    ///         "INSERT INTO users VALUES (1, 'Alice')",
1058    ///         "INSERT INTO users VALUES (2, 'Bob')",
1059    ///     ])?;
1060    ///     println!("Total affected: {}", total);
1061    ///     Ok(())
1062    /// }
1063    /// ```
1064    ///
1065    /// # Errors
1066    ///
1067    /// Returns a wrapped [`Error::Other`] on the first statement that fails;
1068    /// its `source` is the original [`Error::Client`] from
1069    /// [`execute_command`](Self::execute_command). The error message
1070    /// includes the failing statement's ordinal and an 80-character preview
1071    /// of its SQL.
1072    pub fn execute_batch(&self, statements: &[&str]) -> Result<u64> {
1073        let mut total = 0u64;
1074        for (i, stmt) in statements.iter().enumerate() {
1075            if !stmt.trim().is_empty() {
1076                total += self.execute_command(stmt).map_err(|e| {
1077                    let preview: String = stmt.chars().take(80).collect();
1078                    Error::with_cause(
1079                        format!(
1080                            "execute_batch failed at statement {} of {}: {}",
1081                            i + 1,
1082                            statements.len(),
1083                            preview,
1084                        ),
1085                        e,
1086                    )
1087                })?;
1088            }
1089        }
1090        Ok(total)
1091    }
1092
1093    /// Returns the attached database path, if any.
1094    pub fn database(&self) -> Option<&str> {
1095        self.database.as_deref()
1096    }
1097
1098    /// Creates a new database file.
1099    ///
1100    /// # Example
1101    ///
1102    /// ```no_run
1103    /// use hyperdb_api::{Connection, Result};
1104    ///
1105    /// fn main() -> Result<()> {
1106    ///     let conn = Connection::without_database("localhost:7483")?;
1107    ///     conn.create_database("new_database.hyper")?;
1108    ///     Ok(())
1109    /// }
1110    /// ```
1111    ///
1112    /// # Errors
1113    ///
1114    /// Returns [`Error::Client`] if the server rejects the
1115    /// `CREATE DATABASE IF NOT EXISTS` statement (e.g. the path is not
1116    /// writable on the server).
1117    pub fn create_database(&self, path: &str) -> Result<()> {
1118        let sql = format!("CREATE DATABASE IF NOT EXISTS {}", escape_sql_path(path));
1119        self.execute_command(&sql)?;
1120        Ok(())
1121    }
1122
1123    /// Drops (deletes) a database file.
1124    ///
1125    /// # Example
1126    ///
1127    /// ```no_run
1128    /// use hyperdb_api::{Connection, Result};
1129    ///
1130    /// fn main() -> Result<()> {
1131    ///     let conn = Connection::without_database("localhost:7483")?;
1132    ///     conn.drop_database("old_database.hyper")?;
1133    ///     Ok(())
1134    /// }
1135    /// ```
1136    ///
1137    /// # Errors
1138    ///
1139    /// Returns [`Error::Client`] if the server rejects the
1140    /// `DROP DATABASE IF EXISTS` statement (e.g. the database is still
1141    /// attached or permissions deny deletion).
1142    pub fn drop_database(&self, path: &str) -> Result<()> {
1143        let sql = format!("DROP DATABASE IF EXISTS {}", escape_sql_path(path));
1144        self.execute_command(&sql)?;
1145        Ok(())
1146    }
1147
1148    /// Attaches a database file to the connection.
1149    ///
1150    /// Once attached, the database can be queried and modified.
1151    /// The database is identified by its alias (or by its path if no alias is provided).
1152    ///
1153    /// # Arguments
1154    ///
1155    /// * `path` - The path to the database file to attach.
1156    /// * `alias` - Optional alias for the database. If `None`, the database is
1157    ///   attached without an explicit alias (typically using its filename).
1158    ///
1159    /// # Errors
1160    ///
1161    /// Returns an error if the database file doesn't exist or if attachment fails.
1162    ///
1163    /// # Example
1164    ///
1165    /// ```no_run
1166    /// use hyperdb_api::{Connection, Result};
1167    ///
1168    /// fn main() -> Result<()> {
1169    ///     let conn = Connection::without_database("localhost:7483")?;
1170    ///
1171    ///     // Attach with an alias
1172    ///     conn.attach_database("data.hyper", Some("mydata"))?;
1173    ///
1174    ///     // Attach without an alias
1175    ///     conn.attach_database("other.hyper", None)?;
1176    ///     Ok(())
1177    /// }
1178    /// ```
1179    pub fn attach_database(&self, path: &str, alias: Option<&str>) -> Result<()> {
1180        let sql = if let Some(alias) = alias {
1181            format!(
1182                "ATTACH DATABASE {} AS {}",
1183                escape_sql_path(path),
1184                escape_sql_path(alias)
1185            )
1186        } else {
1187            format!("ATTACH DATABASE {}", escape_sql_path(path))
1188        };
1189        self.execute_command(&sql)?;
1190        Ok(())
1191    }
1192
1193    /// Detaches a database from this connection.
1194    ///
1195    /// After detaching, the database file is released and can be accessed
1196    /// externally (e.g., copied, moved, etc.). All pending updates are
1197    /// written to disk before detaching.
1198    ///
1199    /// # Arguments
1200    ///
1201    /// * `alias` - The alias of the database to detach.
1202    ///
1203    /// # Errors
1204    ///
1205    /// Returns an error if the database is not attached or if detachment fails.
1206    ///
1207    /// # Example
1208    ///
1209    /// ```no_run
1210    /// use hyperdb_api::{Connection, Result};
1211    ///
1212    /// fn main() -> Result<()> {
1213    ///     let conn = Connection::without_database("localhost:7483")?;
1214    ///     conn.attach_database("data.hyper", Some("mydata"))?;
1215    ///     // ... work with the database ...
1216    ///     conn.detach_database("mydata")?;
1217    ///     Ok(())
1218    /// }
1219    /// ```
1220    pub fn detach_database(&self, alias: &str) -> Result<()> {
1221        let sql = format!("DETACH DATABASE {}", escape_sql_path(alias));
1222        self.execute_command(&sql)?;
1223        Ok(())
1224    }
1225
1226    /// Detaches all databases from this connection.
1227    ///
1228    /// This is useful for cleanup before closing a connection or when
1229    /// you need to release all database files.
1230    ///
1231    /// # Errors
1232    ///
1233    /// Returns [`Error::Client`] if the server rejects the
1234    /// `DETACH ALL DATABASES` statement (e.g. a database is still in use by
1235    /// another session).
1236    pub fn detach_all_databases(&self) -> Result<()> {
1237        self.execute_command("DETACH ALL DATABASES")?;
1238        Ok(())
1239    }
1240
1241    /// Creates a schema in the database.
1242    ///
1243    /// # Errors
1244    ///
1245    /// - Returns an error if `schema_name` cannot be converted into a
1246    ///   [`SchemaName`](crate::SchemaName) (invalid identifier).
1247    /// - Returns [`Error::Client`] if the server rejects the
1248    ///   `CREATE SCHEMA` statement (e.g. the schema already exists).
1249    pub fn create_schema<T>(&self, schema_name: T) -> Result<()>
1250    where
1251        T: TryInto<crate::SchemaName>,
1252        crate::Error: From<T::Error>,
1253    {
1254        crate::catalog::Catalog::new(self).create_schema(schema_name)
1255    }
1256
1257    /// Checks whether a schema exists.
1258    ///
1259    /// # Arguments
1260    ///
1261    /// * `schema` - The schema name (can include database qualifier).
1262    ///
1263    /// # Example
1264    ///
1265    /// ```no_run
1266    /// use hyperdb_api::{Connection, Result};
1267    ///
1268    /// fn main() -> Result<()> {
1269    ///     let conn = Connection::without_database("localhost:7483")?;
1270    ///     if conn.has_schema("public")? {
1271    ///         println!("Schema 'public' exists");
1272    ///     }
1273    ///     Ok(())
1274    /// }
1275    /// ```
1276    ///
1277    /// # Errors
1278    ///
1279    /// - Returns an error if `schema` cannot be converted into a
1280    ///   [`SchemaName`](crate::SchemaName).
1281    /// - Returns [`Error::Client`] if the catalog lookup query fails.
1282    pub fn has_schema<T>(&self, schema: T) -> Result<bool>
1283    where
1284        T: TryInto<crate::SchemaName>,
1285        crate::Error: From<T::Error>,
1286    {
1287        use crate::catalog::Catalog;
1288        Catalog::new(self).has_schema(schema)
1289    }
1290
1291    /// Checks whether a table exists.
1292    ///
1293    /// # Arguments
1294    ///
1295    /// * `table_name` - The table name (can include database and schema qualifiers).
1296    ///
1297    /// # Example
1298    ///
1299    /// ```no_run
1300    /// use hyperdb_api::{Connection, Result};
1301    ///
1302    /// fn main() -> Result<()> {
1303    ///     let conn = Connection::without_database("localhost:7483")?;
1304    ///     if conn.has_table("public.users")? {
1305    ///         println!("Table 'users' exists");
1306    ///     }
1307    ///     Ok(())
1308    /// }
1309    /// ```
1310    ///
1311    /// # Errors
1312    ///
1313    /// - Returns an error if `table_name` cannot be converted into a
1314    ///   [`TableName`](crate::TableName).
1315    /// - Returns [`Error::Client`] if the catalog lookup query fails.
1316    pub fn has_table<T>(&self, table_name: T) -> Result<bool>
1317    where
1318        T: TryInto<crate::TableName>,
1319        crate::Error: From<T::Error>,
1320    {
1321        use crate::catalog::Catalog;
1322        Catalog::new(self).has_table(table_name)
1323    }
1324
1325    /// Returns the server version as a parsed struct.
1326    ///
1327    /// Returns `None` if the version cannot be determined (e.g., gRPC connection).
1328    ///
1329    /// # Example
1330    ///
1331    /// ```no_run
1332    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, ServerVersion, Result};
1333    ///
1334    /// fn main() -> Result<()> {
1335    ///     let hyper = HyperProcess::new(None, None)?;
1336    ///     let conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1337    ///     if let Some(version) = conn.server_version() {
1338    ///         println!("Hyper {}", version);
1339    ///         if version >= ServerVersion::new(0, 1, 0) {
1340    ///             println!("Has feature X");
1341    ///         }
1342    ///     }
1343    ///     Ok(())
1344    /// }
1345    /// ```
1346    pub fn server_version(&self) -> Option<crate::ServerVersion> {
1347        let version_str = self.parameter_status("server_version")?;
1348        crate::ServerVersion::parse(&version_str)
1349    }
1350
1351    /// Copies a database file to a new path.
1352    ///
1353    /// The source database must be attached to this connection.
1354    ///
1355    /// # Example
1356    ///
1357    /// ```no_run
1358    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1359    ///
1360    /// fn main() -> Result<()> {
1361    ///     let hyper = HyperProcess::new(None, None)?;
1362    ///     let conn = Connection::new(&hyper, "source.hyper", CreateMode::DoNotCreate)?;
1363    ///     conn.copy_database("source.hyper", "backup.hyper")?;
1364    ///     Ok(())
1365    /// }
1366    /// ```
1367    ///
1368    /// # Errors
1369    ///
1370    /// Returns [`Error::Client`] if the server rejects the
1371    /// `COPY DATABASE` statement — e.g. the source is not attached, the
1372    /// destination path is not writable, or it already exists.
1373    pub fn copy_database(&self, source: &str, destination: &str) -> Result<()> {
1374        let sql = format!(
1375            "COPY DATABASE {} TO {}",
1376            escape_sql_path(source),
1377            escape_sql_path(destination)
1378        );
1379        self.execute_command(&sql)?;
1380        Ok(())
1381    }
1382
1383    /// Executes EXPLAIN on a query and returns the plan as a string.
1384    ///
1385    /// # Example
1386    ///
1387    /// ```no_run
1388    /// use hyperdb_api::{Connection, CreateMode, Result};
1389    ///
1390    /// fn main() -> Result<()> {
1391    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1392    ///     let plan = conn.explain("SELECT * FROM users WHERE id = 1")?;
1393    ///     println!("{}", plan);
1394    ///     Ok(())
1395    /// }
1396    /// ```
1397    ///
1398    /// # Errors
1399    ///
1400    /// Returns [`Error::Client`] if `EXPLAIN <query>` fails to parse or
1401    /// plan, or if the streamed result cannot be consumed.
1402    pub fn explain(&self, query: &str) -> Result<String> {
1403        let explain_sql = format!("EXPLAIN {query}");
1404        let result = self.execute_query(&explain_sql)?;
1405        let mut lines = Vec::new();
1406        for row in result.rows() {
1407            let row = row?;
1408            if let Some(line) = row.get::<String>(0) {
1409                lines.push(line);
1410            }
1411        }
1412        Ok(lines.join("\n"))
1413    }
1414
1415    /// Executes EXPLAIN ANALYZE on a query and returns the plan with timing info.
1416    ///
1417    /// **Note:** This actually executes the query to collect timing information.
1418    ///
1419    /// # Errors
1420    ///
1421    /// Returns [`Error::Client`] if `EXPLAIN ANALYZE <query>` fails — this
1422    /// includes any runtime error raised by actually executing `query`.
1423    pub fn explain_analyze(&self, query: &str) -> Result<String> {
1424        let explain_sql = format!("EXPLAIN ANALYZE {query}");
1425        let result = self.execute_query(&explain_sql)?;
1426        let mut lines = Vec::new();
1427        for row in result.rows() {
1428            let row = row?;
1429            if let Some(line) = row.get::<String>(0) {
1430                lines.push(line);
1431            }
1432        }
1433        Ok(lines.join("\n"))
1434    }
1435
1436    /// Returns a reference to the underlying TCP client.
1437    ///
1438    /// # Panics
1439    ///
1440    /// This method returns `None` if the connection is using gRPC transport.
1441    pub fn tcp_client(&self) -> Option<&Client> {
1442        match &self.transport {
1443            Transport::Tcp(tcp) => Some(&tcp.client),
1444            Transport::Grpc(_) => None,
1445        }
1446    }
1447
1448    /// Crate-internal accessor for the transport. Used by
1449    /// [`PreparedStatement`](crate::PreparedStatement) to reach the
1450    /// underlying `hyperdb_api_core::client::Client`.
1451    pub(crate) fn transport(&self) -> &Transport {
1452        &self.transport
1453    }
1454
1455    /// Prepares a SQL statement with automatic parameter type inference.
1456    ///
1457    /// The returned [`PreparedStatement`](crate::PreparedStatement) can
1458    /// be executed many times with different parameter values; the
1459    /// server caches the parsed plan. This is the preferred way to
1460    /// execute a statement repeatedly inside a loop.
1461    ///
1462    /// For explicit parameter types (necessary when `$N` placeholders
1463    /// would otherwise be ambiguous), use
1464    /// [`prepare_typed`](Self::prepare_typed).
1465    ///
1466    /// # Example
1467    ///
1468    /// ```no_run
1469    /// # use hyperdb_api::{Connection, CreateMode, Result};
1470    /// # fn example(conn: &Connection) -> Result<()> {
1471    /// let stmt = conn.prepare("SELECT name FROM users WHERE id = $1")?;
1472    /// for id in [1_i32, 2, 3] {
1473    ///     let name: String = stmt.fetch_scalar(&[&id])?;
1474    ///     println!("{id}: {name}");
1475    /// }
1476    /// # Ok(())
1477    /// # }
1478    /// ```
1479    ///
1480    /// # Errors
1481    ///
1482    /// See [`prepare_typed`](Self::prepare_typed) — this method delegates
1483    /// to it with an empty OID list.
1484    pub fn prepare(&self, query: &str) -> Result<crate::PreparedStatement<'_>> {
1485        self.prepare_typed(query, &[])
1486    }
1487
1488    /// Prepares a SQL statement with explicit parameter type OIDs.
1489    ///
1490    /// Use this when the server cannot infer parameter types from the
1491    /// SQL alone (e.g. a bare `$1` in a `WHERE v > $1` clause with no
1492    /// other context). Constants for common types live in
1493    /// [`hyperdb_api_core::types::oids`].
1494    ///
1495    /// # Errors
1496    ///
1497    /// - Returns [`Error::Other`] if the connection is using gRPC transport
1498    ///   (prepared statements are TCP-only).
1499    /// - Returns [`Error::Client`] if the server rejects the `Parse`
1500    ///   message, e.g. SQL syntax error or unknown OID.
1501    /// - Returns [`Error::Io`] on transport-level I/O failures.
1502    pub fn prepare_typed(
1503        &self,
1504        query: &str,
1505        param_types: &[crate::Oid],
1506    ) -> Result<crate::PreparedStatement<'_>> {
1507        let client = match &self.transport {
1508            Transport::Tcp(tcp) => &tcp.client,
1509            Transport::Grpc(_) => {
1510                return Err(Error::new(
1511                    "prepared statements are not supported over gRPC transport",
1512                ));
1513            }
1514        };
1515        let inner = client.prepare_typed(query, param_types)?;
1516        crate::PreparedStatement::new(self, inner)
1517    }
1518
1519    /// Returns true if the connection is alive (passive check).
1520    ///
1521    /// This is a lightweight check that does not send any data to the server.
1522    /// For an active health check, use [`ping`](Self::ping).
1523    pub fn is_alive(&self) -> bool {
1524        match &self.transport {
1525            Transport::Tcp(tcp) => tcp.client.is_alive(),
1526            Transport::Grpc(_) => true, // gRPC connections are stateless
1527        }
1528    }
1529
1530    /// Actively checks that the connection is healthy by executing a trivial query.
1531    ///
1532    /// Unlike [`is_alive`](Self::is_alive) which only checks local state,
1533    /// this method sends `SELECT 1` to the server and verifies a response.
1534    ///
1535    /// # Example
1536    ///
1537    /// ```no_run
1538    /// # use hyperdb_api::{Connection, CreateMode, Result};
1539    /// # fn example(conn: &Connection) -> Result<()> {
1540    /// if conn.ping().is_ok() {
1541    ///     println!("Connection is healthy");
1542    /// }
1543    /// # Ok(())
1544    /// # }
1545    /// ```
1546    ///
1547    /// # Errors
1548    ///
1549    /// Returns [`Error::Client`] or [`Error::Io`] if the `SELECT 1`
1550    /// round-trip fails — i.e. the connection is no longer usable.
1551    pub fn ping(&self) -> Result<()> {
1552        self.execute_command("SELECT 1")?;
1553        Ok(())
1554    }
1555
1556    /// Returns the process ID of the backend server connection.
1557    ///
1558    /// Returns 0 for gRPC connections (not applicable).
1559    pub fn process_id(&self) -> i32 {
1560        match &self.transport {
1561            Transport::Tcp(tcp) => tcp.client.process_id(),
1562            Transport::Grpc(_) => 0,
1563        }
1564    }
1565
1566    /// Returns the secret key for the backend server connection.
1567    ///
1568    /// This is used for cancellation requests.
1569    /// Returns 0 for gRPC connections (not applicable).
1570    pub fn secret_key(&self) -> i32 {
1571        match &self.transport {
1572            Transport::Tcp(tcp) => tcp.client.secret_key(),
1573            Transport::Grpc(_) => 0,
1574        }
1575    }
1576
1577    /// Returns a server parameter value by name.
1578    ///
1579    /// Server parameters are sent by the server during connection startup.
1580    /// Common parameters include:
1581    /// - `server_version` - The server version string
1582    /// - `server_encoding` - The server's character encoding
1583    /// - `client_encoding` - The client's character encoding
1584    /// - `DateStyle` - Date display format
1585    /// - `TimeZone` - Server timezone
1586    /// - `session_identifier` - Session ID for connection migration (if routing enabled)
1587    ///
1588    /// Returns `None` if the parameter is not known.
1589    ///
1590    /// # Example
1591    ///
1592    /// ```no_run
1593    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1594    ///
1595    /// fn main() -> Result<()> {
1596    ///     let hyper = HyperProcess::new(None, None)?;
1597    ///     let conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1598    ///
1599    ///     if let Some(version) = conn.parameter_status("server_version") {
1600    ///         println!("Connected to Hyper version: {}", version);
1601    ///     }
1602    ///     Ok(())
1603    /// }
1604    /// ```
1605    pub fn parameter_status(&self, name: &str) -> Option<String> {
1606        match &self.transport {
1607            Transport::Tcp(tcp) => tcp.client.parameter_status(name),
1608            Transport::Grpc(_) => None, // gRPC doesn't have server parameters
1609        }
1610    }
1611
1612    /// Sets the notice receiver for this connection.
1613    ///
1614    /// Server notices and warnings are passed to this callback instead of being
1615    /// logged. Pass `None` to restore default logging behavior.
1616    pub fn set_notice_receiver(
1617        &mut self,
1618        receiver: Option<hyperdb_api_core::client::NoticeReceiver>,
1619    ) {
1620        match &mut self.transport {
1621            Transport::Tcp(tcp) => tcp.client.set_notice_receiver(receiver),
1622            Transport::Grpc(_) => {} // gRPC doesn't support notice receivers
1623        }
1624    }
1625
1626    /// Cancels the currently executing query (thread-safe).
1627    ///
1628    /// # Errors
1629    ///
1630    /// - Returns [`Error::Other`] on gRPC connections — cancellation is not
1631    ///   yet implemented for gRPC transport.
1632    /// - Returns [`Error::Client`] or [`Error::Io`] if the separate
1633    ///   cancel-request connection to the server fails.
1634    pub fn cancel(&self) -> Result<()> {
1635        match &self.transport {
1636            Transport::Tcp(tcp) => tcp.client.cancel().map_err(Error::from),
1637            Transport::Grpc(_) => Err(Error::new(
1638                "Query cancellation is not yet supported for gRPC connections.",
1639            )),
1640        }
1641    }
1642
1643    /// Closes the connection, detaching all databases first.
1644    ///
1645    /// # Errors
1646    ///
1647    /// - Returns [`Error::Other`] wrapping the underlying close failure
1648    ///   (its `source` is the transport error) if the client cannot be
1649    ///   shut down cleanly.
1650    /// - Returns [`Error::Other`] wrapping the detach failure if the
1651    ///   attached database could not be detached but close itself
1652    ///   succeeded.
1653    pub fn close(self) -> Result<()> {
1654        // Detach the attached database to ensure files are flushed and released.
1655        // Always attempt close, even if detach fails.
1656        let detach_err = if let Some(ref db_path) = self.database {
1657            let db_alias = std::path::Path::new(db_path)
1658                .file_stem()
1659                .and_then(|s| s.to_str())
1660                .unwrap_or("db");
1661            self.execute_command(&format!("DETACH DATABASE {}", escape_sql_path(db_alias)))
1662                .err()
1663        } else {
1664            None
1665        };
1666
1667        // Always attempt to close the client to release the connection.
1668        let close_result = match self.transport {
1669            Transport::Tcp(tcp) => tcp.client.close(),
1670            Transport::Grpc(_) => Ok(()), // gRPC connections are stateless
1671        };
1672
1673        if let Err(e) = close_result {
1674            return Err(Error::with_cause("Failed to close connection", e));
1675        }
1676
1677        if let Some(e) = detach_err {
1678            // Detach failed but close succeeded; surface the detach error.
1679            return Err(Error::with_cause(
1680                "Failed to detach database during close",
1681                e,
1682            ));
1683        }
1684
1685        Ok(())
1686    }
1687
1688    /// Unloads the database from memory while keeping the connection active.
1689    ///
1690    /// This executes the `UNLOAD DATABASE` command, which releases the database
1691    /// from memory but keeps the session and connection open. The database can
1692    /// be accessed again by subsequent queries that will automatically reload it.
1693    ///
1694    /// This is useful for releasing memory locks when switching between databases
1695    /// or when working with multiple database files.
1696    ///
1697    /// # Example
1698    ///
1699    /// ```no_run
1700    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1701    ///
1702    /// fn main() -> Result<()> {
1703    ///     let hyper = HyperProcess::new(None, None)?;
1704    ///     let conn = Connection::new(&hyper, "test.hyper", CreateMode::Create)?;
1705    ///     
1706    ///     // Do some work with the database
1707    ///     conn.execute_command("CREATE TABLE test (id INT)")?;
1708    ///     
1709    ///     // Unload from memory (but keep connection)
1710    ///     conn.unload_database()?;
1711    ///     
1712    ///     // Database can still be accessed (will be reloaded automatically)
1713    ///     let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM test")?;
1714    ///     println!("Count: {}", count);
1715    ///     
1716    ///     Ok(())
1717    /// }
1718    /// ```
1719    ///
1720    /// # Errors
1721    ///
1722    /// Returns [`Error::Client`] if the server rejects the `UNLOAD DATABASE`
1723    /// command (e.g. the database is still in use by another session).
1724    pub fn unload_database(&self) -> Result<()> {
1725        self.execute_command("UNLOAD DATABASE")?;
1726        Ok(())
1727    }
1728
1729    /// Releases the database completely from the session.
1730    ///
1731    /// This executes the `UNLOAD RELEASE` command, which completely releases
1732    /// the database from the session. After this call, the database cannot
1733    /// be accessed until a new connection is established.
1734    ///
1735    /// This is useful for completely freeing database resources when you're
1736    /// done with a database and want to ensure no locks are held.
1737    ///
1738    /// **Note:** This should only be used when the session has exactly one
1739    /// database attached. Hyper does not support `UNLOAD RELEASE` with
1740    /// multiple databases attached to the same session.
1741    ///
1742    /// # Example
1743    ///
1744    /// ```no_run
1745    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1746    ///
1747    /// fn main() -> Result<()> {
1748    ///     let hyper = HyperProcess::new(None, None)?;
1749    ///     let conn = Connection::new(&hyper, "test.hyper", CreateMode::Create)?;
1750    ///     
1751    ///     // Do some work with the database
1752    ///     conn.execute_command("CREATE TABLE test (id INT)")?;
1753    ///     
1754    ///     // Release database completely from session
1755    ///     conn.unload_release()?;
1756    ///     
1757    ///     // Database cannot be accessed after this point without new connection
1758    ///     // conn.execute_command("SELECT * FROM test")?; // This would fail
1759    ///     
1760    ///     Ok(())
1761    /// }
1762    /// ```
1763    ///
1764    /// # Errors
1765    ///
1766    /// Returns [`Error::Client`] if the server rejects `UNLOAD RELEASE`, most
1767    /// commonly because multiple databases are attached to the same session
1768    /// (Hyper only supports `UNLOAD RELEASE` with exactly one attached DB).
1769    pub fn unload_release(&self) -> Result<()> {
1770        self.execute_command("UNLOAD RELEASE")?;
1771        Ok(())
1772    }
1773
1774    // =========================================================================
1775    // Query Statistics
1776    // =========================================================================
1777
1778    /// Enables query statistics collection for this connection.
1779    ///
1780    /// After enabling, each `execute_command()` or `execute_query()` call will
1781    /// capture detailed performance metrics from Hyper. Retrieve them via
1782    /// [`last_query_stats()`](Self::last_query_stats).
1783    ///
1784    /// The provider determines how stats are collected. Use
1785    /// [`LogFileStatsProvider`](crate::LogFileStatsProvider) to parse Hyper's log file (requires local
1786    /// `hyperd.log`), or implement a custom [`QueryStatsProvider`](crate::QueryStatsProvider).
1787    ///
1788    /// # Example
1789    ///
1790    /// ```no_run
1791    /// # use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1792    /// # fn main() -> Result<()> {
1793    /// # let hyper = HyperProcess::new(None, None)?;
1794    /// # let mut conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1795    /// use hyperdb_api::LogFileStatsProvider;
1796    ///
1797    /// // Auto-detect log path from HyperProcess
1798    /// conn.enable_query_stats(LogFileStatsProvider::from_process(&hyper));
1799    ///
1800    /// // Or specify an explicit log path
1801    /// // conn.enable_query_stats(LogFileStatsProvider::new("/path/to/hyperd.log"));
1802    /// # Ok(())
1803    /// # }
1804    /// ```
1805    pub fn enable_query_stats(&mut self, provider: impl QueryStatsProvider + 'static) {
1806        self.stats_provider = Some(Arc::new(provider));
1807    }
1808
1809    /// Disables query statistics collection.
1810    ///
1811    /// After calling this, `last_query_stats()` will return `None`.
1812    pub fn disable_query_stats(&mut self) {
1813        self.stats_provider = None;
1814        if let Ok(mut guard) = self.pending_stats.lock() {
1815            *guard = None;
1816        }
1817    }
1818
1819    /// Returns the query statistics from the most recent query execution.
1820    ///
1821    /// Stats are resolved **lazily** — the log file is read when this method
1822    /// is called, not when the query executes. This is important for streaming
1823    /// queries (`execute_query`), where Hyper writes the execution stats only
1824    /// after the result set is fully consumed.
1825    ///
1826    /// **Call this after consuming the result set** (e.g., after `collect_rows()`,
1827    /// iterating all chunks, or dropping the `Rowset`).
1828    ///
1829    /// Returns `None` if:
1830    /// - Query stats collection is not enabled
1831    /// - No query has been executed yet
1832    /// - Stats could not be found for the last query (e.g., log entry not matched)
1833    ///
1834    /// # Example
1835    ///
1836    /// ```no_run
1837    /// # use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1838    /// # fn main() -> Result<()> {
1839    /// # let hyper = HyperProcess::new(None, None)?;
1840    /// # let mut conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1841    /// # use hyperdb_api::LogFileStatsProvider;
1842    /// # conn.enable_query_stats(LogFileStatsProvider::from_process(&hyper));
1843    /// conn.execute_command("CREATE TABLE t (id INT)")?;
1844    ///
1845    /// if let Some(stats) = conn.last_query_stats() {
1846    ///     println!("Total: {}s", stats.elapsed_s);
1847    ///     if let Some(ref pre) = stats.pre_execution {
1848    ///         println!("  Parse: {:?}s", pre.parsing_time_s);
1849    ///         println!("  Compile: {:?}s", pre.compilation_time_s);
1850    ///     }
1851    ///     if let Some(ref exec) = stats.execution {
1852    ///         println!("  Execute: {:?}s", exec.elapsed_s);
1853    ///         println!("  Peak mem: {:?} MB", exec.peak_memory_mb);
1854    ///     }
1855    /// }
1856    /// # Ok(())
1857    /// # }
1858    /// ```
1859    pub fn last_query_stats(&self) -> Option<QueryStats> {
1860        let provider = self.stats_provider.as_ref()?;
1861        let mut guard = self.pending_stats.lock().ok()?;
1862        let (token, sql) = guard.take()?;
1863        provider.after_query(token, &sql)
1864    }
1865
1866    /// Internal: call provider's `before_query` if stats are enabled.
1867    fn stats_before_query(&self, sql: &str) -> Option<Box<dyn Any + Send>> {
1868        self.stats_provider.as_ref().map(|p| p.before_query(sql))
1869    }
1870
1871    /// Internal: store the pending token+sql for lazy resolution.
1872    fn stats_store_pending(&self, token: Option<Box<dyn Any + Send>>, sql: &str) {
1873        if let Some(token) = token {
1874            if let Ok(mut guard) = self.pending_stats.lock() {
1875                *guard = Some((token, sql.to_string()));
1876            }
1877        }
1878    }
1879}
1880
1881impl Connection {
1882    // =========================================================================
1883    // Transaction Control
1884    // =========================================================================
1885
1886    /// Begins an explicit transaction.
1887    ///
1888    /// For an RAII guard that auto-rolls back on drop, use [`transaction()`](Self::transaction) instead.
1889    ///
1890    /// # Example
1891    ///
1892    /// ```no_run
1893    /// # use hyperdb_api::{Connection, CreateMode, Result};
1894    /// # fn main() -> Result<()> {
1895    /// # let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1896    /// conn.begin_transaction()?;
1897    /// conn.execute_command("INSERT INTO users VALUES (1, 'Alice')")?;
1898    /// conn.commit()?;
1899    /// # Ok(())
1900    /// # }
1901    /// ```
1902    ///
1903    /// # Errors
1904    ///
1905    /// Returns [`Error::Client`] if the server rejects `BEGIN TRANSACTION`
1906    /// (e.g. a transaction is already open on this session).
1907    pub fn begin_transaction(&self) -> Result<()> {
1908        self.execute_command("BEGIN TRANSACTION")?;
1909        Ok(())
1910    }
1911
1912    /// Commits the current transaction.
1913    ///
1914    /// # Errors
1915    ///
1916    /// Returns [`Error::Client`] if the server rejects `COMMIT` — most
1917    /// commonly because no transaction is currently open.
1918    pub fn commit(&self) -> Result<()> {
1919        self.execute_command("COMMIT")?;
1920        Ok(())
1921    }
1922
1923    /// Rolls back the current transaction.
1924    ///
1925    /// # Errors
1926    ///
1927    /// Returns [`Error::Client`] if the server rejects `ROLLBACK` — most
1928    /// commonly because no transaction is currently open.
1929    pub fn rollback(&self) -> Result<()> {
1930        self.execute_command("ROLLBACK")?;
1931        Ok(())
1932    }
1933
1934    /// Starts a transaction and returns an RAII guard that auto-rolls back on drop.
1935    ///
1936    /// The returned [`Transaction`](crate::Transaction) exclusively borrows this connection,
1937    /// preventing any other use of the connection while the transaction is active.
1938    /// This is enforced at compile time by Rust's borrow checker. The guard provides
1939    /// `commit()` and `rollback()` methods. If dropped without calling either, the
1940    /// transaction is automatically rolled back.
1941    ///
1942    /// # Example
1943    ///
1944    /// ```no_run
1945    /// # use hyperdb_api::{Connection, CreateMode, Result};
1946    /// # fn main() -> Result<()> {
1947    /// # let mut conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1948    /// let txn = conn.transaction()?;
1949    /// txn.execute_command("INSERT INTO users VALUES (1, 'Alice')")?;
1950    /// txn.commit()?; // or drop `txn` to auto-rollback
1951    /// # Ok(())
1952    /// # }
1953    /// ```
1954    ///
1955    /// # Errors
1956    ///
1957    /// Returns [`Error::Client`] if the server rejects the `BEGIN`
1958    /// statement issued internally by
1959    /// [`Transaction::new`](crate::Transaction).
1960    pub fn transaction(&mut self) -> Result<crate::Transaction<'_>> {
1961        crate::Transaction::new(self)
1962    }
1963}
1964
1965/// Checks if an error indicates an "already exists" condition based on SQLSTATE codes.
1966///
1967/// This function uses `PostgreSQL` SQLSTATE codes to reliably detect duplicate object errors
1968/// regardless of server locale or message formatting. The codes checked are:
1969/// - `42P04`: Database already exists
1970/// - `42710`: Duplicate object
1971/// - `42P06`: Duplicate schema
1972/// - `42P07`: Duplicate table
1973///
1974/// See: <https://www.postgresql.org/docs/current/errcodes-appendix.html>
1975fn is_already_exists_error(err: &Error) -> bool {
1976    err.sqlstate()
1977        .is_some_and(|code| matches!(code, "42P04" | "42710" | "42P06" | "42P07"))
1978}