Skip to main content

hyperdb_api/
connection.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Database connection management.
5//!
6//! The [`Connection`] type provides a unified interface for connecting to Hyper
7//! databases via either TCP (`PostgreSQL` wire protocol) or gRPC transport.
8//! The transport is automatically detected from the endpoint URL:
9//!
10//! - `https://` or `http://` → gRPC transport
11//! - Otherwise → TCP transport (e.g., `localhost:7483`)
12
13use std::path::Path;
14
15use hyperdb_api_core::client::Client;
16
17use crate::error::{Error, Result};
18use crate::names::escape_sql_path;
19use crate::process::HyperProcess;
20use crate::result::{Row, Rowset, DEFAULT_BINARY_CHUNK_SIZE};
21use crate::transport::Transport;
22
23use std::any::Any;
24use std::sync::{Arc, Mutex};
25
26use crate::query_stats::{QueryStats, QueryStatsProvider};
27
28/// Trait for types that can be extracted from a scalar query result.
29///
30/// This trait enables the generic [`Connection::execute_scalar_query`] method,
31/// similar to C++'s `executeScalarQuery<T>()` template.
32///
33/// # Implementing Custom Types
34///
35/// You can implement this trait for custom types to use them with `execute_scalar_query`:
36///
37/// ```no_run
38/// # use hyperdb_api::{Row, ScalarValue};
39/// # struct MyType;
40/// # impl MyType { fn parse(s: &str) -> Self { MyType } }
41/// impl ScalarValue for MyType {
42///     fn from_row(row: &Row, col: usize) -> Option<Self> {
43///         row.get_string(col).map(|s| MyType::parse(&s))
44///     }
45/// }
46/// ```
pub trait ScalarValue: Sized {
    /// Extracts a value of this type from `row` at column index `col`.
    ///
    /// Returns `None` when no value of this type can be produced from that
    /// column. The implementations provided in this file simply forward the
    /// `Option` returned by the matching typed accessor on [`Row`].
    fn from_row(row: &Row, col: usize) -> Option<Self>;
}
51
52impl ScalarValue for i64 {
53    fn from_row(row: &Row, col: usize) -> Option<Self> {
54        row.get_i64(col)
55    }
56}
57
58impl ScalarValue for i32 {
59    fn from_row(row: &Row, col: usize) -> Option<Self> {
60        row.get_i32(col)
61    }
62}
63
64impl ScalarValue for i16 {
65    fn from_row(row: &Row, col: usize) -> Option<Self> {
66        row.get_i16(col)
67    }
68}
69
70impl ScalarValue for f64 {
71    fn from_row(row: &Row, col: usize) -> Option<Self> {
72        row.get_f64(col)
73    }
74}
75
76impl ScalarValue for bool {
77    fn from_row(row: &Row, col: usize) -> Option<Self> {
78        row.get_bool(col)
79    }
80}
81
82impl ScalarValue for String {
83    fn from_row(row: &Row, col: usize) -> Option<Self> {
84        row.get_string(col)
85    }
86}
87
/// Controls whether (and how) the database is created when connecting.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum CreateMode {
    /// Never create the database; connecting fails if it does not exist yet.
    #[default]
    DoNotCreate,
    /// Always create the database; fails if one already exists.
    Create,
    /// Create the database only when it is missing; an existing one is kept.
    CreateIfNotExists,
    /// Create a fresh database, dropping any existing one beforehand.
    CreateAndReplace,
}
101
102/// A connection to a Hyper database.
103///
104/// This struct represents an active connection to a Hyper server and optionally
105/// an attached database. The connection is automatically closed when dropped.
106///
107/// # Transport Auto-Detection
108///
109/// The transport is automatically detected from the endpoint URL:
110/// - `https://` or `http://` → gRPC transport (read-only until server supports writes)
111/// - Otherwise → TCP transport (full read/write support)
112///
113/// # CSV / Text Import & Export
114///
115/// For CSV, TSV, and other delimited-text formats, see the [`copy`](crate::copy)
116/// module which provides [`export_csv()`](Self::export_csv),
117/// [`import_csv()`](Self::import_csv), and related methods on this struct.
118///
119/// # Example
120///
121/// ```no_run
122/// use hyperdb_api::{Connection, CreateMode, Result};
123///
124/// fn main() -> Result<()> {
125///     // TCP connection (full read/write)
126///     let conn = Connection::connect("localhost:7483", "example.hyper", CreateMode::CreateIfNotExists)?;
127///
128///     // Execute SQL commands
129///     conn.execute_command("CREATE TABLE test (id INT, name TEXT)")?;
130///     conn.execute_command("INSERT INTO test VALUES (1, 'Hello')")?;
131///
132///     Ok(())
133/// }
134/// ```
135///
136/// ```no_run
137/// # use hyperdb_api::{Connection, CreateMode, Result};
138/// # fn example() -> Result<()> {
139/// // gRPC connection (read-only, auto-detected from URL)
140/// let conn = Connection::connect(
141///     "https://hyper-server.example.com:443",
142///     "example.hyper",
143///     CreateMode::DoNotCreate,  // Must be DoNotCreate for gRPC
144/// )?;
145/// # Ok(())
146/// # }
147/// ```
pub struct Connection {
    /// Transport carrying this connection's traffic (`Transport::Tcp` or
    /// `Transport::Grpc`).
    transport: Transport,
    /// Database path this connection was created with, if any; this is the
    /// only field included in the `Debug` output.
    database: Option<String>,
    /// Optional provider of per-query statistics; the constructors in this
    /// file initialise it to `None`.
    stats_provider: Option<Arc<dyn QueryStatsProvider>>,
    /// Pending stats token + SQL from the most recent query, resolved lazily.
    pending_stats: Mutex<Option<(Box<dyn Any + Send>, String)>>,
}
155
156impl std::fmt::Debug for Connection {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        f.debug_struct("Connection")
159            .field("database", &self.database)
160            .finish_non_exhaustive()
161    }
162}
163
164impl Connection {
165    /// Creates a new connection to a Hyper instance with a database.
166    ///
167    /// This is the primary way to connect to a running [`HyperProcess`].
168    ///
169    /// # Arguments
170    ///
171    /// * `instance` - The Hyper server instance to connect to.
172    /// * `database_path` - Path to the database file.
173    /// * `create_mode` - How to handle database creation.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if the connection could not be established.
178    ///
179    /// # Example
180    ///
181    /// ```no_run
182    /// use hyperdb_api::{HyperProcess, Connection, CreateMode, Result};
183    ///
184    /// fn main() -> Result<()> {
185    ///     let hyper = HyperProcess::new(None, None)?;
186    ///     let conn = Connection::new(&hyper, "database.hyper", CreateMode::CreateIfNotExists)?;
187    ///     Ok(())
188    /// }
189    /// ```
190    pub fn new(
191        instance: &HyperProcess,
192        database_path: impl AsRef<Path>,
193        create_mode: CreateMode,
194    ) -> Result<Self> {
195        // Prefer using the connection_endpoint which properly handles UDS/Named Pipes
196        if let Some(conn_endpoint) = instance.connection_endpoint() {
197            return Self::connect_with_endpoint(
198                conn_endpoint,
199                &database_path.as_ref().to_string_lossy(),
200                create_mode,
201            );
202        }
203
204        // Fall back to string endpoint (TCP)
205        let endpoint = instance.require_endpoint()?;
206        Self::connect(
207            endpoint,
208            &database_path.as_ref().to_string_lossy(),
209            create_mode,
210        )
211    }
212
213    /// Connects using a `ConnectionEndpoint` (supports TCP, UDS, and Named Pipes).
214    fn connect_with_endpoint(
215        endpoint: &hyperdb_api_core::client::ConnectionEndpoint,
216        database_path: &str,
217        create_mode: CreateMode,
218    ) -> Result<Self> {
219        let db_path_str = Some(database_path.to_string());
220
221        let config = hyperdb_api_core::client::Config::new().with_user("tableau_internal_user");
222
223        let client = hyperdb_api_core::client::Client::connect_endpoint(endpoint, &config)?;
224
225        let conn = Connection::from_client(client, db_path_str.clone());
226
227        // Handle database creation
228        if let Some(db_path) = db_path_str {
229            conn.handle_creation_mode(&db_path, create_mode)?;
230            conn.attach_and_set_path(&db_path)?;
231        }
232
233        Ok(conn)
234    }
235
236    /// Connects to a Hyper server and optionally attaches a database.
237    ///
238    /// # Arguments
239    ///
240    /// * `endpoint` - The server endpoint (host:port).
241    /// * `database_path` - Path to the database file.
242    /// * `create_mode` - How to handle database creation.
243    ///
244    /// # Errors
245    ///
246    /// Returns an error if the connection could not be established.
247    pub fn connect(endpoint: &str, database_path: &str, create_mode: CreateMode) -> Result<Self> {
248        crate::ConnectionBuilder::new(endpoint)
249            .database(database_path)
250            .create_mode(create_mode)
251            .build()
252    }
253
254    /// Returns a connection builder for advanced configuration.
255    ///
256    /// This is useful when you need to set authentication, timeouts, or
257    /// other advanced options before connecting.
258    #[must_use]
    pub fn builder(endpoint: &str) -> crate::ConnectionBuilder {
        // Only hands the endpoint to a fresh builder; the connection itself is
        // produced later by the builder's `build()` (as `connect` does).
        crate::ConnectionBuilder::new(endpoint)
    }
262
263    /// Creates a Connection from a low-level Client (internal use, TCP only).
264    pub(crate) fn from_client(client: Client, database: Option<String>) -> Self {
265        Connection {
266            transport: Transport::Tcp(Box::new(crate::transport::TcpTransport { client })),
267            database,
268            stats_provider: None,
269            pending_stats: Mutex::new(None),
270        }
271    }
272
273    /// Creates a Connection from a Transport (internal use).
274    #[allow(
275        dead_code,
276        reason = "used by ConnectionBuilder for the gRPC path; not reached under non-gRPC feature builds"
277    )]
278    pub(crate) fn from_transport(transport: Transport, database: Option<String>) -> Self {
279        Connection {
280            transport,
281            database,
282            stats_provider: None,
283            pending_stats: Mutex::new(None),
284        }
285    }
286
    /// Returns the transport type name (e.g., "TCP", "gRPC", "Unix Socket").
    ///
    /// The name is whatever the underlying transport reports through
    /// `transport_type().as_str()`. It is a `'static` string, so it may be
    /// kept after the connection has been dropped.
    pub fn transport_type(&self) -> &'static str {
        self.transport.transport_type().as_str()
    }
291
    /// Returns true if this connection supports write operations.
    ///
    /// Currently, only TCP connections support writes. gRPC connections are
    /// read-only until the server supports write operations over gRPC.
    ///
    /// The answer is delegated to the underlying transport.
    pub fn supports_writes(&self) -> bool {
        self.transport.supports_writes()
    }
299
300    /// Handles database creation logic (internal use).
301    pub(crate) fn handle_creation_mode(
302        &self,
303        database_path: &str,
304        create_mode: CreateMode,
305    ) -> Result<()> {
306        match create_mode {
307            CreateMode::DoNotCreate => {}
308            CreateMode::Create => {
309                self.execute_command(&format!(
310                    "CREATE DATABASE {}",
311                    escape_sql_path(database_path)
312                ))?;
313            }
314            CreateMode::CreateIfNotExists => {
315                if let Err(e) = self.execute_command(&format!(
316                    "CREATE DATABASE IF NOT EXISTS {}",
317                    escape_sql_path(database_path)
318                )) {
319                    if !is_already_exists_error(&e) {
320                        return Err(Error::internal(format!(
321                            "Failed to create database '{database_path}': {e}"
322                        )));
323                    }
324                }
325            }
326            CreateMode::CreateAndReplace => {
327                let _ = self.execute_command(&format!(
328                    "DROP DATABASE IF EXISTS {}",
329                    escape_sql_path(database_path)
330                ));
331                self.execute_command(&format!(
332                    "CREATE DATABASE {}",
333                    escape_sql_path(database_path)
334                ))?;
335            }
336        }
337        Ok(())
338    }
339
340    /// Attaches and sets the database path (internal use).
341    pub(crate) fn attach_and_set_path(&self, database_path: &str) -> Result<()> {
342        let db_alias = std::path::Path::new(database_path)
343            .file_stem()
344            .and_then(|s| s.to_str())
345            .unwrap_or("db");
346
347        self.execute_command(&format!(
348            "ATTACH DATABASE {} AS {}",
349            escape_sql_path(database_path),
350            escape_sql_path(db_alias)
351        ))?;
352
353        self.execute_command(&format!(
354            "SET search_path TO {}, public",
355            escape_sql_path(db_alias)
356        ))?;
357
358        Ok(())
359    }
360
361    /// Connects to a Hyper server with authentication.
362    ///
363    /// # Arguments
364    ///
365    /// * `endpoint` - The server endpoint (host:port).
366    /// * `database_path` - Path to the database file.
367    /// * `create_mode` - How to handle database creation.
368    /// * `user` - Username for authentication.
369    /// * `password` - Password for authentication.
370    ///
371    /// # Errors
372    ///
373    /// Returns an error if the connection or authentication fails.
374    pub fn connect_with_auth(
375        endpoint: &str,
376        database_path: &str,
377        create_mode: CreateMode,
378        user: &str,
379        password: &str,
380    ) -> Result<Self> {
381        crate::ConnectionBuilder::new(endpoint)
382            .database(database_path)
383            .create_mode(create_mode)
384            .user(user.to_string())
385            .password(password)
386            .build()
387    }
388
389    /// Creates a connection to a Hyper server without attaching a database.
390    ///
391    /// # Errors
392    ///
393    /// Returns [`Error::Connection`] if the TCP or gRPC handshake fails, and
394    /// [`Error::Io`] if the endpoint cannot be reached.
395    pub fn without_database(endpoint: &str) -> Result<Self> {
396        crate::ConnectionBuilder::new(endpoint).build()
397    }
398
399    /// Executes a SQL command that doesn't return results.
400    ///
401    /// Use this for DDL statements (CREATE, ALTER, DROP) and DML statements
402    /// (INSERT, UPDATE, DELETE).
403    ///
404    /// # Arguments
405    ///
406    /// * `command` - The SQL command to execute.
407    ///
408    /// # Returns
409    ///
410    /// The number of affected rows, or 0 if not applicable.
411    ///
412    /// # Errors
413    ///
414    /// Returns an error if:
415    /// - The connection is using gRPC transport (write operations not yet supported)
416    /// - The command fails to execute
417    pub fn execute_command(&self, command: &str) -> Result<u64> {
418        let token = self.stats_before_query(command);
419
420        let result = self.transport.execute_command(command);
421
422        // For commands, the query is fully executed synchronously, so we
423        // can store the pending token immediately for lazy resolution.
424        self.stats_store_pending(token, command);
425
426        result
427    }
428
429    /// Executes a SQL query and returns a streaming result set.
430    ///
431    /// Results are streamed in chunks (default 64K rows), keeping memory usage
432    /// constant regardless of result set size. This makes it safe for any
433    /// result size, from a single row to billions of rows.
434    ///
435    /// # Example
436    ///
437    /// ```no_run
438    /// # use hyperdb_api::{Connection, Result};
439    /// # fn example(conn: &Connection) -> Result<()> {
440    /// let mut result = conn.execute_query("SELECT id, value FROM measurements")?;
441    /// while let Some(chunk) = result.next_chunk()? {
442    ///     for row in &chunk {
443    ///         // Generic typed access (like C++ row.get<T>())
444    ///         let id: Option<i32> = row.get(0);
445    ///         let value: Option<f64> = row.get(1);
446    ///
447    ///         // Or direct accessors
448    ///         let id = row.get_i32(0);
449    ///         let value = row.get_f64(1);
450    ///     }
451    /// }
452    /// # Ok(())
453    /// # }
454    /// ```
455    ///
456    /// # Memory Behavior
457    ///
458    /// - Only one chunk is held in memory at a time (~few MB for 64K rows)
459    /// - Safe for result sets of any size (millions/billions of rows)
460    /// - Memory usage is `O(chunk_size)`, not `O(total_rows)`
461    ///
462    /// # Errors
463    ///
464    /// - Returns [`Error::Server`] wrapping a `hyperdb_api_core::client::Error` if the
465    ///   SQL fails to parse, execute, or if the server reports an error
466    ///   while streaming.
467    /// - Returns [`Error::Io`] on transport-level I/O failures.
468    pub fn execute_query(&self, query: &str) -> Result<Rowset<'_>> {
469        let token = self.stats_before_query(query);
470
471        let result = match &self.transport {
472            Transport::Tcp(tcp) => {
473                let stream = tcp
474                    .client
475                    .query_streaming(query, DEFAULT_BINARY_CHUNK_SIZE)?;
476                Ok(Rowset::new(stream))
477            }
478            Transport::Grpc(grpc) => {
479                // gRPC streaming: pull chunks lazily so peak memory is
480                // bounded by one gRPC message (tonic default 64 MB), not
481                // by the full result size. Matches TCP's
482                // constant-memory streaming shape.
483                //
484                // The transport module already creates a fresh gRPC client
485                // per query (gRPC client needs &mut self to execute), so we
486                // do the same here: connect, start the stream, wrap as a
487                // `ChunkSource`. The stream keeps the channel and runtime
488                // alive via refcounted handles inside `GrpcChunkStreamSync`.
489                let mut client =
490                    hyperdb_api_core::client::grpc::GrpcClientSync::connect(grpc.config.clone())?;
491                let stream = client.execute_query_stream(query)?;
492                let source = Box::new(crate::grpc_connection::GrpcChunkStreamSource::new(stream));
493                let arrow_rowset = crate::arrow_result::ArrowRowset::from_stream(source)?;
494                Ok(Rowset::from_arrow(arrow_rowset))
495            }
496        };
497
498        // Store the pending token — Hyper logs the execution stats after the
499        // result is consumed (streamed), so we defer resolution until
500        // last_query_stats() is called.
501        self.stats_store_pending(token, query);
502
503        result
504    }
505
506    // =========================================================================
507    // Arrow Format Queries
508    // =========================================================================
509
510    /// Executes a SELECT query and returns results as Arrow IPC stream bytes.
511    ///
512    /// # Example
513    ///
514    /// ```no_run
515    /// use hyperdb_api::{Connection, CreateMode, Result};
516    ///
517    /// fn main() -> Result<()> {
518    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
519    ///
520    ///     // Create and populate a table
521    ///     conn.execute_command("CREATE TABLE data (id INT, value DOUBLE PRECISION)")?;
522    ///     conn.execute_command("INSERT INTO data VALUES (1, 1.5), (2, 2.5)")?;
523    ///
524    ///     // Get results as Arrow IPC stream
525    ///     let arrow_data = conn.execute_query_to_arrow("SELECT * FROM data")?;
526    ///     println!("Got {} bytes of Arrow IPC data", arrow_data.len());
527    ///
528    ///     Ok(())
529    /// }
530    /// ```
531    ///
532    /// # Errors
533    ///
534    /// Propagates any [`Error::Server`] from the TCP or gRPC transport when
535    /// the query fails or the server cannot produce Arrow IPC output.
    pub fn execute_query_to_arrow(&self, select_query: &str) -> Result<bytes::Bytes> {
        // Straight pass-through to the transport. Unlike `execute_query` and
        // `execute_command`, this path does not invoke the query-stats hooks.
        self.transport.execute_query_to_arrow(select_query)
    }
539
540    /// Exports an entire table to Arrow IPC stream format.
541    ///
542    /// This is a convenience method equivalent to
543    /// `execute_query_to_arrow("SELECT * FROM table_name")`.
544    ///
545    /// # Arguments
546    ///
547    /// * `table_name` - The table name
548    ///
549    /// # Returns
550    ///
551    /// Raw Arrow IPC stream bytes containing all rows from the table.
552    ///
553    /// # Example
554    ///
555    /// ```no_run
556    /// use hyperdb_api::{Connection, CreateMode, Result};
557    ///
558    /// fn main() -> Result<()> {
559    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
560    ///     let arrow_data = conn.export_table_to_arrow("my_table")?;
561    ///     Ok(())
562    /// }
563    /// ```
564    ///
565    /// # Errors
566    ///
567    /// Returns whatever [`execute_query_to_arrow`](Self::execute_query_to_arrow)
568    /// would return for `SELECT * FROM <table_name>` — typically
569    /// [`Error::Server`] if the table does not exist or the query is rejected.
570    pub fn export_table_to_arrow(&self, table_name: &str) -> Result<bytes::Bytes> {
571        self.execute_query_to_arrow(&format!("SELECT * FROM {table_name}"))
572    }
573
574    /// Executes a SELECT query and returns results as Arrow `RecordBatch`es.
575    ///
576    /// This is the recommended method for Arrow-native workflows (`DataFusion`,
577    /// Polars, etc.) where you want direct `RecordBatch` access without going
578    /// through the `Row` abstraction.
579    ///
580    /// # Example
581    ///
582    /// ```no_run
583    /// use hyperdb_api::{Connection, CreateMode, Result};
584    /// use arrow::record_batch::RecordBatch;
585    ///
586    /// fn main() -> Result<()> {
587    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
588    ///
589    ///     let batches: Vec<RecordBatch> = conn.execute_query_to_batches("SELECT * FROM data")?;
590    ///     for batch in &batches {
591    ///         println!("batch: {} rows x {} cols", batch.num_rows(), batch.num_columns());
592    ///     }
593    ///     Ok(())
594    /// }
595    /// ```
596    ///
597    /// # Errors
598    ///
599    /// - Returns [`Error::Server`] if the query itself fails.
600    /// - Returns [`Error::Conversion`] if the Arrow IPC payload returned by the
601    ///   server is malformed and cannot be decoded into record batches.
602    pub fn execute_query_to_batches(
603        &self,
604        select_query: &str,
605    ) -> Result<Vec<arrow::record_batch::RecordBatch>> {
606        let arrow_data = self.execute_query_to_arrow(select_query)?;
607        crate::arrow_result::parse_arrow_ipc(arrow_data)
608    }
609
610    /// Fetches a single row from a query.
611    ///
612    /// Returns an error if the query returns no rows.
613    ///
614    /// # Example
615    ///
616    /// ```no_run
617    /// use hyperdb_api::{Connection, CreateMode, Result};
618    ///
619    /// fn main() -> Result<()> {
620    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
621    ///     let row = conn.fetch_one("SELECT * FROM users WHERE id = 1")?;
622    ///     let id: Option<i32> = row.get(0);
623    ///     let name: Option<String> = row.get(1);
624    ///     Ok(())
625    /// }
626    /// ```
627    ///
628    /// # Errors
629    ///
630    /// - Returns the error from [`execute_query`](Self::execute_query) if
631    ///   the query itself fails.
632    /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
633    ///   the query produced zero rows.
634    pub fn fetch_one<Q>(&self, query: Q) -> Result<crate::Row>
635    where
636        Q: AsRef<str>,
637    {
638        let query = query.as_ref();
639        let result = self.execute_query(query)?;
640        result.require_first_row()
641    }
642
643    /// Fetches an optional single row from a query.
644    ///
645    /// Returns `None` if the query returns no rows.
646    ///
647    /// # Example
648    ///
649    /// ```no_run
650    /// use hyperdb_api::{Connection, CreateMode, Result};
651    ///
652    /// fn main() -> Result<()> {
653    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
654    ///     if let Some(row) = conn.fetch_optional("SELECT * FROM users WHERE id = 999")? {
655    ///         let name: Option<String> = row.get(1);
656    ///         println!("Found user: {:?}", name);
657    ///     }
658    ///     Ok(())
659    /// }
660    /// ```
661    ///
662    /// # Errors
663    ///
664    /// Returns the error from [`execute_query`](Self::execute_query) if the
665    /// query itself fails. An empty result set is not an error — it yields
666    /// `Ok(None)`.
667    pub fn fetch_optional<Q>(&self, query: Q) -> Result<Option<crate::Row>>
668    where
669        Q: AsRef<str>,
670    {
671        let query = query.as_ref();
672        let result = self.execute_query(query)?;
673        result.first_row()
674    }
675
676    /// Fetches all rows from a query.
677    ///
678    /// # Example
679    ///
680    /// ```no_run
681    /// use hyperdb_api::{Connection, CreateMode, Result};
682    ///
683    /// fn main() -> Result<()> {
684    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
685    ///     let rows = conn.fetch_all("SELECT * FROM users WHERE active = true ORDER BY name")?;
686    ///     for row in rows {
687    ///         let id: Option<i32> = row.get(0);
688    ///         let name: Option<String> = row.get(1);
689    ///         println!("User {}: {:?}", id.unwrap_or(-1), name);
690    ///     }
691    ///     Ok(())
692    /// }
693    /// ```
694    ///
695    /// # Errors
696    ///
697    /// Returns the error from [`execute_query`](Self::execute_query), or a
698    /// transport error produced while draining every chunk of the streamed
699    /// result set.
700    pub fn fetch_all<Q>(&self, query: Q) -> Result<Vec<crate::Row>>
701    where
702        Q: AsRef<str>,
703    {
704        let query = query.as_ref();
705        let result = self.execute_query(query)?;
706        result.collect_rows()
707    }
708
709    /// Fetches a single row and maps it to a struct using [`FromRow`](crate::FromRow).
710    ///
711    /// Returns an error if the query returns no rows or if mapping fails.
712    ///
713    /// # Example
714    ///
715    /// ```no_run
716    /// use hyperdb_api::{Connection, CreateMode, FromRow, RowAccessor, Result};
717    ///
718    /// struct User { id: i32, name: String }
719    ///
720    /// impl FromRow for User {
721    ///     fn from_row(row: RowAccessor<'_>) -> Result<Self> {
722    ///         Ok(User {
723    ///             id: row.get("id")?,
724    ///             name: row.get_opt("name")?.unwrap_or_default(),
725    ///         })
726    ///     }
727    /// }
728    ///
729    /// fn main() -> Result<()> {
730    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
731    ///     let user: User = conn.fetch_one_as("SELECT id, name FROM users WHERE id = 1")?;
732    ///     Ok(())
733    /// }
734    /// ```
735    ///
736    /// # Errors
737    ///
738    /// - Returns the error from [`fetch_one`](Self::fetch_one) if the query
739    ///   fails or returns no rows.
740    /// - Returns whatever error [`FromRow::from_row`](crate::FromRow::from_row)
741    ///   produces when the row cannot be mapped into `T`.
742    pub fn fetch_one_as<T: crate::FromRow>(&self, query: &str) -> Result<T> {
743        let row = self.fetch_one(query)?;
744        let indices = row
745            .schema()
746            .map(crate::row_accessor::RowAccessor::build_indices)
747            .unwrap_or_default();
748        T::from_row(crate::RowAccessor::new(&row, &indices))
749    }
750
751    /// Fetches all rows and maps them to structs using [`FromRow`](crate::FromRow).
752    ///
753    /// # Example
754    ///
755    /// ```no_run
756    /// # use hyperdb_api::{Connection, FromRow, RowAccessor, Result};
757    /// # struct User { id: i32, name: String }
758    /// # impl FromRow for User {
759    /// #     fn from_row(row: RowAccessor<'_>) -> Result<Self> {
760    /// #         Ok(User { id: row.get("id")?, name: row.get_opt("name")?.unwrap_or_default() })
761    /// #     }
762    /// # }
763    /// # fn example(conn: &Connection) -> Result<()> {
764    /// let users: Vec<User> = conn.fetch_all_as("SELECT id, name FROM users")?;
765    /// # Ok(())
766    /// # }
767    /// ```
768    ///
769    /// # Errors
770    ///
771    /// - Returns the error from [`fetch_all`](Self::fetch_all) if the query
772    ///   fails.
773    /// - Returns the first error produced by
774    ///   [`FromRow::from_row`](crate::FromRow::from_row) on any of the rows.
775    pub fn fetch_all_as<T: crate::FromRow>(&self, query: &str) -> Result<Vec<T>> {
776        let rows = self.fetch_all(query)?;
777        // Build the column-name → index lookup once from the first
778        // row's schema; reuse for every row. All rows in a result set
779        // share the same `Arc<ResultSchema>`, so this is safe.
780        let indices = rows
781            .first()
782            .and_then(crate::result::Row::schema)
783            .map(crate::row_accessor::RowAccessor::build_indices)
784            .unwrap_or_default();
785        rows.iter()
786            .map(|r| T::from_row(crate::RowAccessor::new(r, &indices)))
787            .collect()
788    }
789
790    /// Returns a lazy iterator over rows, mapping each to `T` via
791    /// [`FromRow`].
792    ///
793    /// This is the streaming variant of [`fetch_all_as`](Self::fetch_all_as):
794    /// memory usage is bounded by the chunk size (default 64K rows), not by
795    /// the total row count. Use this for large result sets where collecting
796    /// all rows into a `Vec` would exceed memory limits.
797    ///
798    /// The column-name → index lookup table is built exactly once (on the
799    /// first non-empty chunk) and reused for all rows, so per-row mapping is
800    /// O(1) in column count.
801    ///
802    /// # Example
803    ///
804    /// ```no_run
805    /// # use hyperdb_api::{Connection, CreateMode, FromRow, RowAccessor, Result};
806    /// # struct User { id: i32, name: String }
807    /// # impl FromRow for User {
808    /// #     fn from_row(row: RowAccessor<'_>) -> Result<Self> {
809    /// #         Ok(User { id: row.get("id")?, name: row.get("name")? })
810    /// #     }
811    /// # }
812    /// # fn example(conn: &Connection) -> Result<()> {
813    /// for row_result in conn.stream_as::<User>("SELECT id, name FROM users")? {
814    ///     let user = row_result?;
815    ///     println!("{}: {}", user.id, user.name);
816    /// }
817    /// # Ok(())
818    /// # }
819    /// ```
820    ///
821    /// # Errors
822    ///
823    /// - The returned `Result` wraps errors detected while *opening* the
824    ///   result stream — transport/connection failures, and (on the gRPC
825    ///   transport, which establishes the query stream eagerly) SQL parse and
826    ///   server errors. On the default TCP transport the query is streamed
827    ///   lazily, so SQL errors such as a missing table are typically reported
828    ///   as the **first yielded item** rather than by this outer `Result`.
829    /// - Each yielded item is itself a `Result<T>`:
830    ///   - `Ok(T)` if the row was successfully mapped via `FromRow`.
831    ///   - `Err(e)` for a server/transport error encountered while streaming a
832    ///     later chunk, or for a per-row mapping failure (missing column, type
833    ///     mismatch, NULL in a non-optional field).
834    ///
835    /// In short: always handle errors *both* on the outer `Result` and on each
836    /// item — do not assume a successfully-returned iterator means the query
837    /// succeeded.
838    ///
839    /// [`FromRow`]: crate::FromRow
840    pub fn stream_as<'a, T>(&'a self, query: &str) -> Result<impl Iterator<Item = Result<T>> + 'a>
841    where
842        T: crate::FromRow + 'a,
843    {
844        let rowset = self.execute_query(query)?;
845        Ok(crate::result::TypedRowIterator::<T>::new(rowset))
846    }
847
848    /// Fetches a single row from a **parameterized** query and maps it to a
849    /// struct using [`FromRow`](crate::FromRow).
850    ///
851    /// This is the parameterized counterpart to
852    /// [`fetch_one_as`](Self::fetch_one_as): it binds `$1`, `$2`, … placeholders
853    /// from `params` (via [`ToSqlParam`](crate::params::ToSqlParam), exactly as
854    /// [`query_params`](Self::query_params) does) and maps the first result row
855    /// into `T`. Use it when a parameterized `SELECT` should yield a typed
856    /// struct rather than a raw [`Row`](crate::Row).
857    ///
858    /// # Example
859    ///
860    /// ```no_run
861    /// # use hyperdb_api::{Connection, FromRow, RowAccessor, Result};
862    /// # struct User { id: i32, name: String }
863    /// # impl FromRow for User {
864    /// #     fn from_row(row: RowAccessor<'_>) -> Result<Self> {
865    /// #         Ok(User { id: row.get("id")?, name: row.get("name")? })
866    /// #     }
867    /// # }
868    /// # fn example(conn: &Connection) -> Result<()> {
869    /// let user: User = conn.fetch_one_as_params(
870    ///     "SELECT id, name FROM users WHERE id = $1",
871    ///     &[&1i32],
872    /// )?;
873    /// # Ok(())
874    /// # }
875    /// ```
876    ///
877    /// # Errors
878    ///
879    /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC
880    ///   transport (prepared statements are TCP-only).
881    /// - Returns the error from [`query_params`](Self::query_params) if the
882    ///   server rejects the statement at `Parse`, `Bind`, or `Execute` time, or
883    ///   on transport-level I/O failures.
884    /// - Returns [`Error::Conversion`] with message `"Query returned no rows"`
885    ///   if the query produced zero rows.
886    /// - Returns whatever [`FromRow::from_row`](crate::FromRow::from_row)
887    ///   produces when the row cannot be mapped into `T`.
888    pub fn fetch_one_as_params<T: crate::FromRow>(
889        &self,
890        query: &str,
891        params: &[&dyn crate::params::ToSqlParam],
892    ) -> Result<T> {
893        let row = self.query_params(query, params)?.require_first_row()?;
894        let indices = row
895            .schema()
896            .map(crate::row_accessor::RowAccessor::build_indices)
897            .unwrap_or_default();
898        T::from_row(crate::RowAccessor::new(&row, &indices))
899    }
900
901    /// Fetches all rows from a **parameterized** query and maps them to structs
902    /// using [`FromRow`](crate::FromRow).
903    ///
904    /// This is the parameterized counterpart to
905    /// [`fetch_all_as`](Self::fetch_all_as): it binds `$1`, `$2`, … placeholders
906    /// from `params` (see [`query_params`](Self::query_params)) and maps every
907    /// result row into `T`.
908    ///
909    /// # Example
910    ///
911    /// ```no_run
912    /// # use hyperdb_api::{Connection, FromRow, RowAccessor, Result};
913    /// # struct User { id: i32, name: String }
914    /// # impl FromRow for User {
915    /// #     fn from_row(row: RowAccessor<'_>) -> Result<Self> {
916    /// #         Ok(User { id: row.get("id")?, name: row.get("name")? })
917    /// #     }
918    /// # }
919    /// # fn example(conn: &Connection) -> Result<()> {
920    /// let users: Vec<User> = conn.fetch_all_as_params(
921    ///     "SELECT id, name FROM users WHERE org_id = $1",
922    ///     &[&42i32],
923    /// )?;
924    /// # Ok(())
925    /// # }
926    /// ```
927    ///
928    /// # Errors
929    ///
930    /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC
931    ///   transport.
932    /// - Returns the error from [`query_params`](Self::query_params) if the
933    ///   server rejects the statement, or on transport-level I/O failures.
934    /// - Returns the first error produced by
935    ///   [`FromRow::from_row`](crate::FromRow::from_row) on any of the rows.
936    pub fn fetch_all_as_params<T: crate::FromRow>(
937        &self,
938        query: &str,
939        params: &[&dyn crate::params::ToSqlParam],
940    ) -> Result<Vec<T>> {
941        let rows = self.query_params(query, params)?.collect_rows()?;
942        // Build the column-name → index lookup once from the first row's
943        // schema; reuse for every row. See `fetch_all_as`.
944        let indices = rows
945            .first()
946            .and_then(crate::result::Row::schema)
947            .map(crate::row_accessor::RowAccessor::build_indices)
948            .unwrap_or_default();
949        rows.iter()
950            .map(|r| T::from_row(crate::RowAccessor::new(r, &indices)))
951            .collect()
952    }
953
954    /// Returns a lazy iterator over the rows of a **parameterized** query,
955    /// mapping each to `T` via [`FromRow`].
956    ///
957    /// This is the parameterized counterpart to
958    /// [`stream_as`](Self::stream_as): it binds `$1`, `$2`, … placeholders from
959    /// `params` (see [`query_params`](Self::query_params)) and streams the
960    /// result, mapping each row into `T` while holding only one transport chunk
961    /// in memory at a time. The column-index map is built once on the first
962    /// chunk and reused, so per-row mapping is O(1) in the column count.
963    ///
964    /// # Example
965    ///
966    /// ```no_run
967    /// # use hyperdb_api::{Connection, FromRow, RowAccessor, Result};
968    /// # struct User { id: i32, name: String }
969    /// # impl FromRow for User {
970    /// #     fn from_row(row: RowAccessor<'_>) -> Result<Self> {
971    /// #         Ok(User { id: row.get("id")?, name: row.get("name")? })
972    /// #     }
973    /// # }
974    /// # fn example(conn: &Connection) -> Result<()> {
975    /// for row_result in conn.stream_as_params::<User>(
976    ///     "SELECT id, name FROM users WHERE org_id = $1",
977    ///     &[&42i32],
978    /// )? {
979    ///     let user = row_result?;
980    ///     println!("{}: {}", user.id, user.name);
981    /// }
982    /// # Ok(())
983    /// # }
984    /// ```
985    ///
986    /// # Errors
987    ///
988    /// - The returned outer `Result` wraps errors detected while *opening* the
989    ///   stream: [`Error::FeatureNotSupported`] on gRPC transport, and any
990    ///   `Parse`/`Bind` rejection or transport failure surfaced by
991    ///   [`query_params`](Self::query_params).
992    /// - Each yielded item is itself a `Result<T>`: `Ok(T)` when the row mapped
993    ///   cleanly, or `Err(e)` for a per-row mapping failure (missing column,
994    ///   type mismatch, NULL in a non-optional field) or a server/transport
995    ///   error hit while streaming a later chunk.
996    ///
997    /// As with [`stream_as`](Self::stream_as), always handle errors *both* on
998    /// the outer `Result` and on each item.
999    ///
1000    /// [`FromRow`]: crate::FromRow
1001    pub fn stream_as_params<'a, T>(
1002        &'a self,
1003        query: &str,
1004        params: &[&dyn crate::params::ToSqlParam],
1005    ) -> Result<impl Iterator<Item = Result<T>> + 'a>
1006    where
1007        T: crate::FromRow + 'a,
1008    {
1009        // `query_params` returns a Rowset that already carries the prepared
1010        // statement guard, so Drop ordering (close_statement after the rowset
1011        // releases its connection lock) is preserved with no extra work here.
1012        let rowset = self.query_params(query, params)?;
1013        Ok(crate::result::TypedRowIterator::<T>::new(rowset))
1014    }
1015
1016    /// Fetches a single scalar value from a query.
1017    ///
1018    /// Returns an error if the query returns no rows or NULL.
1019    ///
1020    /// # Example
1021    ///
1022    /// ```no_run
1023    /// use hyperdb_api::{Connection, CreateMode, Result};
1024    ///
1025    /// fn main() -> Result<()> {
1026    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1027    ///     let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM users")?;
1028    ///     println!("User count: {}", count);
1029    ///     Ok(())
1030    /// }
1031    /// ```
1032    ///
1033    /// # Errors
1034    ///
1035    /// - Returns the error from [`execute_query`](Self::execute_query) if
1036    ///   the query itself fails.
1037    /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
1038    ///   the query produced zero rows.
1039    /// - Returns [`Error::Conversion`] with message `"Scalar query returned NULL"`
1040    ///   if the single cell is SQL `NULL`.
1041    pub fn fetch_scalar<T, Q>(&self, query: Q) -> Result<T>
1042    where
1043        T: crate::connection::ScalarValue + crate::result::RowValue,
1044        Q: AsRef<str>,
1045    {
1046        let query = query.as_ref();
1047        let result = self.execute_query(query)?;
1048        result.require_scalar()
1049    }
1050
1051    /// Fetches an optional scalar value from a query.
1052    ///
1053    /// Returns `Ok(None)` if the single cell is SQL `NULL`. A query that returns no rows is an error.
1054    ///
1055    /// # Example
1056    ///
1057    /// ```no_run
1058    /// use hyperdb_api::{Connection, CreateMode, Result};
1059    ///
1060    /// fn main() -> Result<()> {
1061    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1062    ///     let max_id: Option<i32> = conn.fetch_optional_scalar("SELECT MAX(id) FROM users")?;
1063    ///     println!("Max ID: {:?}", max_id);
1064    ///     Ok(())
1065    /// }
1066    /// ```
1067    ///
1068    /// # Errors
1069    ///
1070    /// - Returns the error from [`execute_query`](Self::execute_query) if
1071    ///   the query itself fails.
1072    /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
1073    ///   the query produced zero rows. (An empty result is treated as an
1074    ///   error here because we need at least one row to inspect; SQL `NULL`
1075    ///   in the single cell yields `Ok(None)`.)
1076    pub fn fetch_optional_scalar<T, Q>(&self, query: Q) -> Result<Option<T>>
1077    where
1078        T: crate::connection::ScalarValue + crate::result::RowValue,
1079        Q: AsRef<str>,
1080    {
1081        let query = query.as_ref();
1082        let result = self.execute_query(query)?;
1083        result.scalar()
1084    }
1085
1086    /// Executes a scalar query and returns a single value of type `T`.
1087    ///
1088    /// Alias for [`fetch_optional_scalar`](Self::fetch_optional_scalar) for C++ API compatibility.
1089    ///
1090    /// # Errors
1091    ///
1092    /// See [`fetch_optional_scalar`](Self::fetch_optional_scalar).
1093    #[inline]
1094    pub fn execute_scalar_query<T>(&self, query: &str) -> Result<Option<T>>
1095    where
1096        T: ScalarValue + crate::result::RowValue,
1097    {
1098        self.fetch_optional_scalar(query)
1099    }
1100
1101    /// Queries for a count value, defaulting to 0 if NULL.
1102    ///
1103    /// This is optimized for COUNT queries which typically return 0
1104    /// instead of NULL when there are no matching rows.
1105    ///
1106    /// # Example
1107    ///
1108    /// ```no_run
1109    /// use hyperdb_api::{Connection, CreateMode, Result};
1110    ///
1111    /// fn main() -> Result<()> {
1112    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1113    ///     let count = conn.query_count("SELECT COUNT(*) FROM users WHERE active = true")?;
1114    ///     println!("Active users: {}", count);
1115    ///     Ok(())
1116    /// }
1117    /// ```
1118    ///
1119    /// # Errors
1120    ///
1121    /// Returns the error from [`execute_query`](Self::execute_query) if the
1122    /// query fails or produces no rows. SQL `NULL` is mapped to `0`, not an
1123    /// error.
1124    pub fn query_count(&self, query: &str) -> Result<i64> {
1125        self.fetch_optional_scalar::<i64, _>(query)
1126            .map(|opt| opt.unwrap_or(0))
1127    }
1128
1129    // =========================================================================
1130    // Parameterized Queries (SQL Injection Safe)
1131    // =========================================================================
1132
1133    /// Executes a parameterized query, returning streaming results.
1134    ///
1135    /// This is safe to use with untrusted user input: parameters travel
1136    /// through the extended query protocol (Parse/Bind/Execute) as
1137    /// binary `HyperBinary` values and are never interpolated into the
1138    /// SQL string. For repeated executions of the same SQL with different
1139    /// values, prefer the explicit [`prepare`](Self::prepare) API — it
1140    /// returns a reusable [`PreparedStatement`](crate::PreparedStatement)
1141    /// that skips the Parse round-trip on every call.
1142    ///
1143    /// Under the hood, `query_params` is a one-shot
1144    /// prepare+execute+close: it prepares an unnamed statement, binds
1145    /// the parameters, starts streaming, and closes the statement when
1146    /// the returned [`Rowset`] is dropped.
1147    ///
1148    /// # Arguments
1149    ///
1150    /// * `query` - The SQL query with parameter placeholders (`$1`, `$2`, etc.)
1151    /// * `params` - Parameter values matching the placeholders
1152    ///
1153    /// # SQL Injection Prevention
1154    ///
1155    /// ```no_run
1156    /// use hyperdb_api::{Connection, CreateMode, Result};
1157    ///
1158    /// fn search_users(conn: &Connection, user_input: &str) -> Result<()> {
1159    ///     // DANGEROUS - vulnerable to SQL injection:
1160    ///     // let query = format!("SELECT * FROM users WHERE name = '{}'", user_input);
1161    ///
1162    ///     // SAFE - parameterized query:
1163    ///     let mut result = conn.query_params(
1164    ///         "SELECT * FROM users WHERE name = $1",
1165    ///         &[&user_input],
1166    ///     )?;
1167    ///
1168    ///     while let Some(chunk) = result.next_chunk()? {
1169    ///         for row in &chunk {
1170    ///             let id: Option<i32> = row.get(0);
1171    ///             let name: Option<String> = row.get(1);
1172    ///             println!("Found: {:?} - {:?}", id, name);
1173    ///         }
1174    ///     }
1175    ///     Ok(())
1176    /// }
1177    /// ```
1178    ///
1179    /// # Multiple Parameters
1180    ///
1181    /// ```no_run
1182    /// use hyperdb_api::{Connection, CreateMode, Result};
1183    ///
1184    /// fn main() -> Result<()> {
1185    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1186    ///
1187    ///     // Multiple parameters of different types
1188    ///     let result = conn.query_params(
1189    ///         "SELECT * FROM orders WHERE customer_id = $1 AND total > $2",
1190    ///         &[&42i32, &100.0f64],
1191    ///     )?;
1192    ///     Ok(())
1193    /// }
1194    /// ```
1195    ///
1196    /// # Errors
1197    ///
1198    /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport
1199    ///   (prepared statements are TCP-only).
1200    /// - Returns [`Error::Server`] if the server rejects the statement at
1201    ///   `Parse`, `Bind`, or `Execute` time, including on type-mismatch
1202    ///   between `params` and the inferred OIDs.
1203    /// - Returns [`Error::Io`] on transport-level I/O failures.
1204    pub fn query_params(
1205        &self,
1206        query: &str,
1207        params: &[&dyn crate::params::ToSqlParam],
1208    ) -> Result<Rowset<'_>> {
1209        // Implementation note: routes through the extended query protocol
1210        // via Parse/Bind/Execute so parameters travel in HyperBinary
1211        // format — no SQL escaping, full SQL-injection safety regardless of
1212        // parameter content. The statement handle is stashed inside the
1213        // returned Rowset so its Drop-time close_statement fires *after*
1214        // the rowset releases its connection lock (otherwise the close
1215        // would deadlock on the still-held mutex).
1216        let client = match &self.transport {
1217            Transport::Tcp(tcp) => &tcp.client,
1218            Transport::Grpc(_) => {
1219                return Err(Error::feature_not_supported(
1220                    "prepared statements are not supported over gRPC transport",
1221                ));
1222            }
1223        };
1224        let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
1225        let stmt = client.prepare_typed(query, &oids)?;
1226        let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
1227        let stream =
1228            client.execute_streaming(&stmt, encoded, crate::result::DEFAULT_BINARY_CHUNK_SIZE)?;
1229        Ok(Rowset::from_prepared(stream).with_statement_guard(stmt))
1230    }
1231
1232    /// Executes a parameterized command that doesn't return rows.
1233    ///
1234    /// Use this for INSERT, UPDATE, DELETE, or DDL statements with parameters.
1235    /// Returns the number of affected rows.
1236    ///
1237    /// See [`query_params`](Self::query_params) for details on parameter
1238    /// handling and SQL injection prevention.
1239    ///
1240    /// # Example
1241    ///
1242    /// ```no_run
1243    /// use hyperdb_api::{Connection, CreateMode, Result};
1244    ///
1245    /// fn delete_user(conn: &Connection, user_id: i32) -> Result<u64> {
1246    ///     // Safe from SQL injection
1247    ///     conn.command_params("DELETE FROM users WHERE id = $1", &[&user_id])
1248    /// }
1249    /// ```
1250    ///
1251    /// # Errors
1252    ///
1253    /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport.
1254    /// - Returns [`Error::Server`] if the server rejects the statement at
1255    ///   `Parse`, `Bind`, or `Execute` time.
1256    /// - Returns [`Error::Io`] on transport-level I/O failures.
1257    pub fn command_params(
1258        &self,
1259        query: &str,
1260        params: &[&dyn crate::params::ToSqlParam],
1261    ) -> Result<u64> {
1262        // One-shot prepare+execute with explicit OIDs — see `query_params`
1263        // for why we collect OIDs from each parameter.
1264        let client = match &self.transport {
1265            Transport::Tcp(tcp) => &tcp.client,
1266            Transport::Grpc(_) => {
1267                return Err(Error::feature_not_supported(
1268                    "prepared statements are not supported over gRPC transport",
1269                ));
1270            }
1271        };
1272        let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
1273        let stmt = client.prepare_typed(query, &oids)?;
1274        let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
1275        Ok(client.execute_no_result(&stmt, encoded)?)
1276    }
1277
1278    /// Executes multiple SQL statements in a single call.
1279    ///
1280    /// Each statement is executed sequentially. If any statement fails,
1281    /// execution stops and the error is returned. Returns the total number
1282    /// of affected rows across all statements.
1283    ///
1284    /// This is a convenience wrapper for DDL scripts and multi-statement setup: each
1285    /// non-blank statement is sent through its own `execute_command` call; blank ones are skipped.
1286    ///
1287    /// # Example
1288    ///
1289    /// ```no_run
1290    /// use hyperdb_api::{Connection, CreateMode, Result};
1291    ///
1292    /// fn main() -> Result<()> {
1293    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1294    ///     let total = conn.execute_batch(&[
1295    ///         "CREATE TABLE users (id INT, name TEXT)",
1296    ///         "INSERT INTO users VALUES (1, 'Alice')",
1297    ///         "INSERT INTO users VALUES (2, 'Bob')",
1298    ///     ])?;
1299    ///     println!("Total affected: {}", total);
1300    ///     Ok(())
1301    /// }
1302    /// ```
1303    ///
1304    /// # Errors
1305    ///
1306    /// Returns a wrapped [`Error::Internal`] on the first statement that fails;
1307    /// its `source` is the original [`Error::Server`] from
1308    /// [`execute_command`](Self::execute_command). The error message
1309    /// includes the failing statement's ordinal and an 80-character preview
1310    /// of its SQL.
1311    pub fn execute_batch(&self, statements: &[&str]) -> Result<u64> {
1312        let mut total = 0u64;
1313        for (i, stmt) in statements.iter().enumerate() {
1314            if !stmt.trim().is_empty() {
1315                total += self.execute_command(stmt).map_err(|e| {
1316                    let preview: String = stmt.chars().take(80).collect();
1317                    Error::internal(format!(
1318                        "execute_batch failed at statement {} of {}: {}: {}",
1319                        i + 1,
1320                        statements.len(),
1321                        preview,
1322                        e,
1323                    ))
1324                })?;
1325            }
1326        }
1327        Ok(total)
1328    }
1329
1330    /// Returns the attached database path, if any.
1331    pub fn database(&self) -> Option<&str> {
1332        self.database.as_deref()
1333    }
1334
1335    /// Creates a new database file.
1336    ///
1337    /// # Example
1338    ///
1339    /// ```no_run
1340    /// use hyperdb_api::{Connection, Result};
1341    ///
1342    /// fn main() -> Result<()> {
1343    ///     let conn = Connection::without_database("localhost:7483")?;
1344    ///     conn.create_database("new_database.hyper")?;
1345    ///     Ok(())
1346    /// }
1347    /// ```
1348    ///
1349    /// # Errors
1350    ///
1351    /// Returns [`Error::Server`] if the server rejects the
1352    /// `CREATE DATABASE IF NOT EXISTS` statement (e.g. the path is not
1353    /// writable on the server).
1354    pub fn create_database(&self, path: &str) -> Result<()> {
1355        let sql = format!("CREATE DATABASE IF NOT EXISTS {}", escape_sql_path(path));
1356        self.execute_command(&sql)?;
1357        Ok(())
1358    }
1359
1360    /// Drops (deletes) a database file.
1361    ///
1362    /// # Example
1363    ///
1364    /// ```no_run
1365    /// use hyperdb_api::{Connection, Result};
1366    ///
1367    /// fn main() -> Result<()> {
1368    ///     let conn = Connection::without_database("localhost:7483")?;
1369    ///     conn.drop_database("old_database.hyper")?;
1370    ///     Ok(())
1371    /// }
1372    /// ```
1373    ///
1374    /// # Errors
1375    ///
1376    /// Returns [`Error::Server`] if the server rejects the
1377    /// `DROP DATABASE IF EXISTS` statement (e.g. the database is still
1378    /// attached or permissions deny deletion).
1379    pub fn drop_database(&self, path: &str) -> Result<()> {
1380        let sql = format!("DROP DATABASE IF EXISTS {}", escape_sql_path(path));
1381        self.execute_command(&sql)?;
1382        Ok(())
1383    }
1384
1385    /// Attaches a database file to the connection.
1386    ///
1387    /// Once attached, the database can be queried and modified.
1388    /// The database is identified by its alias (or by its path if no alias is provided).
1389    ///
1390    /// # Arguments
1391    ///
1392    /// * `path` - The path to the database file to attach.
1393    /// * `alias` - Optional alias for the database. If `None`, the database is
1394    ///   attached without an explicit alias (typically using its filename).
1395    ///
1396    /// # Errors
1397    ///
1398    /// Returns an error if the database file doesn't exist or if attachment fails.
1399    ///
1400    /// # Example
1401    ///
1402    /// ```no_run
1403    /// use hyperdb_api::{Connection, Result};
1404    ///
1405    /// fn main() -> Result<()> {
1406    ///     let conn = Connection::without_database("localhost:7483")?;
1407    ///
1408    ///     // Attach with an alias
1409    ///     conn.attach_database("data.hyper", Some("mydata"))?;
1410    ///
1411    ///     // Attach without an alias
1412    ///     conn.attach_database("other.hyper", None)?;
1413    ///     Ok(())
1414    /// }
1415    /// ```
1416    pub fn attach_database(&self, path: &str, alias: Option<&str>) -> Result<()> {
1417        let sql = if let Some(alias) = alias {
1418            format!(
1419                "ATTACH DATABASE {} AS {}",
1420                escape_sql_path(path),
1421                escape_sql_path(alias)
1422            )
1423        } else {
1424            format!("ATTACH DATABASE {}", escape_sql_path(path))
1425        };
1426        self.execute_command(&sql)?;
1427        Ok(())
1428    }
1429
1430    /// Detaches a database from this connection.
1431    ///
1432    /// After detaching, the database file is released and can be accessed
1433    /// externally (e.g., copied, moved, etc.). All pending updates are
1434    /// written to disk before detaching.
1435    ///
1436    /// # Arguments
1437    ///
1438    /// * `alias` - The alias of the database to detach.
1439    ///
1440    /// # Errors
1441    ///
1442    /// Returns an error if the database is not attached or if detachment fails.
1443    ///
1444    /// # Example
1445    ///
1446    /// ```no_run
1447    /// use hyperdb_api::{Connection, Result};
1448    ///
1449    /// fn main() -> Result<()> {
1450    ///     let conn = Connection::without_database("localhost:7483")?;
1451    ///     conn.attach_database("data.hyper", Some("mydata"))?;
1452    ///     // ... work with the database ...
1453    ///     conn.detach_database("mydata")?;
1454    ///     Ok(())
1455    /// }
1456    /// ```
1457    pub fn detach_database(&self, alias: &str) -> Result<()> {
1458        let sql = format!("DETACH DATABASE {}", escape_sql_path(alias));
1459        self.execute_command(&sql)?;
1460        Ok(())
1461    }
1462
1463    /// Detaches all databases from this connection.
1464    ///
1465    /// This is useful for cleanup before closing a connection or when
1466    /// you need to release all database files.
1467    ///
1468    /// # Errors
1469    ///
1470    /// Returns [`Error::Server`] if the server rejects the
1471    /// `DETACH ALL DATABASES` statement (e.g. a database is still in use by
1472    /// another session).
1473    pub fn detach_all_databases(&self) -> Result<()> {
1474        self.execute_command("DETACH ALL DATABASES")?;
1475        Ok(())
1476    }
1477
1478    /// Creates a schema in the database.
1479    ///
1480    /// # Errors
1481    ///
1482    /// - Returns an error if `schema_name` cannot be converted into a
1483    ///   [`SchemaName`](crate::SchemaName) (invalid identifier).
1484    /// - Returns [`Error::Server`] if the server rejects the
1485    ///   `CREATE SCHEMA` statement (e.g. the schema already exists).
1486    pub fn create_schema<T>(&self, schema_name: T) -> Result<()>
1487    where
1488        T: TryInto<crate::SchemaName>,
1489        crate::Error: From<T::Error>,
1490    {
1491        crate::catalog::Catalog::new(self).create_schema(schema_name)
1492    }
1493
1494    /// Checks whether a schema exists.
1495    ///
1496    /// # Arguments
1497    ///
1498    /// * `schema` - The schema name (can include database qualifier).
1499    ///
1500    /// # Example
1501    ///
1502    /// ```no_run
1503    /// use hyperdb_api::{Connection, Result};
1504    ///
1505    /// fn main() -> Result<()> {
1506    ///     let conn = Connection::without_database("localhost:7483")?;
1507    ///     if conn.has_schema("public")? {
1508    ///         println!("Schema 'public' exists");
1509    ///     }
1510    ///     Ok(())
1511    /// }
1512    /// ```
1513    ///
1514    /// # Errors
1515    ///
1516    /// - Returns an error if `schema` cannot be converted into a
1517    ///   [`SchemaName`](crate::SchemaName).
1518    /// - Returns [`Error::Server`] if the catalog lookup query fails.
1519    pub fn has_schema<T>(&self, schema: T) -> Result<bool>
1520    where
1521        T: TryInto<crate::SchemaName>,
1522        crate::Error: From<T::Error>,
1523    {
1524        use crate::catalog::Catalog;
1525        Catalog::new(self).has_schema(schema)
1526    }
1527
1528    /// Checks whether a table exists.
1529    ///
1530    /// # Arguments
1531    ///
1532    /// * `table_name` - The table name (can include database and schema qualifiers).
1533    ///
1534    /// # Example
1535    ///
1536    /// ```no_run
1537    /// use hyperdb_api::{Connection, Result};
1538    ///
1539    /// fn main() -> Result<()> {
1540    ///     let conn = Connection::without_database("localhost:7483")?;
1541    ///     if conn.has_table("public.users")? {
1542    ///         println!("Table 'users' exists");
1543    ///     }
1544    ///     Ok(())
1545    /// }
1546    /// ```
1547    ///
1548    /// # Errors
1549    ///
1550    /// - Returns an error if `table_name` cannot be converted into a
1551    ///   [`TableName`](crate::TableName).
1552    /// - Returns [`Error::Server`] if the catalog lookup query fails.
1553    pub fn has_table<T>(&self, table_name: T) -> Result<bool>
1554    where
1555        T: TryInto<crate::TableName>,
1556        crate::Error: From<T::Error>,
1557    {
1558        use crate::catalog::Catalog;
1559        Catalog::new(self).has_table(table_name)
1560    }
1561
1562    /// Returns the server version as a parsed struct.
1563    ///
1564    /// Returns `None` if the version cannot be determined (e.g., gRPC connection).
1565    ///
1566    /// # Example
1567    ///
1568    /// ```no_run
1569    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, ServerVersion, Result};
1570    ///
1571    /// fn main() -> Result<()> {
1572    ///     let hyper = HyperProcess::new(None, None)?;
1573    ///     let conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1574    ///     if let Some(version) = conn.server_version() {
1575    ///         println!("Hyper {}", version);
1576    ///         if version >= ServerVersion::new(0, 1, 0) {
1577    ///             println!("Has feature X");
1578    ///         }
1579    ///     }
1580    ///     Ok(())
1581    /// }
1582    /// ```
1583    pub fn server_version(&self) -> Option<crate::ServerVersion> {
1584        let version_str = self.parameter_status("server_version")?;
1585        crate::ServerVersion::parse(&version_str)
1586    }
1587
1588    /// Copies a database file to a new path.
1589    ///
1590    /// The source database must be attached to this connection.
1591    ///
1592    /// # Example
1593    ///
1594    /// ```no_run
1595    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1596    ///
1597    /// fn main() -> Result<()> {
1598    ///     let hyper = HyperProcess::new(None, None)?;
1599    ///     let conn = Connection::new(&hyper, "source.hyper", CreateMode::DoNotCreate)?;
1600    ///     conn.copy_database("source.hyper", "backup.hyper")?;
1601    ///     Ok(())
1602    /// }
1603    /// ```
1604    ///
1605    /// # Errors
1606    ///
1607    /// Returns [`Error::Server`] if the server rejects the
1608    /// `COPY DATABASE` statement — e.g. the source is not attached, the
1609    /// destination path is not writable, or it already exists.
1610    pub fn copy_database(&self, source: &str, destination: &str) -> Result<()> {
1611        let sql = format!(
1612            "COPY DATABASE {} TO {}",
1613            escape_sql_path(source),
1614            escape_sql_path(destination)
1615        );
1616        self.execute_command(&sql)?;
1617        Ok(())
1618    }
1619
1620    /// Executes EXPLAIN on a query and returns the plan as a string.
1621    ///
1622    /// # Example
1623    ///
1624    /// ```no_run
1625    /// use hyperdb_api::{Connection, CreateMode, Result};
1626    ///
1627    /// fn main() -> Result<()> {
1628    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1629    ///     let plan = conn.explain("SELECT * FROM users WHERE id = 1")?;
1630    ///     println!("{}", plan);
1631    ///     Ok(())
1632    /// }
1633    /// ```
1634    ///
1635    /// # Errors
1636    ///
1637    /// Returns [`Error::Server`] if `EXPLAIN <query>` fails to parse or
1638    /// plan, or if the streamed result cannot be consumed.
1639    pub fn explain(&self, query: &str) -> Result<String> {
1640        let explain_sql = format!("EXPLAIN {query}");
1641        let result = self.execute_query(&explain_sql)?;
1642        let mut lines = Vec::new();
1643        for row in result.rows() {
1644            let row = row?;
1645            if let Some(line) = row.get::<String>(0) {
1646                lines.push(line);
1647            }
1648        }
1649        Ok(lines.join("\n"))
1650    }
1651
1652    /// Executes EXPLAIN ANALYZE on a query and returns the plan with timing info.
1653    ///
1654    /// **Note:** This actually executes the query to collect timing information.
1655    ///
1656    /// # Errors
1657    ///
1658    /// Returns [`Error::Server`] if `EXPLAIN ANALYZE <query>` fails — this
1659    /// includes any runtime error raised by actually executing `query`.
1660    pub fn explain_analyze(&self, query: &str) -> Result<String> {
1661        let explain_sql = format!("EXPLAIN ANALYZE {query}");
1662        let result = self.execute_query(&explain_sql)?;
1663        let mut lines = Vec::new();
1664        for row in result.rows() {
1665            let row = row?;
1666            if let Some(line) = row.get::<String>(0) {
1667                lines.push(line);
1668            }
1669        }
1670        Ok(lines.join("\n"))
1671    }
1672
1673    /// Returns a reference to the underlying TCP client.
1674    ///
1675    /// # Panics
1676    ///
1677    /// This method returns `None` if the connection is using gRPC transport.
1678    pub fn tcp_client(&self) -> Option<&Client> {
1679        match &self.transport {
1680            Transport::Tcp(tcp) => Some(&tcp.client),
1681            Transport::Grpc(_) => None,
1682        }
1683    }
1684
1685    /// Crate-internal accessor for the transport. Used by
1686    /// [`PreparedStatement`](crate::PreparedStatement) to reach the
1687    /// underlying `hyperdb_api_core::client::Client`.
1688    pub(crate) fn transport(&self) -> &Transport {
1689        &self.transport
1690    }
1691
1692    /// Prepares a SQL statement with automatic parameter type inference.
1693    ///
1694    /// The returned [`PreparedStatement`](crate::PreparedStatement) can
1695    /// be executed many times with different parameter values; the
1696    /// server caches the parsed plan. This is the preferred way to
1697    /// execute a statement repeatedly inside a loop.
1698    ///
1699    /// For explicit parameter types (necessary when `$N` placeholders
1700    /// would otherwise be ambiguous), use
1701    /// [`prepare_typed`](Self::prepare_typed).
1702    ///
1703    /// # Example
1704    ///
1705    /// ```no_run
1706    /// # use hyperdb_api::{Connection, CreateMode, Result};
1707    /// # fn example(conn: &Connection) -> Result<()> {
1708    /// let stmt = conn.prepare("SELECT name FROM users WHERE id = $1")?;
1709    /// for id in [1_i32, 2, 3] {
1710    ///     let name: String = stmt.fetch_scalar(&[&id])?;
1711    ///     println!("{id}: {name}");
1712    /// }
1713    /// # Ok(())
1714    /// # }
1715    /// ```
1716    ///
1717    /// # Errors
1718    ///
1719    /// See [`prepare_typed`](Self::prepare_typed) — this method delegates
1720    /// to it with an empty OID list.
1721    pub fn prepare(&self, query: &str) -> Result<crate::PreparedStatement<'_>> {
1722        self.prepare_typed(query, &[])
1723    }
1724
1725    /// Prepares a SQL statement with explicit parameter type OIDs.
1726    ///
1727    /// Use this when the server cannot infer parameter types from the
1728    /// SQL alone (e.g. a bare `$1` in a `WHERE v > $1` clause with no
1729    /// other context). Constants for common types live in
1730    /// [`hyperdb_api_core::types::oids`].
1731    ///
1732    /// # Errors
1733    ///
1734    /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport
1735    ///   (prepared statements are TCP-only).
1736    /// - Returns [`Error::Server`] if the server rejects the `Parse`
1737    ///   message, e.g. SQL syntax error or unknown OID.
1738    /// - Returns [`Error::Io`] on transport-level I/O failures.
1739    pub fn prepare_typed(
1740        &self,
1741        query: &str,
1742        param_types: &[crate::Oid],
1743    ) -> Result<crate::PreparedStatement<'_>> {
1744        let client = match &self.transport {
1745            Transport::Tcp(tcp) => &tcp.client,
1746            Transport::Grpc(_) => {
1747                return Err(Error::feature_not_supported(
1748                    "prepared statements are not supported over gRPC transport",
1749                ));
1750            }
1751        };
1752        let inner = client.prepare_typed(query, param_types)?;
1753        crate::PreparedStatement::new(self, inner)
1754    }
1755
1756    /// Returns true if the connection is alive (passive check).
1757    ///
1758    /// This is a lightweight check that does not send any data to the server.
1759    /// For an active health check, use [`ping`](Self::ping).
1760    pub fn is_alive(&self) -> bool {
1761        match &self.transport {
1762            Transport::Tcp(tcp) => tcp.client.is_alive(),
1763            Transport::Grpc(_) => true, // gRPC connections are stateless
1764        }
1765    }
1766
1767    /// Actively checks that the connection is healthy by executing a trivial query.
1768    ///
1769    /// Unlike [`is_alive`](Self::is_alive) which only checks local state,
1770    /// this method sends `SELECT 1` to the server and verifies a response.
1771    ///
1772    /// # Example
1773    ///
1774    /// ```no_run
1775    /// # use hyperdb_api::{Connection, CreateMode, Result};
1776    /// # fn example(conn: &Connection) -> Result<()> {
1777    /// if conn.ping().is_ok() {
1778    ///     println!("Connection is healthy");
1779    /// }
1780    /// # Ok(())
1781    /// # }
1782    /// ```
1783    ///
1784    /// # Errors
1785    ///
1786    /// Returns [`Error::Server`] or [`Error::Io`] if the `SELECT 1`
1787    /// round-trip fails — i.e. the connection is no longer usable.
1788    pub fn ping(&self) -> Result<()> {
1789        self.execute_command("SELECT 1")?;
1790        Ok(())
1791    }
1792
1793    /// Returns the process ID of the backend server connection.
1794    ///
1795    /// Returns 0 for gRPC connections (not applicable).
1796    pub fn process_id(&self) -> i32 {
1797        match &self.transport {
1798            Transport::Tcp(tcp) => tcp.client.process_id(),
1799            Transport::Grpc(_) => 0,
1800        }
1801    }
1802
1803    /// Returns the secret key for the backend server connection.
1804    ///
1805    /// This is used for cancellation requests.
1806    /// Returns 0 for gRPC connections (not applicable).
1807    pub fn secret_key(&self) -> i32 {
1808        match &self.transport {
1809            Transport::Tcp(tcp) => tcp.client.secret_key(),
1810            Transport::Grpc(_) => 0,
1811        }
1812    }
1813
1814    /// Returns a server parameter value by name.
1815    ///
1816    /// Server parameters are sent by the server during connection startup.
1817    /// Common parameters include:
1818    /// - `server_version` - The server version string
1819    /// - `server_encoding` - The server's character encoding
1820    /// - `client_encoding` - The client's character encoding
1821    /// - `DateStyle` - Date display format
1822    /// - `TimeZone` - Server timezone
1823    /// - `session_identifier` - Session ID for connection migration (if routing enabled)
1824    ///
1825    /// Returns `None` if the parameter is not known.
1826    ///
1827    /// # Example
1828    ///
1829    /// ```no_run
1830    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1831    ///
1832    /// fn main() -> Result<()> {
1833    ///     let hyper = HyperProcess::new(None, None)?;
1834    ///     let conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1835    ///
1836    ///     if let Some(version) = conn.parameter_status("server_version") {
1837    ///         println!("Connected to Hyper version: {}", version);
1838    ///     }
1839    ///     Ok(())
1840    /// }
1841    /// ```
1842    pub fn parameter_status(&self, name: &str) -> Option<String> {
1843        match &self.transport {
1844            Transport::Tcp(tcp) => tcp.client.parameter_status(name),
1845            Transport::Grpc(_) => None, // gRPC doesn't have server parameters
1846        }
1847    }
1848
1849    /// Sets the notice receiver for this connection.
1850    ///
1851    /// Server notices and warnings are passed to this callback instead of being
1852    /// logged. Pass `None` to restore default logging behavior.
1853    pub fn set_notice_receiver(
1854        &mut self,
1855        receiver: Option<hyperdb_api_core::client::NoticeReceiver>,
1856    ) {
1857        match &mut self.transport {
1858            Transport::Tcp(tcp) => tcp.client.set_notice_receiver(receiver),
1859            Transport::Grpc(_) => {} // gRPC doesn't support notice receivers
1860        }
1861    }
1862
1863    /// Cancels the currently executing query (thread-safe).
1864    ///
1865    /// # Errors
1866    ///
1867    /// - Returns [`Error::FeatureNotSupported`] on gRPC connections — cancellation is not
1868    ///   yet implemented for gRPC transport.
1869    /// - Returns [`Error::Connection`] or [`Error::Io`] if the separate
1870    ///   cancel-request connection to the server fails.
1871    pub fn cancel(&self) -> Result<()> {
1872        match &self.transport {
1873            Transport::Tcp(tcp) => tcp.client.cancel().map_err(Error::from),
1874            Transport::Grpc(_) => Err(Error::feature_not_supported(
1875                "Query cancellation is not yet supported for gRPC connections.",
1876            )),
1877        }
1878    }
1879
1880    /// Closes the connection, detaching all databases first.
1881    ///
1882    /// # Errors
1883    ///
1884    /// - Returns [`Error::Internal`] wrapping the underlying close failure
1885    ///   (its `source` is the transport error) if the client cannot be
1886    ///   shut down cleanly.
1887    /// - Returns [`Error::Internal`] wrapping the detach failure if the
1888    ///   attached database could not be detached but close itself
1889    ///   succeeded.
1890    pub fn close(self) -> Result<()> {
1891        // Detach the attached database to ensure files are flushed and released.
1892        // Always attempt close, even if detach fails.
1893        let detach_err = if let Some(ref db_path) = self.database {
1894            let db_alias = std::path::Path::new(db_path)
1895                .file_stem()
1896                .and_then(|s| s.to_str())
1897                .unwrap_or("db");
1898            self.execute_command(&format!("DETACH DATABASE {}", escape_sql_path(db_alias)))
1899                .err()
1900        } else {
1901            None
1902        };
1903
1904        // Always attempt to close the client to release the connection.
1905        let close_result = match self.transport {
1906            Transport::Tcp(tcp) => tcp.client.close(),
1907            Transport::Grpc(_) => Ok(()), // gRPC connections are stateless
1908        };
1909
1910        if let Err(e) = close_result {
1911            return Err(Error::internal(format!("Failed to close connection: {e}")));
1912        }
1913
1914        if let Some(e) = detach_err {
1915            // Detach failed but close succeeded; surface the detach error.
1916            return Err(Error::internal(format!(
1917                "Failed to detach database during close: {e}"
1918            )));
1919        }
1920
1921        Ok(())
1922    }
1923
1924    /// Unloads the database from memory while keeping the connection active.
1925    ///
1926    /// This executes the `UNLOAD DATABASE` command, which releases the database
1927    /// from memory but keeps the session and connection open. The database can
1928    /// be accessed again by subsequent queries that will automatically reload it.
1929    ///
1930    /// This is useful for releasing memory locks when switching between databases
1931    /// or when working with multiple database files.
1932    ///
1933    /// # Example
1934    ///
1935    /// ```no_run
1936    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1937    ///
1938    /// fn main() -> Result<()> {
1939    ///     let hyper = HyperProcess::new(None, None)?;
1940    ///     let conn = Connection::new(&hyper, "test.hyper", CreateMode::Create)?;
1941    ///     
1942    ///     // Do some work with the database
1943    ///     conn.execute_command("CREATE TABLE test (id INT)")?;
1944    ///     
1945    ///     // Unload from memory (but keep connection)
1946    ///     conn.unload_database()?;
1947    ///     
1948    ///     // Database can still be accessed (will be reloaded automatically)
1949    ///     let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM test")?;
1950    ///     println!("Count: {}", count);
1951    ///     
1952    ///     Ok(())
1953    /// }
1954    /// ```
1955    ///
1956    /// # Errors
1957    ///
1958    /// Returns [`Error::Server`] if the server rejects the `UNLOAD DATABASE`
1959    /// command (e.g. the database is still in use by another session).
1960    pub fn unload_database(&self) -> Result<()> {
1961        self.execute_command("UNLOAD DATABASE")?;
1962        Ok(())
1963    }
1964
1965    /// Releases the database completely from the session.
1966    ///
1967    /// This executes the `UNLOAD RELEASE` command, which completely releases
1968    /// the database from the session. After this call, the database cannot
1969    /// be accessed until a new connection is established.
1970    ///
1971    /// This is useful for completely freeing database resources when you're
1972    /// done with a database and want to ensure no locks are held.
1973    ///
1974    /// **Note:** This should only be used when the session has exactly one
1975    /// database attached. Hyper does not support `UNLOAD RELEASE` with
1976    /// multiple databases attached to the same session.
1977    ///
1978    /// # Example
1979    ///
1980    /// ```no_run
1981    /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1982    ///
1983    /// fn main() -> Result<()> {
1984    ///     let hyper = HyperProcess::new(None, None)?;
1985    ///     let conn = Connection::new(&hyper, "test.hyper", CreateMode::Create)?;
1986    ///     
1987    ///     // Do some work with the database
1988    ///     conn.execute_command("CREATE TABLE test (id INT)")?;
1989    ///     
1990    ///     // Release database completely from session
1991    ///     conn.unload_release()?;
1992    ///     
1993    ///     // Database cannot be accessed after this point without new connection
1994    ///     // conn.execute_command("SELECT * FROM test")?; // This would fail
1995    ///     
1996    ///     Ok(())
1997    /// }
1998    /// ```
1999    ///
2000    /// # Errors
2001    ///
2002    /// Returns [`Error::Server`] if the server rejects `UNLOAD RELEASE`, most
2003    /// commonly because multiple databases are attached to the same session
2004    /// (Hyper only supports `UNLOAD RELEASE` with exactly one attached DB).
2005    pub fn unload_release(&self) -> Result<()> {
2006        self.execute_command("UNLOAD RELEASE")?;
2007        Ok(())
2008    }
2009
2010    // =========================================================================
2011    // Query Statistics
2012    // =========================================================================
2013
2014    /// Enables query statistics collection for this connection.
2015    ///
2016    /// After enabling, each `execute_command()` or `execute_query()` call will
2017    /// capture detailed performance metrics from Hyper. Retrieve them via
2018    /// [`last_query_stats()`](Self::last_query_stats).
2019    ///
2020    /// The provider determines how stats are collected. Use
2021    /// [`LogFileStatsProvider`](crate::LogFileStatsProvider) to parse Hyper's log file (requires local
2022    /// `hyperd.log`), or implement a custom [`QueryStatsProvider`](crate::QueryStatsProvider).
2023    ///
2024    /// # Example
2025    ///
2026    /// ```no_run
2027    /// # use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
2028    /// # fn main() -> Result<()> {
2029    /// # let hyper = HyperProcess::new(None, None)?;
2030    /// # let mut conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
2031    /// use hyperdb_api::LogFileStatsProvider;
2032    ///
2033    /// // Auto-detect log path from HyperProcess
2034    /// conn.enable_query_stats(LogFileStatsProvider::from_process(&hyper));
2035    ///
2036    /// // Or specify an explicit log path
2037    /// // conn.enable_query_stats(LogFileStatsProvider::new("/path/to/hyperd.log"));
2038    /// # Ok(())
2039    /// # }
2040    /// ```
2041    pub fn enable_query_stats(&mut self, provider: impl QueryStatsProvider + 'static) {
2042        self.stats_provider = Some(Arc::new(provider));
2043    }
2044
2045    /// Disables query statistics collection.
2046    ///
2047    /// After calling this, `last_query_stats()` will return `None`.
2048    pub fn disable_query_stats(&mut self) {
2049        self.stats_provider = None;
2050        if let Ok(mut guard) = self.pending_stats.lock() {
2051            *guard = None;
2052        }
2053    }
2054
2055    /// Returns the query statistics from the most recent query execution.
2056    ///
2057    /// Stats are resolved **lazily** — the log file is read when this method
2058    /// is called, not when the query executes. This is important for streaming
2059    /// queries (`execute_query`), where Hyper writes the execution stats only
2060    /// after the result set is fully consumed.
2061    ///
2062    /// **Call this after consuming the result set** (e.g., after `collect_rows()`,
2063    /// iterating all chunks, or dropping the `Rowset`).
2064    ///
2065    /// Returns `None` if:
2066    /// - Query stats collection is not enabled
2067    /// - No query has been executed yet
2068    /// - Stats could not be found for the last query (e.g., log entry not matched)
2069    ///
2070    /// # Example
2071    ///
2072    /// ```no_run
2073    /// # use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
2074    /// # fn main() -> Result<()> {
2075    /// # let hyper = HyperProcess::new(None, None)?;
2076    /// # let mut conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
2077    /// # use hyperdb_api::LogFileStatsProvider;
2078    /// # conn.enable_query_stats(LogFileStatsProvider::from_process(&hyper));
2079    /// conn.execute_command("CREATE TABLE t (id INT)")?;
2080    ///
2081    /// if let Some(stats) = conn.last_query_stats() {
2082    ///     println!("Total: {}s", stats.elapsed_s);
2083    ///     if let Some(ref pre) = stats.pre_execution {
2084    ///         println!("  Parse: {:?}s", pre.parsing_time_s);
2085    ///         println!("  Compile: {:?}s", pre.compilation_time_s);
2086    ///     }
2087    ///     if let Some(ref exec) = stats.execution {
2088    ///         println!("  Execute: {:?}s", exec.elapsed_s);
2089    ///         println!("  Peak mem: {:?} MB", exec.peak_memory_mb);
2090    ///     }
2091    /// }
2092    /// # Ok(())
2093    /// # }
2094    /// ```
2095    pub fn last_query_stats(&self) -> Option<QueryStats> {
2096        let provider = self.stats_provider.as_ref()?;
2097        let mut guard = self.pending_stats.lock().ok()?;
2098        let (token, sql) = guard.take()?;
2099        provider.after_query(token, &sql)
2100    }
2101
2102    /// Internal: call provider's `before_query` if stats are enabled.
2103    fn stats_before_query(&self, sql: &str) -> Option<Box<dyn Any + Send>> {
2104        self.stats_provider.as_ref().map(|p| p.before_query(sql))
2105    }
2106
2107    /// Internal: store the pending token+sql for lazy resolution.
2108    fn stats_store_pending(&self, token: Option<Box<dyn Any + Send>>, sql: &str) {
2109        if let Some(token) = token {
2110            if let Ok(mut guard) = self.pending_stats.lock() {
2111                *guard = Some((token, sql.to_string()));
2112            }
2113        }
2114    }
2115}
2116
2117impl Connection {
2118    // =========================================================================
2119    // Transaction Control
2120    // =========================================================================
2121
2122    // -------------------------------------------------------------------
2123    // Raw transaction control (internal)
2124    // -------------------------------------------------------------------
2125    //
2126    // The `*_raw` methods below are `pub(crate)` and form the canonical
2127    // implementation of session-level transaction control. The RAII
2128    // guard at `crate::Transaction` and any internal helper that
2129    // genuinely needs `&self` (rather than the guard's `&mut self`)
2130    // delegate to these.
2131    //
2132    // The matching `pub` methods (`begin_transaction`, `commit`,
2133    // `rollback`) are thin `#[doc(hidden)] #[deprecated]` wrappers
2134    // retained only so any pre-existing downstream caller sees a
2135    // compiler warning rather than a hard break. They will be deleted
2136    // in a future release; the `_raw` methods stay.
2137
2138    /// Issues `BEGIN TRANSACTION`. Crate-internal use only.
2139    pub(crate) fn begin_transaction_raw(&self) -> Result<()> {
2140        self.execute_command("BEGIN TRANSACTION")?;
2141        Ok(())
2142    }
2143
2144    /// Issues `COMMIT`. Crate-internal use only.
2145    pub(crate) fn commit_raw(&self) -> Result<()> {
2146        self.execute_command("COMMIT")?;
2147        Ok(())
2148    }
2149
2150    /// Issues `ROLLBACK`. Crate-internal use only.
2151    pub(crate) fn rollback_raw(&self) -> Result<()> {
2152        self.execute_command("ROLLBACK")?;
2153        Ok(())
2154    }
2155
2156    /// Begins an explicit transaction.
2157    ///
2158    /// **Prefer [`transaction()`](Self::transaction)** — the RAII guard
2159    /// auto-rolls back on drop and cannot leak a half-open transaction
2160    /// across error paths. This method is hidden from generated
2161    /// rustdoc and marked deprecated; it will be removed in a future
2162    /// release.
2163    ///
2164    /// # Errors
2165    ///
2166    /// Returns [`Error::Server`] if the server rejects `BEGIN TRANSACTION`
2167    /// (e.g. a transaction is already open on this session).
2168    #[doc(hidden)]
2169    #[deprecated(
2170        note = "Use `Connection::transaction()` for an RAII guard. This method will be removed \
2171                in a future release."
2172    )]
2173    pub fn begin_transaction(&self) -> Result<()> {
2174        self.begin_transaction_raw()
2175    }
2176
2177    /// Commits the current transaction.
2178    ///
2179    /// **Prefer [`Transaction::commit`](crate::Transaction::commit)** on
2180    /// the RAII guard returned by [`transaction()`](Self::transaction).
2181    /// Hidden from generated rustdoc and deprecated; slated for removal.
2182    ///
2183    /// # Errors
2184    ///
2185    /// Returns [`Error::Server`] if the server rejects `COMMIT`.
2186    #[doc(hidden)]
2187    #[deprecated(
2188        note = "Use `Transaction::commit()` on the RAII guard from `Connection::transaction()`. \
2189                This method will be removed in a future release."
2190    )]
2191    pub fn commit(&self) -> Result<()> {
2192        self.commit_raw()
2193    }
2194
2195    /// Rolls back the current transaction.
2196    ///
2197    /// **Prefer [`Transaction::rollback`](crate::Transaction::rollback)**
2198    /// on the RAII guard returned by [`transaction()`](Self::transaction).
2199    /// Hidden from generated rustdoc and deprecated; slated for removal.
2200    ///
2201    /// # Errors
2202    ///
2203    /// Returns [`Error::Server`] if the server rejects `ROLLBACK`.
2204    #[doc(hidden)]
2205    #[deprecated(
2206        note = "Use `Transaction::rollback()` on the RAII guard from `Connection::transaction()`. \
2207                This method will be removed in a future release."
2208    )]
2209    pub fn rollback(&self) -> Result<()> {
2210        self.rollback_raw()
2211    }
2212
2213    /// Starts a transaction and returns an RAII guard that auto-rolls back on drop.
2214    ///
2215    /// The returned [`Transaction`](crate::Transaction) exclusively borrows this connection,
2216    /// preventing any other use of the connection while the transaction is active.
2217    /// This is enforced at compile time by Rust's borrow checker. The guard provides
2218    /// `commit()` and `rollback()` methods. If dropped without calling either, the
2219    /// transaction is automatically rolled back.
2220    ///
2221    /// # Example
2222    ///
2223    /// ```no_run
2224    /// # use hyperdb_api::{Connection, CreateMode, Result};
2225    /// # fn main() -> Result<()> {
2226    /// # let mut conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
2227    /// let txn = conn.transaction()?;
2228    /// txn.execute_command("INSERT INTO users VALUES (1, 'Alice')")?;
2229    /// txn.commit()?; // or drop `txn` to auto-rollback
2230    /// # Ok(())
2231    /// # }
2232    /// ```
2233    ///
2234    /// # Errors
2235    ///
2236    /// Returns [`Error::Server`] if the server rejects the `BEGIN`
2237    /// statement issued internally by
2238    /// [`Transaction::new`](crate::Transaction).
2239    pub fn transaction(&mut self) -> Result<crate::Transaction<'_>> {
2240        crate::Transaction::new(self)
2241    }
2242}
2243
2244/// Checks if an error indicates an "already exists" condition based on SQLSTATE codes.
2245///
2246/// This function uses `PostgreSQL` SQLSTATE codes to reliably detect duplicate object errors
2247/// regardless of server locale or message formatting. The codes checked are:
2248/// - `42P04`: Database already exists
2249/// - `42710`: Duplicate object
2250/// - `42P06`: Duplicate schema
2251/// - `42P07`: Duplicate table
2252///
2253/// See: <https://www.postgresql.org/docs/current/errcodes-appendix.html>
2254fn is_already_exists_error(err: &Error) -> bool {
2255    err.sqlstate()
2256        .is_some_and(|code| matches!(code, "42P04" | "42710" | "42P06" | "42P07"))
2257}