Skip to main content

hyperdb_api/
connection.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Database connection management.
5//!
6//! The [`Connection`] type provides a unified interface for connecting to Hyper
7//! databases via either TCP (`PostgreSQL` wire protocol) or gRPC transport.
8//! The transport is automatically detected from the endpoint URL:
9//!
10//! - `https://` or `http://` → gRPC transport
11//! - Otherwise → TCP transport (e.g., `localhost:7483`)
12
13use std::path::Path;
14
15use hyperdb_api_core::client::Client;
16
17use crate::error::{Error, Result};
18use crate::names::escape_sql_path;
19use crate::process::HyperProcess;
20use crate::result::{Row, Rowset, DEFAULT_BINARY_CHUNK_SIZE};
21use crate::transport::Transport;
22
23use std::any::Any;
24use std::sync::{Arc, Mutex};
25
26use crate::query_stats::{QueryStats, QueryStatsProvider};
27
28/// Trait for types that can be extracted from a scalar query result.
29///
30/// This trait enables the generic [`Connection::execute_scalar_query`] method,
31/// similar to C++'s `executeScalarQuery<T>()` template.
32///
33/// # Implementing Custom Types
34///
35/// You can implement this trait for custom types to use them with `execute_scalar_query`:
36///
37/// ```no_run
38/// # use hyperdb_api::{Row, ScalarValue};
39/// # struct MyType;
40/// # impl MyType { fn parse(s: &str) -> Self { MyType } }
41/// impl ScalarValue for MyType {
42///     fn from_row(row: &Row, col: usize) -> Option<Self> {
43///         row.get_string(col).map(|s| MyType::parse(&s))
44///     }
45/// }
46/// ```
47pub trait ScalarValue: Sized {
48    /// Extracts a value of this type from a row at the given column.
49    fn from_row(row: &Row, col: usize) -> Option<Self>;
50}
51
52impl ScalarValue for i64 {
53    fn from_row(row: &Row, col: usize) -> Option<Self> {
54        row.get_i64(col)
55    }
56}
57
58impl ScalarValue for i32 {
59    fn from_row(row: &Row, col: usize) -> Option<Self> {
60        row.get_i32(col)
61    }
62}
63
64impl ScalarValue for i16 {
65    fn from_row(row: &Row, col: usize) -> Option<Self> {
66        row.get_i16(col)
67    }
68}
69
70impl ScalarValue for f64 {
71    fn from_row(row: &Row, col: usize) -> Option<Self> {
72        row.get_f64(col)
73    }
74}
75
76impl ScalarValue for bool {
77    fn from_row(row: &Row, col: usize) -> Option<Self> {
78        row.get_bool(col)
79    }
80}
81
82impl ScalarValue for String {
83    fn from_row(row: &Row, col: usize) -> Option<Self> {
84        row.get_string(col)
85    }
86}
87
/// Controls whether (and how) the database is created while connecting.
///
/// The default is [`CreateMode::DoNotCreate`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CreateMode {
    /// Never create the database; the method fails if it does not exist.
    #[default]
    DoNotCreate,
    /// Always create the database; the method fails if it already exists.
    Create,
    /// Create the database only when it does not exist yet.
    CreateIfNotExists,
    /// Create the database, dropping any existing one of the same name first.
    CreateAndReplace,
}
101
102/// A connection to a Hyper database.
103///
104/// This struct represents an active connection to a Hyper server and optionally
105/// an attached database. The connection is automatically closed when dropped.
106///
107/// # Transport Auto-Detection
108///
109/// The transport is automatically detected from the endpoint URL:
110/// - `https://` or `http://` → gRPC transport (read-only until server supports writes)
111/// - Otherwise → TCP transport (full read/write support)
112///
113/// # CSV / Text Import & Export
114///
115/// For CSV, TSV, and other delimited-text formats, see the [`copy`](crate::copy)
116/// module which provides [`export_csv()`](Self::export_csv),
117/// [`import_csv()`](Self::import_csv), and related methods on this struct.
118///
119/// # Example
120///
121/// ```no_run
122/// use hyperdb_api::{Connection, CreateMode, Result};
123///
124/// fn main() -> Result<()> {
125///     // TCP connection (full read/write)
126///     let conn = Connection::connect("localhost:7483", "example.hyper", CreateMode::CreateIfNotExists)?;
127///
128///     // Execute SQL commands
129///     conn.execute_command("CREATE TABLE test (id INT, name TEXT)")?;
130///     conn.execute_command("INSERT INTO test VALUES (1, 'Hello')")?;
131///
132///     Ok(())
133/// }
134/// ```
135///
136/// ```no_run
137/// # use hyperdb_api::{Connection, CreateMode, Result};
138/// # fn example() -> Result<()> {
139/// // gRPC connection (read-only, auto-detected from URL)
140/// let conn = Connection::connect(
141///     "https://hyper-server.example.com:443",
142///     "example.hyper",
143///     CreateMode::DoNotCreate,  // Must be DoNotCreate for gRPC
144/// )?;
145/// # Ok(())
146/// # }
147/// ```
148pub struct Connection {
149    transport: Transport,
150    database: Option<String>,
151    stats_provider: Option<Arc<dyn QueryStatsProvider>>,
152    /// Pending stats token + SQL from the most recent query, resolved lazily.
153    pending_stats: Mutex<Option<(Box<dyn Any + Send>, String)>>,
154}
155
156impl std::fmt::Debug for Connection {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        f.debug_struct("Connection")
159            .field("database", &self.database)
160            .finish_non_exhaustive()
161    }
162}
163
164impl Connection {
165    /// Creates a new connection to a Hyper instance with a database.
166    ///
167    /// This is the primary way to connect to a running [`HyperProcess`].
168    ///
169    /// # Arguments
170    ///
171    /// * `instance` - The Hyper server instance to connect to.
172    /// * `database_path` - Path to the database file.
173    /// * `create_mode` - How to handle database creation.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if the connection could not be established.
178    ///
179    /// # Example
180    ///
181    /// ```no_run
182    /// use hyperdb_api::{HyperProcess, Connection, CreateMode, Result};
183    ///
184    /// fn main() -> Result<()> {
185    ///     let hyper = HyperProcess::new(None, None)?;
186    ///     let conn = Connection::new(&hyper, "database.hyper", CreateMode::CreateIfNotExists)?;
187    ///     Ok(())
188    /// }
189    /// ```
190    pub fn new(
191        instance: &HyperProcess,
192        database_path: impl AsRef<Path>,
193        create_mode: CreateMode,
194    ) -> Result<Self> {
195        // Prefer using the connection_endpoint which properly handles UDS/Named Pipes
196        if let Some(conn_endpoint) = instance.connection_endpoint() {
197            return Self::connect_with_endpoint(
198                conn_endpoint,
199                &database_path.as_ref().to_string_lossy(),
200                create_mode,
201            );
202        }
203
204        // Fall back to string endpoint (TCP)
205        let endpoint = instance.require_endpoint()?;
206        Self::connect(
207            endpoint,
208            &database_path.as_ref().to_string_lossy(),
209            create_mode,
210        )
211    }
212
213    /// Connects using a `ConnectionEndpoint` (supports TCP, UDS, and Named Pipes).
214    fn connect_with_endpoint(
215        endpoint: &hyperdb_api_core::client::ConnectionEndpoint,
216        database_path: &str,
217        create_mode: CreateMode,
218    ) -> Result<Self> {
219        let db_path_str = Some(database_path.to_string());
220
221        let config = hyperdb_api_core::client::Config::new().with_user("tableau_internal_user");
222
223        let client = hyperdb_api_core::client::Client::connect_endpoint(endpoint, &config)?;
224
225        let conn = Connection::from_client(client, db_path_str.clone());
226
227        // Handle database creation
228        if let Some(db_path) = db_path_str {
229            conn.handle_creation_mode(&db_path, create_mode)?;
230            conn.attach_and_set_path(&db_path)?;
231        }
232
233        Ok(conn)
234    }
235
236    /// Connects to a Hyper server and optionally attaches a database.
237    ///
238    /// # Arguments
239    ///
240    /// * `endpoint` - The server endpoint (host:port).
241    /// * `database_path` - Path to the database file.
242    /// * `create_mode` - How to handle database creation.
243    ///
244    /// # Errors
245    ///
246    /// Returns an error if the connection could not be established.
247    pub fn connect(endpoint: &str, database_path: &str, create_mode: CreateMode) -> Result<Self> {
248        crate::ConnectionBuilder::new(endpoint)
249            .database(database_path)
250            .create_mode(create_mode)
251            .build()
252    }
253
254    /// Returns a connection builder for advanced configuration.
255    ///
256    /// This is useful when you need to set authentication, timeouts, or
257    /// other advanced options before connecting.
258    #[must_use]
259    pub fn builder(endpoint: &str) -> crate::ConnectionBuilder {
260        crate::ConnectionBuilder::new(endpoint)
261    }
262
263    /// Creates a Connection from a low-level Client (internal use, TCP only).
264    pub(crate) fn from_client(client: Client, database: Option<String>) -> Self {
265        Connection {
266            transport: Transport::Tcp(Box::new(crate::transport::TcpTransport { client })),
267            database,
268            stats_provider: None,
269            pending_stats: Mutex::new(None),
270        }
271    }
272
273    /// Creates a Connection from a Transport (internal use).
274    #[allow(
275        dead_code,
276        reason = "used by ConnectionBuilder for the gRPC path; not reached under non-gRPC feature builds"
277    )]
278    pub(crate) fn from_transport(transport: Transport, database: Option<String>) -> Self {
279        Connection {
280            transport,
281            database,
282            stats_provider: None,
283            pending_stats: Mutex::new(None),
284        }
285    }
286
287    /// Returns the transport type name (e.g., "TCP", "gRPC", "Unix Socket").
288    pub fn transport_type(&self) -> &'static str {
289        self.transport.transport_type().as_str()
290    }
291
292    /// Returns true if this connection supports write operations.
293    ///
294    /// Currently, only TCP connections support writes. gRPC connections are
295    /// read-only until the server supports write operations over gRPC.
296    pub fn supports_writes(&self) -> bool {
297        self.transport.supports_writes()
298    }
299
300    /// Handles database creation logic (internal use).
301    pub(crate) fn handle_creation_mode(
302        &self,
303        database_path: &str,
304        create_mode: CreateMode,
305    ) -> Result<()> {
306        match create_mode {
307            CreateMode::DoNotCreate => {}
308            CreateMode::Create => {
309                self.execute_command(&format!(
310                    "CREATE DATABASE {}",
311                    escape_sql_path(database_path)
312                ))?;
313            }
314            CreateMode::CreateIfNotExists => {
315                if let Err(e) = self.execute_command(&format!(
316                    "CREATE DATABASE IF NOT EXISTS {}",
317                    escape_sql_path(database_path)
318                )) {
319                    if !is_already_exists_error(&e) {
320                        return Err(Error::internal(format!(
321                            "Failed to create database '{database_path}': {e}"
322                        )));
323                    }
324                }
325            }
326            CreateMode::CreateAndReplace => {
327                let _ = self.execute_command(&format!(
328                    "DROP DATABASE IF EXISTS {}",
329                    escape_sql_path(database_path)
330                ));
331                self.execute_command(&format!(
332                    "CREATE DATABASE {}",
333                    escape_sql_path(database_path)
334                ))?;
335            }
336        }
337        Ok(())
338    }
339
340    /// Attaches and sets the database path (internal use).
341    pub(crate) fn attach_and_set_path(&self, database_path: &str) -> Result<()> {
342        let db_alias = std::path::Path::new(database_path)
343            .file_stem()
344            .and_then(|s| s.to_str())
345            .unwrap_or("db");
346
347        self.execute_command(&format!(
348            "ATTACH DATABASE {} AS {}",
349            escape_sql_path(database_path),
350            escape_sql_path(db_alias)
351        ))?;
352
353        self.execute_command(&format!(
354            "SET search_path TO {}, public",
355            escape_sql_path(db_alias)
356        ))?;
357
358        Ok(())
359    }
360
361    /// Connects to a Hyper server with authentication.
362    ///
363    /// # Arguments
364    ///
365    /// * `endpoint` - The server endpoint (host:port).
366    /// * `database_path` - Path to the database file.
367    /// * `create_mode` - How to handle database creation.
368    /// * `user` - Username for authentication.
369    /// * `password` - Password for authentication.
370    ///
371    /// # Errors
372    ///
373    /// Returns an error if the connection or authentication fails.
374    pub fn connect_with_auth(
375        endpoint: &str,
376        database_path: &str,
377        create_mode: CreateMode,
378        user: &str,
379        password: &str,
380    ) -> Result<Self> {
381        crate::ConnectionBuilder::new(endpoint)
382            .database(database_path)
383            .create_mode(create_mode)
384            .user(user.to_string())
385            .password(password)
386            .build()
387    }
388
389    /// Creates a connection to a Hyper server without attaching a database.
390    ///
391    /// # Errors
392    ///
393    /// Returns [`Error::Connection`] if the TCP or gRPC handshake fails, and
394    /// [`Error::Io`] if the endpoint cannot be reached.
395    pub fn without_database(endpoint: &str) -> Result<Self> {
396        crate::ConnectionBuilder::new(endpoint).build()
397    }
398
399    /// Executes a SQL command that doesn't return results.
400    ///
401    /// Use this for DDL statements (CREATE, ALTER, DROP) and DML statements
402    /// (INSERT, UPDATE, DELETE).
403    ///
404    /// # Arguments
405    ///
406    /// * `command` - The SQL command to execute.
407    ///
408    /// # Returns
409    ///
410    /// The number of affected rows, or 0 if not applicable.
411    ///
412    /// # Errors
413    ///
414    /// Returns an error if:
415    /// - The connection is using gRPC transport (write operations not yet supported)
416    /// - The command fails to execute
417    pub fn execute_command(&self, command: &str) -> Result<u64> {
418        let token = self.stats_before_query(command);
419
420        let result = self.transport.execute_command(command);
421
422        // For commands, the query is fully executed synchronously, so we
423        // can store the pending token immediately for lazy resolution.
424        self.stats_store_pending(token, command);
425
426        result
427    }
428
429    /// Executes a SQL query and returns a streaming result set.
430    ///
431    /// Results are streamed in chunks (default 64K rows), keeping memory usage
432    /// constant regardless of result set size. This makes it safe for any
433    /// result size, from a single row to billions of rows.
434    ///
435    /// # Example
436    ///
437    /// ```no_run
438    /// # use hyperdb_api::{Connection, Result};
439    /// # fn example(conn: &Connection) -> Result<()> {
440    /// let mut result = conn.execute_query("SELECT id, value FROM measurements")?;
441    /// while let Some(chunk) = result.next_chunk()? {
442    ///     for row in &chunk {
443    ///         // Generic typed access (like C++ row.get<T>())
444    ///         let id: Option<i32> = row.get(0);
445    ///         let value: Option<f64> = row.get(1);
446    ///
447    ///         // Or direct accessors
448    ///         let id = row.get_i32(0);
449    ///         let value = row.get_f64(1);
450    ///     }
451    /// }
452    /// # Ok(())
453    /// # }
454    /// ```
455    ///
456    /// # Memory Behavior
457    ///
458    /// - Only one chunk is held in memory at a time (~few MB for 64K rows)
459    /// - Safe for result sets of any size (millions/billions of rows)
460    /// - Memory usage is `O(chunk_size)`, not `O(total_rows)`
461    ///
462    /// # Errors
463    ///
464    /// - Returns [`Error::Server`] wrapping a `hyperdb_api_core::client::Error` if the
465    ///   SQL fails to parse, execute, or if the server reports an error
466    ///   while streaming.
467    /// - Returns [`Error::Io`] on transport-level I/O failures.
468    pub fn execute_query(&self, query: &str) -> Result<Rowset<'_>> {
469        let token = self.stats_before_query(query);
470
471        let result = match &self.transport {
472            Transport::Tcp(tcp) => {
473                let stream = tcp
474                    .client
475                    .query_streaming(query, DEFAULT_BINARY_CHUNK_SIZE)?;
476                Ok(Rowset::new(stream))
477            }
478            Transport::Grpc(grpc) => {
479                // gRPC streaming: pull chunks lazily so peak memory is
480                // bounded by one gRPC message (tonic default 64 MB), not
481                // by the full result size. Matches TCP's
482                // constant-memory streaming shape.
483                //
484                // The transport module already creates a fresh gRPC client
485                // per query (gRPC client needs &mut self to execute), so we
486                // do the same here: connect, start the stream, wrap as a
487                // `ChunkSource`. The stream keeps the channel and runtime
488                // alive via refcounted handles inside `GrpcChunkStreamSync`.
489                let mut client =
490                    hyperdb_api_core::client::grpc::GrpcClientSync::connect(grpc.config.clone())?;
491                let stream = client.execute_query_stream(query)?;
492                let source = Box::new(crate::grpc_connection::GrpcChunkStreamSource::new(stream));
493                let arrow_rowset = crate::arrow_result::ArrowRowset::from_stream(source)?;
494                Ok(Rowset::from_arrow(arrow_rowset))
495            }
496        };
497
498        // Store the pending token — Hyper logs the execution stats after the
499        // result is consumed (streamed), so we defer resolution until
500        // last_query_stats() is called.
501        self.stats_store_pending(token, query);
502
503        result
504    }
505
506    // =========================================================================
507    // Arrow Format Queries
508    // =========================================================================
509
510    /// Executes a SELECT query and returns results as Arrow IPC stream bytes.
511    ///
512    /// # Example
513    ///
514    /// ```no_run
515    /// use hyperdb_api::{Connection, CreateMode, Result};
516    ///
517    /// fn main() -> Result<()> {
518    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
519    ///
520    ///     // Create and populate a table
521    ///     conn.execute_command("CREATE TABLE data (id INT, value DOUBLE PRECISION)")?;
522    ///     conn.execute_command("INSERT INTO data VALUES (1, 1.5), (2, 2.5)")?;
523    ///
524    ///     // Get results as Arrow IPC stream
525    ///     let arrow_data = conn.execute_query_to_arrow("SELECT * FROM data")?;
526    ///     println!("Got {} bytes of Arrow IPC data", arrow_data.len());
527    ///
528    ///     Ok(())
529    /// }
530    /// ```
531    ///
532    /// # Errors
533    ///
534    /// Propagates any [`Error::Server`] from the TCP or gRPC transport when
535    /// the query fails or the server cannot produce Arrow IPC output.
536    pub fn execute_query_to_arrow(&self, select_query: &str) -> Result<bytes::Bytes> {
537        self.transport.execute_query_to_arrow(select_query)
538    }
539
540    /// Exports an entire table to Arrow IPC stream format.
541    ///
542    /// This is a convenience method equivalent to
543    /// `execute_query_to_arrow("SELECT * FROM table_name")`.
544    ///
545    /// # Arguments
546    ///
547    /// * `table_name` - The table name
548    ///
549    /// # Returns
550    ///
551    /// Raw Arrow IPC stream bytes containing all rows from the table.
552    ///
553    /// # Example
554    ///
555    /// ```no_run
556    /// use hyperdb_api::{Connection, CreateMode, Result};
557    ///
558    /// fn main() -> Result<()> {
559    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
560    ///     let arrow_data = conn.export_table_to_arrow("my_table")?;
561    ///     Ok(())
562    /// }
563    /// ```
564    ///
565    /// # Errors
566    ///
567    /// Returns whatever [`execute_query_to_arrow`](Self::execute_query_to_arrow)
568    /// would return for `SELECT * FROM <table_name>` — typically
569    /// [`Error::Server`] if the table does not exist or the query is rejected.
570    pub fn export_table_to_arrow(&self, table_name: &str) -> Result<bytes::Bytes> {
571        self.execute_query_to_arrow(&format!("SELECT * FROM {table_name}"))
572    }
573
574    /// Executes a SELECT query and returns results as Arrow `RecordBatch`es.
575    ///
576    /// This is the recommended method for Arrow-native workflows (`DataFusion`,
577    /// Polars, etc.) where you want direct `RecordBatch` access without going
578    /// through the `Row` abstraction.
579    ///
580    /// # Example
581    ///
582    /// ```no_run
583    /// use hyperdb_api::{Connection, CreateMode, Result};
584    /// use arrow::record_batch::RecordBatch;
585    ///
586    /// fn main() -> Result<()> {
587    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
588    ///
589    ///     let batches: Vec<RecordBatch> = conn.execute_query_to_batches("SELECT * FROM data")?;
590    ///     for batch in &batches {
591    ///         println!("batch: {} rows x {} cols", batch.num_rows(), batch.num_columns());
592    ///     }
593    ///     Ok(())
594    /// }
595    /// ```
596    ///
597    /// # Errors
598    ///
599    /// - Returns [`Error::Server`] if the query itself fails.
600    /// - Returns [`Error::Conversion`] if the Arrow IPC payload returned by the
601    ///   server is malformed and cannot be decoded into record batches.
602    pub fn execute_query_to_batches(
603        &self,
604        select_query: &str,
605    ) -> Result<Vec<arrow::record_batch::RecordBatch>> {
606        let arrow_data = self.execute_query_to_arrow(select_query)?;
607        crate::arrow_result::parse_arrow_ipc(arrow_data)
608    }
609
610    /// Fetches a single row from a query.
611    ///
612    /// Returns an error if the query returns no rows.
613    ///
614    /// # Example
615    ///
616    /// ```no_run
617    /// use hyperdb_api::{Connection, CreateMode, Result};
618    ///
619    /// fn main() -> Result<()> {
620    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
621    ///     let row = conn.fetch_one("SELECT * FROM users WHERE id = 1")?;
622    ///     let id: Option<i32> = row.get(0);
623    ///     let name: Option<String> = row.get(1);
624    ///     Ok(())
625    /// }
626    /// ```
627    ///
628    /// # Errors
629    ///
630    /// - Returns the error from [`execute_query`](Self::execute_query) if
631    ///   the query itself fails.
632    /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
633    ///   the query produced zero rows.
634    pub fn fetch_one<Q>(&self, query: Q) -> Result<crate::Row>
635    where
636        Q: AsRef<str>,
637    {
638        let query = query.as_ref();
639        let result = self.execute_query(query)?;
640        result.require_first_row()
641    }
642
643    /// Fetches an optional single row from a query.
644    ///
645    /// Returns `None` if the query returns no rows.
646    ///
647    /// # Example
648    ///
649    /// ```no_run
650    /// use hyperdb_api::{Connection, CreateMode, Result};
651    ///
652    /// fn main() -> Result<()> {
653    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
654    ///     if let Some(row) = conn.fetch_optional("SELECT * FROM users WHERE id = 999")? {
655    ///         let name: Option<String> = row.get(1);
656    ///         println!("Found user: {:?}", name);
657    ///     }
658    ///     Ok(())
659    /// }
660    /// ```
661    ///
662    /// # Errors
663    ///
664    /// Returns the error from [`execute_query`](Self::execute_query) if the
665    /// query itself fails. An empty result set is not an error — it yields
666    /// `Ok(None)`.
667    pub fn fetch_optional<Q>(&self, query: Q) -> Result<Option<crate::Row>>
668    where
669        Q: AsRef<str>,
670    {
671        let query = query.as_ref();
672        let result = self.execute_query(query)?;
673        result.first_row()
674    }
675
676    /// Fetches all rows from a query.
677    ///
678    /// # Example
679    ///
680    /// ```no_run
681    /// use hyperdb_api::{Connection, CreateMode, Result};
682    ///
683    /// fn main() -> Result<()> {
684    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
685    ///     let rows = conn.fetch_all("SELECT * FROM users WHERE active = true ORDER BY name")?;
686    ///     for row in rows {
687    ///         let id: Option<i32> = row.get(0);
688    ///         let name: Option<String> = row.get(1);
689    ///         println!("User {}: {:?}", id.unwrap_or(-1), name);
690    ///     }
691    ///     Ok(())
692    /// }
693    /// ```
694    ///
695    /// # Errors
696    ///
697    /// Returns the error from [`execute_query`](Self::execute_query), or a
698    /// transport error produced while draining every chunk of the streamed
699    /// result set.
700    pub fn fetch_all<Q>(&self, query: Q) -> Result<Vec<crate::Row>>
701    where
702        Q: AsRef<str>,
703    {
704        let query = query.as_ref();
705        let result = self.execute_query(query)?;
706        result.collect_rows()
707    }
708
709    /// Fetches a single row and maps it to a struct using [`FromRow`](crate::FromRow).
710    ///
711    /// Returns an error if the query returns no rows or if mapping fails.
712    ///
713    /// # Example
714    ///
715    /// ```no_run
716    /// use hyperdb_api::{Connection, CreateMode, FromRow, RowAccessor, Result};
717    ///
718    /// struct User { id: i32, name: String }
719    ///
720    /// impl FromRow for User {
721    ///     fn from_row(row: RowAccessor<'_>) -> Result<Self> {
722    ///         Ok(User {
723    ///             id: row.get("id")?,
724    ///             name: row.get_opt("name")?.unwrap_or_default(),
725    ///         })
726    ///     }
727    /// }
728    ///
729    /// fn main() -> Result<()> {
730    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
731    ///     let user: User = conn.fetch_one_as("SELECT id, name FROM users WHERE id = 1")?;
732    ///     Ok(())
733    /// }
734    /// ```
735    ///
736    /// # Errors
737    ///
738    /// - Returns the error from [`fetch_one`](Self::fetch_one) if the query
739    ///   fails or returns no rows.
740    /// - Returns whatever error [`FromRow::from_row`](crate::FromRow::from_row)
741    ///   produces when the row cannot be mapped into `T`.
742    pub fn fetch_one_as<T: crate::FromRow>(&self, query: &str) -> Result<T> {
743        let row = self.fetch_one(query)?;
744        let indices = row
745            .schema()
746            .map(crate::row_accessor::RowAccessor::build_indices)
747            .unwrap_or_default();
748        T::from_row(crate::RowAccessor::new(&row, &indices))
749    }
750
751    /// Fetches all rows and maps them to structs using [`FromRow`](crate::FromRow).
752    ///
753    /// # Example
754    ///
755    /// ```no_run
756    /// # use hyperdb_api::{Connection, FromRow, RowAccessor, Result};
757    /// # struct User { id: i32, name: String }
758    /// # impl FromRow for User {
759    /// #     fn from_row(row: RowAccessor<'_>) -> Result<Self> {
760    /// #         Ok(User { id: row.get("id")?, name: row.get_opt("name")?.unwrap_or_default() })
761    /// #     }
762    /// # }
763    /// # fn example(conn: &Connection) -> Result<()> {
764    /// let users: Vec<User> = conn.fetch_all_as("SELECT id, name FROM users")?;
765    /// # Ok(())
766    /// # }
767    /// ```
768    ///
769    /// # Errors
770    ///
771    /// - Returns the error from [`fetch_all`](Self::fetch_all) if the query
772    ///   fails.
773    /// - Returns the first error produced by
774    ///   [`FromRow::from_row`](crate::FromRow::from_row) on any of the rows.
775    pub fn fetch_all_as<T: crate::FromRow>(&self, query: &str) -> Result<Vec<T>> {
776        let rows = self.fetch_all(query)?;
777        // Build the column-name → index lookup once from the first
778        // row's schema; reuse for every row. All rows in a result set
779        // share the same `Arc<ResultSchema>`, so this is safe.
780        let indices = rows
781            .first()
782            .and_then(crate::result::Row::schema)
783            .map(crate::row_accessor::RowAccessor::build_indices)
784            .unwrap_or_default();
785        rows.iter()
786            .map(|r| T::from_row(crate::RowAccessor::new(r, &indices)))
787            .collect()
788    }
789
790    /// Returns a lazy iterator over rows, mapping each to `T` via
791    /// [`FromRow`].
792    ///
793    /// This is the streaming variant of [`fetch_all_as`](Self::fetch_all_as):
794    /// memory usage is bounded by the chunk size (default 64K rows), not by
795    /// the total row count. Use this for large result sets where collecting
796    /// all rows into a `Vec` would exceed memory limits.
797    ///
798    /// The column-name → index lookup table is built exactly once (on the
799    /// first non-empty chunk) and reused for all rows, so per-row mapping is
800    /// O(1) in column count.
801    ///
802    /// # Example
803    ///
804    /// ```no_run
805    /// # use hyperdb_api::{Connection, CreateMode, FromRow, RowAccessor, Result};
806    /// # struct User { id: i32, name: String }
807    /// # impl FromRow for User {
808    /// #     fn from_row(row: RowAccessor<'_>) -> Result<Self> {
809    /// #         Ok(User { id: row.get("id")?, name: row.get("name")? })
810    /// #     }
811    /// # }
812    /// # fn example(conn: &Connection) -> Result<()> {
813    /// for row_result in conn.stream_as::<User>("SELECT id, name FROM users")? {
814    ///     let user = row_result?;
815    ///     println!("{}: {}", user.id, user.name);
816    /// }
817    /// # Ok(())
818    /// # }
819    /// ```
820    ///
821    /// # Errors
822    ///
823    /// - The returned `Result` wraps errors detected while *opening* the
824    ///   result stream — transport/connection failures, and (on the gRPC
825    ///   transport, which establishes the query stream eagerly) SQL parse and
826    ///   server errors. On the default TCP transport the query is streamed
827    ///   lazily, so SQL errors such as a missing table are typically reported
828    ///   as the **first yielded item** rather than by this outer `Result`.
829    /// - Each yielded item is itself a `Result<T>`:
830    ///   - `Ok(T)` if the row was successfully mapped via `FromRow`.
831    ///   - `Err(e)` for a server/transport error encountered while streaming a
832    ///     later chunk, or for a per-row mapping failure (missing column, type
833    ///     mismatch, NULL in a non-optional field).
834    ///
835    /// In short: always handle errors *both* on the outer `Result` and on each
836    /// item — do not assume a successfully-returned iterator means the query
837    /// succeeded.
838    ///
839    /// [`FromRow`]: crate::FromRow
840    pub fn stream_as<'a, T>(&'a self, query: &str) -> Result<impl Iterator<Item = Result<T>> + 'a>
841    where
842        T: crate::FromRow + 'a,
843    {
844        let rowset = self.execute_query(query)?;
845        Ok(crate::result::TypedRowIterator::<T>::new(rowset))
846    }
847
848    /// Fetches a single scalar value from a query.
849    ///
850    /// Returns an error if the query returns no rows or NULL.
851    ///
852    /// # Example
853    ///
854    /// ```no_run
855    /// use hyperdb_api::{Connection, CreateMode, Result};
856    ///
857    /// fn main() -> Result<()> {
858    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
859    ///     let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM users")?;
860    ///     println!("User count: {}", count);
861    ///     Ok(())
862    /// }
863    /// ```
864    ///
865    /// # Errors
866    ///
867    /// - Returns the error from [`execute_query`](Self::execute_query) if
868    ///   the query itself fails.
869    /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
870    ///   the query produced zero rows.
871    /// - Returns [`Error::Conversion`] with message `"Scalar query returned NULL"`
872    ///   if the single cell is SQL `NULL`.
873    pub fn fetch_scalar<T, Q>(&self, query: Q) -> Result<T>
874    where
875        T: crate::connection::ScalarValue + crate::result::RowValue,
876        Q: AsRef<str>,
877    {
878        let query = query.as_ref();
879        let result = self.execute_query(query)?;
880        result.require_scalar()
881    }
882
883    /// Fetches an optional scalar value from a query.
884    ///
885    /// Returns `None` if the query returns no rows or NULL.
886    ///
887    /// # Example
888    ///
889    /// ```no_run
890    /// use hyperdb_api::{Connection, CreateMode, Result};
891    ///
892    /// fn main() -> Result<()> {
893    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
894    ///     let max_id: Option<i32> = conn.fetch_optional_scalar("SELECT MAX(id) FROM users")?;
895    ///     println!("Max ID: {:?}", max_id);
896    ///     Ok(())
897    /// }
898    /// ```
899    ///
900    /// # Errors
901    ///
902    /// - Returns the error from [`execute_query`](Self::execute_query) if
903    ///   the query itself fails.
904    /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
905    ///   the query produced zero rows. (An empty result is treated as an
906    ///   error here because we need at least one row to inspect; SQL `NULL`
907    ///   in the single cell yields `Ok(None)`.)
908    pub fn fetch_optional_scalar<T, Q>(&self, query: Q) -> Result<Option<T>>
909    where
910        T: crate::connection::ScalarValue + crate::result::RowValue,
911        Q: AsRef<str>,
912    {
913        let query = query.as_ref();
914        let result = self.execute_query(query)?;
915        result.scalar()
916    }
917
918    /// Executes a scalar query and returns a single value of type `T`.
919    ///
920    /// Alias for [`fetch_optional_scalar`](Self::fetch_optional_scalar) for C++ API compatibility.
921    ///
922    /// # Errors
923    ///
924    /// See [`fetch_optional_scalar`](Self::fetch_optional_scalar).
925    #[inline]
926    pub fn execute_scalar_query<T>(&self, query: &str) -> Result<Option<T>>
927    where
928        T: ScalarValue + crate::result::RowValue,
929    {
930        self.fetch_optional_scalar(query)
931    }
932
933    /// Queries for a count value, defaulting to 0 if NULL.
934    ///
935    /// This is optimized for COUNT queries which typically return 0
936    /// instead of NULL when there are no matching rows.
937    ///
938    /// # Example
939    ///
940    /// ```no_run
941    /// use hyperdb_api::{Connection, CreateMode, Result};
942    ///
943    /// fn main() -> Result<()> {
944    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
945    ///     let count = conn.query_count("SELECT COUNT(*) FROM users WHERE active = true")?;
946    ///     println!("Active users: {}", count);
947    ///     Ok(())
948    /// }
949    /// ```
950    ///
951    /// # Errors
952    ///
953    /// Returns the error from [`execute_query`](Self::execute_query) if the
954    /// query fails or produces no rows. SQL `NULL` is mapped to `0`, not an
955    /// error.
956    pub fn query_count(&self, query: &str) -> Result<i64> {
957        self.fetch_optional_scalar::<i64, _>(query)
958            .map(|opt| opt.unwrap_or(0))
959    }
960
961    // =========================================================================
962    // Parameterized Queries (SQL Injection Safe)
963    // =========================================================================
964
965    /// Executes a parameterized query, returning streaming results.
966    ///
967    /// This is safe to use with untrusted user input: parameters travel
968    /// through the extended query protocol (Parse/Bind/Execute) as
969    /// binary `HyperBinary` values and are never interpolated into the
970    /// SQL string. For repeated executions of the same SQL with different
971    /// values, prefer the explicit [`prepare`](Self::prepare) API — it
972    /// returns a reusable [`PreparedStatement`](crate::PreparedStatement)
973    /// that skips the Parse round-trip on every call.
974    ///
975    /// Under the hood, `query_params` is a one-shot
976    /// prepare+execute+close: it prepares an unnamed statement, binds
977    /// the parameters, starts streaming, and closes the statement when
978    /// the returned [`Rowset`] is dropped.
979    ///
980    /// # Arguments
981    ///
982    /// * `query` - The SQL query with parameter placeholders (`$1`, `$2`, etc.)
983    /// * `params` - Parameter values matching the placeholders
984    ///
985    /// # SQL Injection Prevention
986    ///
987    /// ```no_run
988    /// use hyperdb_api::{Connection, CreateMode, Result};
989    ///
990    /// fn search_users(conn: &Connection, user_input: &str) -> Result<()> {
991    ///     // DANGEROUS - vulnerable to SQL injection:
992    ///     // let query = format!("SELECT * FROM users WHERE name = '{}'", user_input);
993    ///
994    ///     // SAFE - parameterized query:
995    ///     let mut result = conn.query_params(
996    ///         "SELECT * FROM users WHERE name = $1",
997    ///         &[&user_input],
998    ///     )?;
999    ///
1000    ///     while let Some(chunk) = result.next_chunk()? {
1001    ///         for row in &chunk {
1002    ///             let id: Option<i32> = row.get(0);
1003    ///             let name: Option<String> = row.get(1);
1004    ///             println!("Found: {:?} - {:?}", id, name);
1005    ///         }
1006    ///     }
1007    ///     Ok(())
1008    /// }
1009    /// ```
1010    ///
1011    /// # Multiple Parameters
1012    ///
1013    /// ```no_run
1014    /// use hyperdb_api::{Connection, CreateMode, Result};
1015    ///
1016    /// fn main() -> Result<()> {
1017    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1018    ///
1019    ///     // Multiple parameters of different types
1020    ///     let result = conn.query_params(
1021    ///         "SELECT * FROM orders WHERE customer_id = $1 AND total > $2",
1022    ///         &[&42i32, &100.0f64],
1023    ///     )?;
1024    ///     Ok(())
1025    /// }
1026    /// ```
1027    ///
1028    /// # Errors
1029    ///
1030    /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport
1031    ///   (prepared statements are TCP-only).
1032    /// - Returns [`Error::Server`] if the server rejects the statement at
1033    ///   `Parse`, `Bind`, or `Execute` time, including on type-mismatch
1034    ///   between `params` and the inferred OIDs.
1035    /// - Returns [`Error::Io`] on transport-level I/O failures.
1036    pub fn query_params(
1037        &self,
1038        query: &str,
1039        params: &[&dyn crate::params::ToSqlParam],
1040    ) -> Result<Rowset<'_>> {
1041        // Implementation note: routes through the extended query protocol
1042        // via Parse/Bind/Execute so parameters travel in HyperBinary
1043        // format — no SQL escaping, full SQL-injection safety regardless of
1044        // parameter content. The statement handle is stashed inside the
1045        // returned Rowset so its Drop-time close_statement fires *after*
1046        // the rowset releases its connection lock (otherwise the close
1047        // would deadlock on the still-held mutex).
1048        let client = match &self.transport {
1049            Transport::Tcp(tcp) => &tcp.client,
1050            Transport::Grpc(_) => {
1051                return Err(Error::feature_not_supported(
1052                    "prepared statements are not supported over gRPC transport",
1053                ));
1054            }
1055        };
1056        let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
1057        let stmt = client.prepare_typed(query, &oids)?;
1058        let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
1059        let stream =
1060            client.execute_streaming(&stmt, encoded, crate::result::DEFAULT_BINARY_CHUNK_SIZE)?;
1061        Ok(Rowset::from_prepared(stream).with_statement_guard(stmt))
1062    }
1063
1064    /// Executes a parameterized command that doesn't return rows.
1065    ///
1066    /// Use this for INSERT, UPDATE, DELETE, or DDL statements with parameters.
1067    /// Returns the number of affected rows.
1068    ///
1069    /// See [`query_params`](Self::query_params) for details on parameter
1070    /// handling and SQL injection prevention.
1071    ///
1072    /// # Example
1073    ///
1074    /// ```no_run
1075    /// use hyperdb_api::{Connection, CreateMode, Result};
1076    ///
1077    /// fn delete_user(conn: &Connection, user_id: i32) -> Result<u64> {
1078    ///     // Safe from SQL injection
1079    ///     conn.command_params("DELETE FROM users WHERE id = $1", &[&user_id])
1080    /// }
1081    /// ```
1082    ///
1083    /// # Errors
1084    ///
1085    /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport.
1086    /// - Returns [`Error::Server`] if the server rejects the statement at
1087    ///   `Parse`, `Bind`, or `Execute` time.
1088    /// - Returns [`Error::Io`] on transport-level I/O failures.
1089    pub fn command_params(
1090        &self,
1091        query: &str,
1092        params: &[&dyn crate::params::ToSqlParam],
1093    ) -> Result<u64> {
1094        // One-shot prepare+execute with explicit OIDs — see `query_params`
1095        // for why we collect OIDs from each parameter.
1096        let client = match &self.transport {
1097            Transport::Tcp(tcp) => &tcp.client,
1098            Transport::Grpc(_) => {
1099                return Err(Error::feature_not_supported(
1100                    "prepared statements are not supported over gRPC transport",
1101                ));
1102            }
1103        };
1104        let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
1105        let stmt = client.prepare_typed(query, &oids)?;
1106        let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
1107        Ok(client.execute_no_result(&stmt, encoded)?)
1108    }
1109
1110    /// Executes multiple SQL statements in a single call.
1111    ///
1112    /// Each statement is executed sequentially. If any statement fails,
1113    /// execution stops and the error is returned. Returns the total number
1114    /// of affected rows across all statements.
1115    ///
1116    /// This is more efficient than calling `execute_command` in a loop
1117    /// because it reduces round-trips for DDL scripts and multi-statement setup.
1118    ///
1119    /// # Example
1120    ///
1121    /// ```no_run
1122    /// use hyperdb_api::{Connection, CreateMode, Result};
1123    ///
1124    /// fn main() -> Result<()> {
1125    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1126    ///     let total = conn.execute_batch(&[
1127    ///         "CREATE TABLE users (id INT, name TEXT)",
1128    ///         "INSERT INTO users VALUES (1, 'Alice')",
1129    ///         "INSERT INTO users VALUES (2, 'Bob')",
1130    ///     ])?;
1131    ///     println!("Total affected: {}", total);
1132    ///     Ok(())
1133    /// }
1134    /// ```
1135    ///
1136    /// # Errors
1137    ///
1138    /// Returns a wrapped [`Error::Internal`] on the first statement that fails;
1139    /// its `source` is the original [`Error::Server`] from
1140    /// [`execute_command`](Self::execute_command). The error message
1141    /// includes the failing statement's ordinal and an 80-character preview
1142    /// of its SQL.
1143    pub fn execute_batch(&self, statements: &[&str]) -> Result<u64> {
1144        let mut total = 0u64;
1145        for (i, stmt) in statements.iter().enumerate() {
1146            if !stmt.trim().is_empty() {
1147                total += self.execute_command(stmt).map_err(|e| {
1148                    let preview: String = stmt.chars().take(80).collect();
1149                    Error::internal(format!(
1150                        "execute_batch failed at statement {} of {}: {}: {}",
1151                        i + 1,
1152                        statements.len(),
1153                        preview,
1154                        e,
1155                    ))
1156                })?;
1157            }
1158        }
1159        Ok(total)
1160    }
1161
1162    /// Returns the attached database path, if any.
1163    pub fn database(&self) -> Option<&str> {
1164        self.database.as_deref()
1165    }
1166
1167    /// Creates a new database file.
1168    ///
1169    /// # Example
1170    ///
1171    /// ```no_run
1172    /// use hyperdb_api::{Connection, Result};
1173    ///
1174    /// fn main() -> Result<()> {
1175    ///     let conn = Connection::without_database("localhost:7483")?;
1176    ///     conn.create_database("new_database.hyper")?;
1177    ///     Ok(())
1178    /// }
1179    /// ```
1180    ///
1181    /// # Errors
1182    ///
1183    /// Returns [`Error::Server`] if the server rejects the
1184    /// `CREATE DATABASE IF NOT EXISTS` statement (e.g. the path is not
1185    /// writable on the server).
1186    pub fn create_database(&self, path: &str) -> Result<()> {
1187        let sql = format!("CREATE DATABASE IF NOT EXISTS {}", escape_sql_path(path));
1188        self.execute_command(&sql)?;
1189        Ok(())
1190    }
1191
1192    /// Drops (deletes) a database file.
1193    ///
1194    /// # Example
1195    ///
1196    /// ```no_run
1197    /// use hyperdb_api::{Connection, Result};
1198    ///
1199    /// fn main() -> Result<()> {
1200    ///     let conn = Connection::without_database("localhost:7483")?;
1201    ///     conn.drop_database("old_database.hyper")?;
1202    ///     Ok(())
1203    /// }
1204    /// ```
1205    ///
1206    /// # Errors
1207    ///
1208    /// Returns [`Error::Server`] if the server rejects the
1209    /// `DROP DATABASE IF EXISTS` statement (e.g. the database is still
1210    /// attached or permissions deny deletion).
1211    pub fn drop_database(&self, path: &str) -> Result<()> {
1212        let sql = format!("DROP DATABASE IF EXISTS {}", escape_sql_path(path));
1213        self.execute_command(&sql)?;
1214        Ok(())
1215    }
1216
1217    /// Attaches a database file to the connection.
1218    ///
1219    /// Once attached, the database can be queried and modified.
1220    /// The database is identified by its alias (or by its path if no alias is provided).
1221    ///
1222    /// # Arguments
1223    ///
1224    /// * `path` - The path to the database file to attach.
1225    /// * `alias` - Optional alias for the database. If `None`, the database is
1226    ///   attached without an explicit alias (typically using its filename).
1227    ///
1228    /// # Errors
1229    ///
1230    /// Returns an error if the database file doesn't exist or if attachment fails.
1231    ///
1232    /// # Example
1233    ///
1234    /// ```no_run
1235    /// use hyperdb_api::{Connection, Result};
1236    ///
1237    /// fn main() -> Result<()> {
1238    ///     let conn = Connection::without_database("localhost:7483")?;
1239    ///
1240    ///     // Attach with an alias
1241    ///     conn.attach_database("data.hyper", Some("mydata"))?;
1242    ///
1243    ///     // Attach without an alias
1244    ///     conn.attach_database("other.hyper", None)?;
1245    ///     Ok(())
1246    /// }
1247    /// ```
1248    pub fn attach_database(&self, path: &str, alias: Option<&str>) -> Result<()> {
1249        let sql = if let Some(alias) = alias {
1250            format!(
1251                "ATTACH DATABASE {} AS {}",
1252                escape_sql_path(path),
1253                escape_sql_path(alias)
1254            )
1255        } else {
1256            format!("ATTACH DATABASE {}", escape_sql_path(path))
1257        };
1258        self.execute_command(&sql)?;
1259        Ok(())
1260    }
1261
1262    /// Detaches a database from this connection.
1263    ///
1264    /// After detaching, the database file is released and can be accessed
1265    /// externally (e.g., copied, moved, etc.). All pending updates are
1266    /// written to disk before detaching.
1267    ///
1268    /// # Arguments
1269    ///
1270    /// * `alias` - The alias of the database to detach.
1271    ///
1272    /// # Errors
1273    ///
1274    /// Returns an error if the database is not attached or if detachment fails.
1275    ///
1276    /// # Example
1277    ///
1278    /// ```no_run
1279    /// use hyperdb_api::{Connection, Result};
1280    ///
1281    /// fn main() -> Result<()> {
1282    ///     let conn = Connection::without_database("localhost:7483")?;
1283    ///     conn.attach_database("data.hyper", Some("mydata"))?;
1284    ///     // ... work with the database ...
1285    ///     conn.detach_database("mydata")?;
1286    ///     Ok(())
1287    /// }
1288    /// ```
1289    pub fn detach_database(&self, alias: &str) -> Result<()> {
1290        let sql = format!("DETACH DATABASE {}", escape_sql_path(alias));
1291        self.execute_command(&sql)?;
1292        Ok(())
1293    }
1294
1295    /// Detaches all databases from this connection.
1296    ///
1297    /// This is useful for cleanup before closing a connection or when
1298    /// you need to release all database files.
1299    ///
1300    /// # Errors
1301    ///
1302    /// Returns [`Error::Server`] if the server rejects the
1303    /// `DETACH ALL DATABASES` statement (e.g. a database is still in use by
1304    /// another session).
1305    pub fn detach_all_databases(&self) -> Result<()> {
1306        self.execute_command("DETACH ALL DATABASES")?;
1307        Ok(())
1308    }
1309
1310    /// Creates a schema in the database.
1311    ///
1312    /// # Errors
1313    ///
1314    /// - Returns an error if `schema_name` cannot be converted into a
1315    ///   [`SchemaName`](crate::SchemaName) (invalid identifier).
1316    /// - Returns [`Error::Server`] if the server rejects the
1317    ///   `CREATE SCHEMA` statement (e.g. the schema already exists).
1318    pub fn create_schema<T>(&self, schema_name: T) -> Result<()>
1319    where
1320        T: TryInto<crate::SchemaName>,
1321        crate::Error: From<T::Error>,
1322    {
1323        crate::catalog::Catalog::new(self).create_schema(schema_name)
1324    }
1325
1326    /// Checks whether a schema exists.
1327    ///
1328    /// # Arguments
1329    ///
1330    /// * `schema` - The schema name (can include database qualifier).
1331    ///
1332    /// # Example
1333    ///
1334    /// ```no_run
1335    /// use hyperdb_api::{Connection, Result};
1336    ///
1337    /// fn main() -> Result<()> {
1338    ///     let conn = Connection::without_database("localhost:7483")?;
1339    ///     if conn.has_schema("public")? {
1340    ///         println!("Schema 'public' exists");
1341    ///     }
1342    ///     Ok(())
1343    /// }
1344    /// ```
1345    ///
1346    /// # Errors
1347    ///
1348    /// - Returns an error if `schema` cannot be converted into a
1349    ///   [`SchemaName`](crate::SchemaName).
1350    /// - Returns [`Error::Server`] if the catalog lookup query fails.
1351    pub fn has_schema<T>(&self, schema: T) -> Result<bool>
1352    where
1353        T: TryInto<crate::SchemaName>,
1354        crate::Error: From<T::Error>,
1355    {
1356        use crate::catalog::Catalog;
1357        Catalog::new(self).has_schema(schema)
1358    }
1359
1360    /// Checks whether a table exists.
1361    ///
1362    /// # Arguments
1363    ///
1364    /// * `table_name` - The table name (can include database and schema qualifiers).
1365    ///
1366    /// # Example
1367    ///
1368    /// ```no_run
1369    /// use hyperdb_api::{Connection, Result};
1370    ///
1371    /// fn main() -> Result<()> {
1372    ///     let conn = Connection::without_database("localhost:7483")?;
1373    ///     if conn.has_table("public.users")? {
1374    ///         println!("Table 'users' exists");
1375    ///     }
1376    ///     Ok(())
1377    /// }
1378    /// ```
1379    ///
1380    /// # Errors
1381    ///
1382    /// - Returns an error if `table_name` cannot be converted into a
1383    ///   [`TableName`](crate::TableName).
1384    /// - Returns [`Error::Server`] if the catalog lookup query fails.
1385    pub fn has_table<T>(&self, table_name: T) -> Result<bool>
1386    where
1387        T: TryInto<crate::TableName>,
1388        crate::Error: From<T::Error>,
1389    {
1390        use crate::catalog::Catalog;
1391        Catalog::new(self).has_table(table_name)
1392    }
1393
1394    /// Returns the server version as a parsed struct.
1395    ///
1396    /// Returns `None` if the version cannot be determined (e.g., gRPC connection).
1397    ///
1398    /// # Example
1399    ///
1400    /// ```no_run
1401    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, ServerVersion, Result};
1402    ///
1403    /// fn main() -> Result<()> {
1404    ///     let hyper = HyperProcess::new(None, None)?;
1405    ///     let conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1406    ///     if let Some(version) = conn.server_version() {
1407    ///         println!("Hyper {}", version);
1408    ///         if version >= ServerVersion::new(0, 1, 0) {
1409    ///             println!("Has feature X");
1410    ///         }
1411    ///     }
1412    ///     Ok(())
1413    /// }
1414    /// ```
1415    pub fn server_version(&self) -> Option<crate::ServerVersion> {
1416        let version_str = self.parameter_status("server_version")?;
1417        crate::ServerVersion::parse(&version_str)
1418    }
1419
1420    /// Copies a database file to a new path.
1421    ///
1422    /// The source database must be attached to this connection.
1423    ///
1424    /// # Example
1425    ///
1426    /// ```no_run
1427    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1428    ///
1429    /// fn main() -> Result<()> {
1430    ///     let hyper = HyperProcess::new(None, None)?;
1431    ///     let conn = Connection::new(&hyper, "source.hyper", CreateMode::DoNotCreate)?;
1432    ///     conn.copy_database("source.hyper", "backup.hyper")?;
1433    ///     Ok(())
1434    /// }
1435    /// ```
1436    ///
1437    /// # Errors
1438    ///
1439    /// Returns [`Error::Server`] if the server rejects the
1440    /// `COPY DATABASE` statement — e.g. the source is not attached, the
1441    /// destination path is not writable, or it already exists.
1442    pub fn copy_database(&self, source: &str, destination: &str) -> Result<()> {
1443        let sql = format!(
1444            "COPY DATABASE {} TO {}",
1445            escape_sql_path(source),
1446            escape_sql_path(destination)
1447        );
1448        self.execute_command(&sql)?;
1449        Ok(())
1450    }
1451
1452    /// Executes EXPLAIN on a query and returns the plan as a string.
1453    ///
1454    /// # Example
1455    ///
1456    /// ```no_run
1457    /// use hyperdb_api::{Connection, CreateMode, Result};
1458    ///
1459    /// fn main() -> Result<()> {
1460    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1461    ///     let plan = conn.explain("SELECT * FROM users WHERE id = 1")?;
1462    ///     println!("{}", plan);
1463    ///     Ok(())
1464    /// }
1465    /// ```
1466    ///
1467    /// # Errors
1468    ///
1469    /// Returns [`Error::Server`] if `EXPLAIN <query>` fails to parse or
1470    /// plan, or if the streamed result cannot be consumed.
1471    pub fn explain(&self, query: &str) -> Result<String> {
1472        let explain_sql = format!("EXPLAIN {query}");
1473        let result = self.execute_query(&explain_sql)?;
1474        let mut lines = Vec::new();
1475        for row in result.rows() {
1476            let row = row?;
1477            if let Some(line) = row.get::<String>(0) {
1478                lines.push(line);
1479            }
1480        }
1481        Ok(lines.join("\n"))
1482    }
1483
1484    /// Executes EXPLAIN ANALYZE on a query and returns the plan with timing info.
1485    ///
1486    /// **Note:** This actually executes the query to collect timing information.
1487    ///
1488    /// # Errors
1489    ///
1490    /// Returns [`Error::Server`] if `EXPLAIN ANALYZE <query>` fails — this
1491    /// includes any runtime error raised by actually executing `query`.
1492    pub fn explain_analyze(&self, query: &str) -> Result<String> {
1493        let explain_sql = format!("EXPLAIN ANALYZE {query}");
1494        let result = self.execute_query(&explain_sql)?;
1495        let mut lines = Vec::new();
1496        for row in result.rows() {
1497            let row = row?;
1498            if let Some(line) = row.get::<String>(0) {
1499                lines.push(line);
1500            }
1501        }
1502        Ok(lines.join("\n"))
1503    }
1504
1505    /// Returns a reference to the underlying TCP client.
1506    ///
1507    /// # Panics
1508    ///
1509    /// This method returns `None` if the connection is using gRPC transport.
1510    pub fn tcp_client(&self) -> Option<&Client> {
1511        match &self.transport {
1512            Transport::Tcp(tcp) => Some(&tcp.client),
1513            Transport::Grpc(_) => None,
1514        }
1515    }
1516
1517    /// Crate-internal accessor for the transport. Used by
1518    /// [`PreparedStatement`](crate::PreparedStatement) to reach the
1519    /// underlying `hyperdb_api_core::client::Client`.
1520    pub(crate) fn transport(&self) -> &Transport {
1521        &self.transport
1522    }
1523
1524    /// Prepares a SQL statement with automatic parameter type inference.
1525    ///
1526    /// The returned [`PreparedStatement`](crate::PreparedStatement) can
1527    /// be executed many times with different parameter values; the
1528    /// server caches the parsed plan. This is the preferred way to
1529    /// execute a statement repeatedly inside a loop.
1530    ///
1531    /// For explicit parameter types (necessary when `$N` placeholders
1532    /// would otherwise be ambiguous), use
1533    /// [`prepare_typed`](Self::prepare_typed).
1534    ///
1535    /// # Example
1536    ///
1537    /// ```no_run
1538    /// # use hyperdb_api::{Connection, CreateMode, Result};
1539    /// # fn example(conn: &Connection) -> Result<()> {
1540    /// let stmt = conn.prepare("SELECT name FROM users WHERE id = $1")?;
1541    /// for id in [1_i32, 2, 3] {
1542    ///     let name: String = stmt.fetch_scalar(&[&id])?;
1543    ///     println!("{id}: {name}");
1544    /// }
1545    /// # Ok(())
1546    /// # }
1547    /// ```
1548    ///
1549    /// # Errors
1550    ///
1551    /// See [`prepare_typed`](Self::prepare_typed) — this method delegates
1552    /// to it with an empty OID list.
1553    pub fn prepare(&self, query: &str) -> Result<crate::PreparedStatement<'_>> {
1554        self.prepare_typed(query, &[])
1555    }
1556
1557    /// Prepares a SQL statement with explicit parameter type OIDs.
1558    ///
1559    /// Use this when the server cannot infer parameter types from the
1560    /// SQL alone (e.g. a bare `$1` in a `WHERE v > $1` clause with no
1561    /// other context). Constants for common types live in
1562    /// [`hyperdb_api_core::types::oids`].
1563    ///
1564    /// # Errors
1565    ///
1566    /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport
1567    ///   (prepared statements are TCP-only).
1568    /// - Returns [`Error::Server`] if the server rejects the `Parse`
1569    ///   message, e.g. SQL syntax error or unknown OID.
1570    /// - Returns [`Error::Io`] on transport-level I/O failures.
1571    pub fn prepare_typed(
1572        &self,
1573        query: &str,
1574        param_types: &[crate::Oid],
1575    ) -> Result<crate::PreparedStatement<'_>> {
1576        let client = match &self.transport {
1577            Transport::Tcp(tcp) => &tcp.client,
1578            Transport::Grpc(_) => {
1579                return Err(Error::feature_not_supported(
1580                    "prepared statements are not supported over gRPC transport",
1581                ));
1582            }
1583        };
1584        let inner = client.prepare_typed(query, param_types)?;
1585        crate::PreparedStatement::new(self, inner)
1586    }
1587
1588    /// Returns true if the connection is alive (passive check).
1589    ///
1590    /// This is a lightweight check that does not send any data to the server.
1591    /// For an active health check, use [`ping`](Self::ping).
1592    pub fn is_alive(&self) -> bool {
1593        match &self.transport {
1594            Transport::Tcp(tcp) => tcp.client.is_alive(),
1595            Transport::Grpc(_) => true, // gRPC connections are stateless
1596        }
1597    }
1598
1599    /// Actively checks that the connection is healthy by executing a trivial query.
1600    ///
1601    /// Unlike [`is_alive`](Self::is_alive) which only checks local state,
1602    /// this method sends `SELECT 1` to the server and verifies a response.
1603    ///
1604    /// # Example
1605    ///
1606    /// ```no_run
1607    /// # use hyperdb_api::{Connection, CreateMode, Result};
1608    /// # fn example(conn: &Connection) -> Result<()> {
1609    /// if conn.ping().is_ok() {
1610    ///     println!("Connection is healthy");
1611    /// }
1612    /// # Ok(())
1613    /// # }
1614    /// ```
1615    ///
1616    /// # Errors
1617    ///
1618    /// Returns [`Error::Server`] or [`Error::Io`] if the `SELECT 1`
1619    /// round-trip fails — i.e. the connection is no longer usable.
1620    pub fn ping(&self) -> Result<()> {
1621        self.execute_command("SELECT 1")?;
1622        Ok(())
1623    }
1624
1625    /// Returns the process ID of the backend server connection.
1626    ///
1627    /// Returns 0 for gRPC connections (not applicable).
1628    pub fn process_id(&self) -> i32 {
1629        match &self.transport {
1630            Transport::Tcp(tcp) => tcp.client.process_id(),
1631            Transport::Grpc(_) => 0,
1632        }
1633    }
1634
1635    /// Returns the secret key for the backend server connection.
1636    ///
1637    /// This is used for cancellation requests.
1638    /// Returns 0 for gRPC connections (not applicable).
1639    pub fn secret_key(&self) -> i32 {
1640        match &self.transport {
1641            Transport::Tcp(tcp) => tcp.client.secret_key(),
1642            Transport::Grpc(_) => 0,
1643        }
1644    }
1645
1646    /// Returns a server parameter value by name.
1647    ///
1648    /// Server parameters are sent by the server during connection startup.
1649    /// Common parameters include:
1650    /// - `server_version` - The server version string
1651    /// - `server_encoding` - The server's character encoding
1652    /// - `client_encoding` - The client's character encoding
1653    /// - `DateStyle` - Date display format
1654    /// - `TimeZone` - Server timezone
1655    /// - `session_identifier` - Session ID for connection migration (if routing enabled)
1656    ///
1657    /// Returns `None` if the parameter is not known.
1658    ///
1659    /// # Example
1660    ///
1661    /// ```no_run
1662    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1663    ///
1664    /// fn main() -> Result<()> {
1665    ///     let hyper = HyperProcess::new(None, None)?;
1666    ///     let conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1667    ///
1668    ///     if let Some(version) = conn.parameter_status("server_version") {
1669    ///         println!("Connected to Hyper version: {}", version);
1670    ///     }
1671    ///     Ok(())
1672    /// }
1673    /// ```
1674    pub fn parameter_status(&self, name: &str) -> Option<String> {
1675        match &self.transport {
1676            Transport::Tcp(tcp) => tcp.client.parameter_status(name),
1677            Transport::Grpc(_) => None, // gRPC doesn't have server parameters
1678        }
1679    }
1680
1681    /// Sets the notice receiver for this connection.
1682    ///
1683    /// Server notices and warnings are passed to this callback instead of being
1684    /// logged. Pass `None` to restore default logging behavior.
1685    pub fn set_notice_receiver(
1686        &mut self,
1687        receiver: Option<hyperdb_api_core::client::NoticeReceiver>,
1688    ) {
1689        match &mut self.transport {
1690            Transport::Tcp(tcp) => tcp.client.set_notice_receiver(receiver),
1691            Transport::Grpc(_) => {} // gRPC doesn't support notice receivers
1692        }
1693    }
1694
1695    /// Cancels the currently executing query (thread-safe).
1696    ///
1697    /// # Errors
1698    ///
1699    /// - Returns [`Error::FeatureNotSupported`] on gRPC connections — cancellation is not
1700    ///   yet implemented for gRPC transport.
1701    /// - Returns [`Error::Connection`] or [`Error::Io`] if the separate
1702    ///   cancel-request connection to the server fails.
1703    pub fn cancel(&self) -> Result<()> {
1704        match &self.transport {
1705            Transport::Tcp(tcp) => tcp.client.cancel().map_err(Error::from),
1706            Transport::Grpc(_) => Err(Error::feature_not_supported(
1707                "Query cancellation is not yet supported for gRPC connections.",
1708            )),
1709        }
1710    }
1711
1712    /// Closes the connection, detaching all databases first.
1713    ///
1714    /// # Errors
1715    ///
1716    /// - Returns [`Error::Internal`] wrapping the underlying close failure
1717    ///   (its `source` is the transport error) if the client cannot be
1718    ///   shut down cleanly.
1719    /// - Returns [`Error::Internal`] wrapping the detach failure if the
1720    ///   attached database could not be detached but close itself
1721    ///   succeeded.
1722    pub fn close(self) -> Result<()> {
1723        // Detach the attached database to ensure files are flushed and released.
1724        // Always attempt close, even if detach fails.
1725        let detach_err = if let Some(ref db_path) = self.database {
1726            let db_alias = std::path::Path::new(db_path)
1727                .file_stem()
1728                .and_then(|s| s.to_str())
1729                .unwrap_or("db");
1730            self.execute_command(&format!("DETACH DATABASE {}", escape_sql_path(db_alias)))
1731                .err()
1732        } else {
1733            None
1734        };
1735
1736        // Always attempt to close the client to release the connection.
1737        let close_result = match self.transport {
1738            Transport::Tcp(tcp) => tcp.client.close(),
1739            Transport::Grpc(_) => Ok(()), // gRPC connections are stateless
1740        };
1741
1742        if let Err(e) = close_result {
1743            return Err(Error::internal(format!("Failed to close connection: {e}")));
1744        }
1745
1746        if let Some(e) = detach_err {
1747            // Detach failed but close succeeded; surface the detach error.
1748            return Err(Error::internal(format!(
1749                "Failed to detach database during close: {e}"
1750            )));
1751        }
1752
1753        Ok(())
1754    }
1755
1756    /// Unloads the database from memory while keeping the connection active.
1757    ///
1758    /// This executes the `UNLOAD DATABASE` command, which releases the database
1759    /// from memory but keeps the session and connection open. The database can
1760    /// be accessed again by subsequent queries that will automatically reload it.
1761    ///
1762    /// This is useful for releasing memory locks when switching between databases
1763    /// or when working with multiple database files.
1764    ///
1765    /// # Example
1766    ///
1767    /// ```no_run
1768    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1769    ///
1770    /// fn main() -> Result<()> {
1771    ///     let hyper = HyperProcess::new(None, None)?;
1772    ///     let conn = Connection::new(&hyper, "test.hyper", CreateMode::Create)?;
1773    ///     
1774    ///     // Do some work with the database
1775    ///     conn.execute_command("CREATE TABLE test (id INT)")?;
1776    ///     
1777    ///     // Unload from memory (but keep connection)
1778    ///     conn.unload_database()?;
1779    ///     
1780    ///     // Database can still be accessed (will be reloaded automatically)
1781    ///     let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM test")?;
1782    ///     println!("Count: {}", count);
1783    ///     
1784    ///     Ok(())
1785    /// }
1786    /// ```
1787    ///
1788    /// # Errors
1789    ///
1790    /// Returns [`Error::Server`] if the server rejects the `UNLOAD DATABASE`
1791    /// command (e.g. the database is still in use by another session).
1792    pub fn unload_database(&self) -> Result<()> {
1793        self.execute_command("UNLOAD DATABASE")?;
1794        Ok(())
1795    }
1796
1797    /// Releases the database completely from the session.
1798    ///
1799    /// This executes the `UNLOAD RELEASE` command, which completely releases
1800    /// the database from the session. After this call, the database cannot
1801    /// be accessed until a new connection is established.
1802    ///
1803    /// This is useful for completely freeing database resources when you're
1804    /// done with a database and want to ensure no locks are held.
1805    ///
1806    /// **Note:** This should only be used when the session has exactly one
1807    /// database attached. Hyper does not support `UNLOAD RELEASE` with
1808    /// multiple databases attached to the same session.
1809    ///
1810    /// # Example
1811    ///
1812    /// ```no_run
1813    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1814    ///
1815    /// fn main() -> Result<()> {
1816    ///     let hyper = HyperProcess::new(None, None)?;
1817    ///     let conn = Connection::new(&hyper, "test.hyper", CreateMode::Create)?;
1818    ///     
1819    ///     // Do some work with the database
1820    ///     conn.execute_command("CREATE TABLE test (id INT)")?;
1821    ///     
1822    ///     // Release database completely from session
1823    ///     conn.unload_release()?;
1824    ///     
1825    ///     // Database cannot be accessed after this point without new connection
1826    ///     // conn.execute_command("SELECT * FROM test")?; // This would fail
1827    ///     
1828    ///     Ok(())
1829    /// }
1830    /// ```
1831    ///
1832    /// # Errors
1833    ///
1834    /// Returns [`Error::Server`] if the server rejects `UNLOAD RELEASE`, most
1835    /// commonly because multiple databases are attached to the same session
1836    /// (Hyper only supports `UNLOAD RELEASE` with exactly one attached DB).
1837    pub fn unload_release(&self) -> Result<()> {
1838        self.execute_command("UNLOAD RELEASE")?;
1839        Ok(())
1840    }
1841
1842    // =========================================================================
1843    // Query Statistics
1844    // =========================================================================
1845
1846    /// Enables query statistics collection for this connection.
1847    ///
1848    /// After enabling, each `execute_command()` or `execute_query()` call will
1849    /// capture detailed performance metrics from Hyper. Retrieve them via
1850    /// [`last_query_stats()`](Self::last_query_stats).
1851    ///
1852    /// The provider determines how stats are collected. Use
1853    /// [`LogFileStatsProvider`](crate::LogFileStatsProvider) to parse Hyper's log file (requires local
1854    /// `hyperd.log`), or implement a custom [`QueryStatsProvider`](crate::QueryStatsProvider).
1855    ///
1856    /// # Example
1857    ///
1858    /// ```no_run
1859    /// # use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1860    /// # fn main() -> Result<()> {
1861    /// # let hyper = HyperProcess::new(None, None)?;
1862    /// # let mut conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1863    /// use hyperdb_api::LogFileStatsProvider;
1864    ///
1865    /// // Auto-detect log path from HyperProcess
1866    /// conn.enable_query_stats(LogFileStatsProvider::from_process(&hyper));
1867    ///
1868    /// // Or specify an explicit log path
1869    /// // conn.enable_query_stats(LogFileStatsProvider::new("/path/to/hyperd.log"));
1870    /// # Ok(())
1871    /// # }
1872    /// ```
1873    pub fn enable_query_stats(&mut self, provider: impl QueryStatsProvider + 'static) {
1874        self.stats_provider = Some(Arc::new(provider));
1875    }
1876
1877    /// Disables query statistics collection.
1878    ///
1879    /// After calling this, `last_query_stats()` will return `None`.
1880    pub fn disable_query_stats(&mut self) {
1881        self.stats_provider = None;
1882        if let Ok(mut guard) = self.pending_stats.lock() {
1883            *guard = None;
1884        }
1885    }
1886
1887    /// Returns the query statistics from the most recent query execution.
1888    ///
1889    /// Stats are resolved **lazily** — the log file is read when this method
1890    /// is called, not when the query executes. This is important for streaming
1891    /// queries (`execute_query`), where Hyper writes the execution stats only
1892    /// after the result set is fully consumed.
1893    ///
1894    /// **Call this after consuming the result set** (e.g., after `collect_rows()`,
1895    /// iterating all chunks, or dropping the `Rowset`).
1896    ///
1897    /// Returns `None` if:
1898    /// - Query stats collection is not enabled
1899    /// - No query has been executed yet
1900    /// - Stats could not be found for the last query (e.g., log entry not matched)
1901    ///
1902    /// # Example
1903    ///
1904    /// ```no_run
1905    /// # use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1906    /// # fn main() -> Result<()> {
1907    /// # let hyper = HyperProcess::new(None, None)?;
1908    /// # let mut conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1909    /// # use hyperdb_api::LogFileStatsProvider;
1910    /// # conn.enable_query_stats(LogFileStatsProvider::from_process(&hyper));
1911    /// conn.execute_command("CREATE TABLE t (id INT)")?;
1912    ///
1913    /// if let Some(stats) = conn.last_query_stats() {
1914    ///     println!("Total: {}s", stats.elapsed_s);
1915    ///     if let Some(ref pre) = stats.pre_execution {
1916    ///         println!("  Parse: {:?}s", pre.parsing_time_s);
1917    ///         println!("  Compile: {:?}s", pre.compilation_time_s);
1918    ///     }
1919    ///     if let Some(ref exec) = stats.execution {
1920    ///         println!("  Execute: {:?}s", exec.elapsed_s);
1921    ///         println!("  Peak mem: {:?} MB", exec.peak_memory_mb);
1922    ///     }
1923    /// }
1924    /// # Ok(())
1925    /// # }
1926    /// ```
1927    pub fn last_query_stats(&self) -> Option<QueryStats> {
1928        let provider = self.stats_provider.as_ref()?;
1929        let mut guard = self.pending_stats.lock().ok()?;
1930        let (token, sql) = guard.take()?;
1931        provider.after_query(token, &sql)
1932    }
1933
1934    /// Internal: call provider's `before_query` if stats are enabled.
1935    fn stats_before_query(&self, sql: &str) -> Option<Box<dyn Any + Send>> {
1936        self.stats_provider.as_ref().map(|p| p.before_query(sql))
1937    }
1938
1939    /// Internal: store the pending token+sql for lazy resolution.
1940    fn stats_store_pending(&self, token: Option<Box<dyn Any + Send>>, sql: &str) {
1941        if let Some(token) = token {
1942            if let Ok(mut guard) = self.pending_stats.lock() {
1943                *guard = Some((token, sql.to_string()));
1944            }
1945        }
1946    }
1947}
1948
1949impl Connection {
1950    // =========================================================================
1951    // Transaction Control
1952    // =========================================================================
1953
1954    // -------------------------------------------------------------------
1955    // Raw transaction control (internal)
1956    // -------------------------------------------------------------------
1957    //
1958    // The `*_raw` methods below are `pub(crate)` and form the canonical
1959    // implementation of session-level transaction control. The RAII
1960    // guard at `crate::Transaction` and any internal helper that
1961    // genuinely needs `&self` (rather than the guard's `&mut self`)
1962    // delegate to these.
1963    //
1964    // The matching `pub` methods (`begin_transaction`, `commit`,
1965    // `rollback`) are thin `#[doc(hidden)] #[deprecated]` wrappers
1966    // retained only so any pre-existing downstream caller sees a
1967    // compiler warning rather than a hard break. They will be deleted
1968    // in a future release; the `_raw` methods stay.
1969
1970    /// Issues `BEGIN TRANSACTION`. Crate-internal use only.
1971    pub(crate) fn begin_transaction_raw(&self) -> Result<()> {
1972        self.execute_command("BEGIN TRANSACTION")?;
1973        Ok(())
1974    }
1975
1976    /// Issues `COMMIT`. Crate-internal use only.
1977    pub(crate) fn commit_raw(&self) -> Result<()> {
1978        self.execute_command("COMMIT")?;
1979        Ok(())
1980    }
1981
1982    /// Issues `ROLLBACK`. Crate-internal use only.
1983    pub(crate) fn rollback_raw(&self) -> Result<()> {
1984        self.execute_command("ROLLBACK")?;
1985        Ok(())
1986    }
1987
1988    /// Begins an explicit transaction.
1989    ///
1990    /// **Prefer [`transaction()`](Self::transaction)** — the RAII guard
1991    /// auto-rolls back on drop and cannot leak a half-open transaction
1992    /// across error paths. This method is hidden from generated
1993    /// rustdoc and marked deprecated; it will be removed in a future
1994    /// release.
1995    ///
1996    /// # Errors
1997    ///
1998    /// Returns [`Error::Server`] if the server rejects `BEGIN TRANSACTION`
1999    /// (e.g. a transaction is already open on this session).
2000    #[doc(hidden)]
2001    #[deprecated(
2002        note = "Use `Connection::transaction()` for an RAII guard. This method will be removed \
2003                in a future release."
2004    )]
2005    pub fn begin_transaction(&self) -> Result<()> {
2006        self.begin_transaction_raw()
2007    }
2008
2009    /// Commits the current transaction.
2010    ///
2011    /// **Prefer [`Transaction::commit`](crate::Transaction::commit)** on
2012    /// the RAII guard returned by [`transaction()`](Self::transaction).
2013    /// Hidden from generated rustdoc and deprecated; slated for removal.
2014    ///
2015    /// # Errors
2016    ///
2017    /// Returns [`Error::Server`] if the server rejects `COMMIT`.
2018    #[doc(hidden)]
2019    #[deprecated(
2020        note = "Use `Transaction::commit()` on the RAII guard from `Connection::transaction()`. \
2021                This method will be removed in a future release."
2022    )]
2023    pub fn commit(&self) -> Result<()> {
2024        self.commit_raw()
2025    }
2026
2027    /// Rolls back the current transaction.
2028    ///
2029    /// **Prefer [`Transaction::rollback`](crate::Transaction::rollback)**
2030    /// on the RAII guard returned by [`transaction()`](Self::transaction).
2031    /// Hidden from generated rustdoc and deprecated; slated for removal.
2032    ///
2033    /// # Errors
2034    ///
2035    /// Returns [`Error::Server`] if the server rejects `ROLLBACK`.
2036    #[doc(hidden)]
2037    #[deprecated(
2038        note = "Use `Transaction::rollback()` on the RAII guard from `Connection::transaction()`. \
2039                This method will be removed in a future release."
2040    )]
2041    pub fn rollback(&self) -> Result<()> {
2042        self.rollback_raw()
2043    }
2044
2045    /// Starts a transaction and returns an RAII guard that auto-rolls back on drop.
2046    ///
2047    /// The returned [`Transaction`](crate::Transaction) exclusively borrows this connection,
2048    /// preventing any other use of the connection while the transaction is active.
2049    /// This is enforced at compile time by Rust's borrow checker. The guard provides
2050    /// `commit()` and `rollback()` methods. If dropped without calling either, the
2051    /// transaction is automatically rolled back.
2052    ///
2053    /// # Example
2054    ///
2055    /// ```no_run
2056    /// # use hyperdb_api::{Connection, CreateMode, Result};
2057    /// # fn main() -> Result<()> {
2058    /// # let mut conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
2059    /// let txn = conn.transaction()?;
2060    /// txn.execute_command("INSERT INTO users VALUES (1, 'Alice')")?;
2061    /// txn.commit()?; // or drop `txn` to auto-rollback
2062    /// # Ok(())
2063    /// # }
2064    /// ```
2065    ///
2066    /// # Errors
2067    ///
2068    /// Returns [`Error::Server`] if the server rejects the `BEGIN`
2069    /// statement issued internally by
2070    /// [`Transaction::new`](crate::Transaction).
2071    pub fn transaction(&mut self) -> Result<crate::Transaction<'_>> {
2072        crate::Transaction::new(self)
2073    }
2074}
2075
2076/// Checks if an error indicates an "already exists" condition based on SQLSTATE codes.
2077///
2078/// This function uses `PostgreSQL` SQLSTATE codes to reliably detect duplicate object errors
2079/// regardless of server locale or message formatting. The codes checked are:
2080/// - `42P04`: Database already exists
2081/// - `42710`: Duplicate object
2082/// - `42P06`: Duplicate schema
2083/// - `42P07`: Duplicate table
2084///
2085/// See: <https://www.postgresql.org/docs/current/errcodes-appendix.html>
2086fn is_already_exists_error(err: &Error) -> bool {
2087    err.sqlstate()
2088        .is_some_and(|code| matches!(code, "42P04" | "42710" | "42P06" | "42P07"))
2089}