hyperdb_api/connection.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Database connection management.
5//!
6//! The [`Connection`] type provides a unified interface for connecting to Hyper
7//! databases via either TCP (`PostgreSQL` wire protocol) or gRPC transport.
8//! The transport is automatically detected from the endpoint URL:
9//!
10//! - `https://` or `http://` → gRPC transport
11//! - Otherwise → TCP transport (e.g., `localhost:7483`)
12
13use std::path::Path;
14
15use hyperdb_api_core::client::Client;
16
17use crate::error::{Error, Result};
18use crate::names::escape_sql_path;
19use crate::process::HyperProcess;
20use crate::result::{Row, Rowset, DEFAULT_BINARY_CHUNK_SIZE};
21use crate::transport::Transport;
22
23use std::any::Any;
24use std::sync::{Arc, Mutex};
25
26use crate::query_stats::{QueryStats, QueryStatsProvider};
27
28/// Trait for types that can be extracted from a scalar query result.
29///
30/// This trait enables the generic [`Connection::execute_scalar_query`] method,
31/// similar to C++'s `executeScalarQuery<T>()` template.
32///
33/// # Implementing Custom Types
34///
35/// You can implement this trait for custom types to use them with `execute_scalar_query`:
36///
37/// ```no_run
38/// # use hyperdb_api::{Row, ScalarValue};
39/// # struct MyType;
40/// # impl MyType { fn parse(s: &str) -> Self { MyType } }
41/// impl ScalarValue for MyType {
42/// fn from_row(row: &Row, col: usize) -> Option<Self> {
43/// row.get_string(col).map(|s| MyType::parse(&s))
44/// }
45/// }
46/// ```
pub trait ScalarValue: Sized {
    /// Extracts a value of this type from `row` at column index `col`.
    ///
    /// The implementations provided in this module forward to the typed
    /// `Row` accessor for the target type (`get_i64`, `get_string`, ...) and
    /// return that accessor's `Option` unchanged.
    fn from_row(row: &Row, col: usize) -> Option<Self>;
}
51
52impl ScalarValue for i64 {
53 fn from_row(row: &Row, col: usize) -> Option<Self> {
54 row.get_i64(col)
55 }
56}
57
58impl ScalarValue for i32 {
59 fn from_row(row: &Row, col: usize) -> Option<Self> {
60 row.get_i32(col)
61 }
62}
63
64impl ScalarValue for i16 {
65 fn from_row(row: &Row, col: usize) -> Option<Self> {
66 row.get_i16(col)
67 }
68}
69
70impl ScalarValue for f64 {
71 fn from_row(row: &Row, col: usize) -> Option<Self> {
72 row.get_f64(col)
73 }
74}
75
76impl ScalarValue for bool {
77 fn from_row(row: &Row, col: usize) -> Option<Self> {
78 row.get_bool(col)
79 }
80}
81
82impl ScalarValue for String {
83 fn from_row(row: &Row, col: usize) -> Option<Self> {
84 row.get_string(col)
85 }
86}
87
/// Database creation mode when connecting.
///
/// Selects which SQL statements `Connection::handle_creation_mode` issues
/// for the database path.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CreateMode {
    /// Do not create the database (no creation statement is issued).
    /// Method will fail if database doesn't exist. This is the default mode.
    #[default]
    DoNotCreate,
    /// Create the database with `CREATE DATABASE`. Method will fail if the
    /// database already exists.
    Create,
    /// Create the database if it doesn't exist, using
    /// `CREATE DATABASE IF NOT EXISTS`; an "already exists" error from that
    /// statement is tolerated.
    CreateIfNotExists,
    /// Create the database. If it already exists, drop the old one first:
    /// issues `DROP DATABASE IF EXISTS` (its result is ignored) followed by
    /// `CREATE DATABASE`.
    CreateAndReplace,
}
101
102/// A connection to a Hyper database.
103///
104/// This struct represents an active connection to a Hyper server and optionally
105/// an attached database. The connection is automatically closed when dropped.
106///
107/// # Transport Auto-Detection
108///
109/// The transport is automatically detected from the endpoint URL:
110/// - `https://` or `http://` → gRPC transport (read-only until server supports writes)
111/// - Otherwise → TCP transport (full read/write support)
112///
113/// # CSV / Text Import & Export
114///
115/// For CSV, TSV, and other delimited-text formats, see the [`copy`](crate::copy)
116/// module which provides [`export_csv()`](Self::export_csv),
117/// [`import_csv()`](Self::import_csv), and related methods on this struct.
118///
119/// # Example
120///
121/// ```no_run
122/// use hyperdb_api::{Connection, CreateMode, Result};
123///
124/// fn main() -> Result<()> {
125/// // TCP connection (full read/write)
126/// let conn = Connection::connect("localhost:7483", "example.hyper", CreateMode::CreateIfNotExists)?;
127///
128/// // Execute SQL commands
129/// conn.execute_command("CREATE TABLE test (id INT, name TEXT)")?;
130/// conn.execute_command("INSERT INTO test VALUES (1, 'Hello')")?;
131///
132/// Ok(())
133/// }
134/// ```
135///
136/// ```no_run
137/// # use hyperdb_api::{Connection, CreateMode, Result};
138/// # fn example() -> Result<()> {
139/// // gRPC connection (read-only, auto-detected from URL)
140/// let conn = Connection::connect(
141/// "https://hyper-server.example.com:443",
142/// "example.hyper",
143/// CreateMode::DoNotCreate, // Must be DoNotCreate for gRPC
144/// )?;
145/// # Ok(())
146/// # }
147/// ```
pub struct Connection {
    /// Transport that executes statements (`Transport::Tcp` or `Transport::Grpc`).
    transport: Transport,
    /// Database handed to the constructor, if any.
    database: Option<String>,
    /// Optional source of per-query statistics; `from_client` and
    /// `from_transport` initialise it to `None`.
    stats_provider: Option<Arc<dyn QueryStatsProvider>>,
    /// Pending stats token + SQL from the most recent query, resolved lazily.
    pending_stats: Mutex<Option<(Box<dyn Any + Send>, String)>>,
}
155
156impl std::fmt::Debug for Connection {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.debug_struct("Connection")
159 .field("database", &self.database)
160 .finish_non_exhaustive()
161 }
162}
163
164impl Connection {
165 /// Creates a new connection to a Hyper instance with a database.
166 ///
167 /// This is the primary way to connect to a running [`HyperProcess`].
168 ///
169 /// # Arguments
170 ///
171 /// * `instance` - The Hyper server instance to connect to.
172 /// * `database_path` - Path to the database file.
173 /// * `create_mode` - How to handle database creation.
174 ///
175 /// # Errors
176 ///
177 /// Returns an error if the connection could not be established.
178 ///
179 /// # Example
180 ///
181 /// ```no_run
182 /// use hyperdb_api::{HyperProcess, Connection, CreateMode, Result};
183 ///
184 /// fn main() -> Result<()> {
185 /// let hyper = HyperProcess::new(None, None)?;
186 /// let conn = Connection::new(&hyper, "database.hyper", CreateMode::CreateIfNotExists)?;
187 /// Ok(())
188 /// }
189 /// ```
190 pub fn new(
191 instance: &HyperProcess,
192 database_path: impl AsRef<Path>,
193 create_mode: CreateMode,
194 ) -> Result<Self> {
195 // Prefer using the connection_endpoint which properly handles UDS/Named Pipes
196 if let Some(conn_endpoint) = instance.connection_endpoint() {
197 return Self::connect_with_endpoint(
198 conn_endpoint,
199 &database_path.as_ref().to_string_lossy(),
200 create_mode,
201 );
202 }
203
204 // Fall back to string endpoint (TCP)
205 let endpoint = instance.require_endpoint()?;
206 Self::connect(
207 endpoint,
208 &database_path.as_ref().to_string_lossy(),
209 create_mode,
210 )
211 }
212
213 /// Connects using a `ConnectionEndpoint` (supports TCP, UDS, and Named Pipes).
214 fn connect_with_endpoint(
215 endpoint: &hyperdb_api_core::client::ConnectionEndpoint,
216 database_path: &str,
217 create_mode: CreateMode,
218 ) -> Result<Self> {
219 let db_path_str = Some(database_path.to_string());
220
221 let config = hyperdb_api_core::client::Config::new().with_user("tableau_internal_user");
222
223 let client = hyperdb_api_core::client::Client::connect_endpoint(endpoint, &config)?;
224
225 let conn = Connection::from_client(client, db_path_str.clone());
226
227 // Handle database creation
228 if let Some(db_path) = db_path_str {
229 conn.handle_creation_mode(&db_path, create_mode)?;
230 conn.attach_and_set_path(&db_path)?;
231 }
232
233 Ok(conn)
234 }
235
236 /// Connects to a Hyper server and optionally attaches a database.
237 ///
238 /// # Arguments
239 ///
240 /// * `endpoint` - The server endpoint (host:port).
241 /// * `database_path` - Path to the database file.
242 /// * `create_mode` - How to handle database creation.
243 ///
244 /// # Errors
245 ///
246 /// Returns an error if the connection could not be established.
247 pub fn connect(endpoint: &str, database_path: &str, create_mode: CreateMode) -> Result<Self> {
248 crate::ConnectionBuilder::new(endpoint)
249 .database(database_path)
250 .create_mode(create_mode)
251 .build()
252 }
253
254 /// Returns a connection builder for advanced configuration.
255 ///
256 /// This is useful when you need to set authentication, timeouts, or
257 /// other advanced options before connecting.
258 #[must_use]
259 pub fn builder(endpoint: &str) -> crate::ConnectionBuilder {
260 crate::ConnectionBuilder::new(endpoint)
261 }
262
263 /// Creates a Connection from a low-level Client (internal use, TCP only).
264 pub(crate) fn from_client(client: Client, database: Option<String>) -> Self {
265 Connection {
266 transport: Transport::Tcp(Box::new(crate::transport::TcpTransport { client })),
267 database,
268 stats_provider: None,
269 pending_stats: Mutex::new(None),
270 }
271 }
272
273 /// Creates a Connection from a Transport (internal use).
274 #[allow(
275 dead_code,
276 reason = "used by ConnectionBuilder for the gRPC path; not reached under non-gRPC feature builds"
277 )]
278 pub(crate) fn from_transport(transport: Transport, database: Option<String>) -> Self {
279 Connection {
280 transport,
281 database,
282 stats_provider: None,
283 pending_stats: Mutex::new(None),
284 }
285 }
286
287 /// Returns the transport type name (e.g., "TCP", "gRPC", "Unix Socket").
288 pub fn transport_type(&self) -> &'static str {
289 self.transport.transport_type().as_str()
290 }
291
292 /// Returns true if this connection supports write operations.
293 ///
294 /// Currently, only TCP connections support writes. gRPC connections are
295 /// read-only until the server supports write operations over gRPC.
296 pub fn supports_writes(&self) -> bool {
297 self.transport.supports_writes()
298 }
299
300 /// Handles database creation logic (internal use).
301 pub(crate) fn handle_creation_mode(
302 &self,
303 database_path: &str,
304 create_mode: CreateMode,
305 ) -> Result<()> {
306 match create_mode {
307 CreateMode::DoNotCreate => {}
308 CreateMode::Create => {
309 self.execute_command(&format!(
310 "CREATE DATABASE {}",
311 escape_sql_path(database_path)
312 ))?;
313 }
314 CreateMode::CreateIfNotExists => {
315 if let Err(e) = self.execute_command(&format!(
316 "CREATE DATABASE IF NOT EXISTS {}",
317 escape_sql_path(database_path)
318 )) {
319 if !is_already_exists_error(&e) {
320 return Err(Error::with_cause(
321 format!("Failed to create database '{database_path}': {e}"),
322 e,
323 ));
324 }
325 }
326 }
327 CreateMode::CreateAndReplace => {
328 let _ = self.execute_command(&format!(
329 "DROP DATABASE IF EXISTS {}",
330 escape_sql_path(database_path)
331 ));
332 self.execute_command(&format!(
333 "CREATE DATABASE {}",
334 escape_sql_path(database_path)
335 ))?;
336 }
337 }
338 Ok(())
339 }
340
341 /// Attaches and sets the database path (internal use).
342 pub(crate) fn attach_and_set_path(&self, database_path: &str) -> Result<()> {
343 let db_alias = std::path::Path::new(database_path)
344 .file_stem()
345 .and_then(|s| s.to_str())
346 .unwrap_or("db");
347
348 self.execute_command(&format!(
349 "ATTACH DATABASE {} AS {}",
350 escape_sql_path(database_path),
351 escape_sql_path(db_alias)
352 ))?;
353
354 self.execute_command(&format!(
355 "SET search_path TO {}, public",
356 escape_sql_path(db_alias)
357 ))?;
358
359 Ok(())
360 }
361
362 /// Connects to a Hyper server with authentication.
363 ///
364 /// # Arguments
365 ///
366 /// * `endpoint` - The server endpoint (host:port).
367 /// * `database_path` - Path to the database file.
368 /// * `create_mode` - How to handle database creation.
369 /// * `user` - Username for authentication.
370 /// * `password` - Password for authentication.
371 ///
372 /// # Errors
373 ///
374 /// Returns an error if the connection or authentication fails.
375 pub fn connect_with_auth(
376 endpoint: &str,
377 database_path: &str,
378 create_mode: CreateMode,
379 user: &str,
380 password: &str,
381 ) -> Result<Self> {
382 crate::ConnectionBuilder::new(endpoint)
383 .database(database_path)
384 .create_mode(create_mode)
385 .user(user.to_string())
386 .password(password)
387 .build()
388 }
389
390 /// Creates a connection to a Hyper server without attaching a database.
391 ///
392 /// # Errors
393 ///
394 /// Returns [`Error::Client`] if the TCP or gRPC handshake fails, and
395 /// [`Error::Io`] if the endpoint cannot be reached.
396 pub fn without_database(endpoint: &str) -> Result<Self> {
397 crate::ConnectionBuilder::new(endpoint).build()
398 }
399
400 /// Executes a SQL command that doesn't return results.
401 ///
402 /// Use this for DDL statements (CREATE, ALTER, DROP) and DML statements
403 /// (INSERT, UPDATE, DELETE).
404 ///
405 /// # Arguments
406 ///
407 /// * `command` - The SQL command to execute.
408 ///
409 /// # Returns
410 ///
411 /// The number of affected rows, or 0 if not applicable.
412 ///
413 /// # Errors
414 ///
415 /// Returns an error if:
416 /// - The connection is using gRPC transport (write operations not yet supported)
417 /// - The command fails to execute
418 pub fn execute_command(&self, command: &str) -> Result<u64> {
419 let token = self.stats_before_query(command);
420
421 let result = self.transport.execute_command(command);
422
423 // For commands, the query is fully executed synchronously, so we
424 // can store the pending token immediately for lazy resolution.
425 self.stats_store_pending(token, command);
426
427 result
428 }
429
430 /// Executes a SQL query and returns a streaming result set.
431 ///
432 /// Results are streamed in chunks (default 64K rows), keeping memory usage
433 /// constant regardless of result set size. This makes it safe for any
434 /// result size, from a single row to billions of rows.
435 ///
436 /// # Example
437 ///
438 /// ```no_run
439 /// # use hyperdb_api::{Connection, Result};
440 /// # fn example(conn: &Connection) -> Result<()> {
441 /// let mut result = conn.execute_query("SELECT id, value FROM measurements")?;
442 /// while let Some(chunk) = result.next_chunk()? {
443 /// for row in &chunk {
444 /// // Generic typed access (like C++ row.get<T>())
445 /// let id: Option<i32> = row.get(0);
446 /// let value: Option<f64> = row.get(1);
447 ///
448 /// // Or direct accessors
449 /// let id = row.get_i32(0);
450 /// let value = row.get_f64(1);
451 /// }
452 /// }
453 /// # Ok(())
454 /// # }
455 /// ```
456 ///
457 /// # Memory Behavior
458 ///
459 /// - Only one chunk is held in memory at a time (~few MB for 64K rows)
460 /// - Safe for result sets of any size (millions/billions of rows)
461 /// - Memory usage is `O(chunk_size)`, not `O(total_rows)`
462 ///
463 /// # Errors
464 ///
465 /// - Returns [`Error::Client`] wrapping a `hyperdb_api_core::client::Error` if the
466 /// SQL fails to parse, execute, or if the server reports an error
467 /// while streaming.
468 /// - Returns [`Error::Io`] on transport-level I/O failures.
469 pub fn execute_query(&self, query: &str) -> Result<Rowset<'_>> {
470 let token = self.stats_before_query(query);
471
472 let result = match &self.transport {
473 Transport::Tcp(tcp) => {
474 let stream = tcp
475 .client
476 .query_streaming(query, DEFAULT_BINARY_CHUNK_SIZE)?;
477 Ok(Rowset::new(stream))
478 }
479 Transport::Grpc(grpc) => {
480 // gRPC streaming: pull chunks lazily so peak memory is
481 // bounded by one gRPC message (tonic default 64 MB), not
482 // by the full result size. Matches TCP's
483 // constant-memory streaming shape.
484 //
485 // The transport module already creates a fresh gRPC client
486 // per query (gRPC client needs &mut self to execute), so we
487 // do the same here: connect, start the stream, wrap as a
488 // `ChunkSource`. The stream keeps the channel and runtime
489 // alive via refcounted handles inside `GrpcChunkStreamSync`.
490 let mut client =
491 hyperdb_api_core::client::grpc::GrpcClientSync::connect(grpc.config.clone())?;
492 let stream = client.execute_query_stream(query)?;
493 let source = Box::new(crate::grpc_connection::GrpcChunkStreamSource::new(stream));
494 let arrow_rowset = crate::arrow_result::ArrowRowset::from_stream(source)?;
495 Ok(Rowset::from_arrow(arrow_rowset))
496 }
497 };
498
499 // Store the pending token — Hyper logs the execution stats after the
500 // result is consumed (streamed), so we defer resolution until
501 // last_query_stats() is called.
502 self.stats_store_pending(token, query);
503
504 result
505 }
506
507 // =========================================================================
508 // Arrow Format Queries
509 // =========================================================================
510
511 /// Executes a SELECT query and returns results as Arrow IPC stream bytes.
512 ///
513 /// # Example
514 ///
515 /// ```no_run
516 /// use hyperdb_api::{Connection, CreateMode, Result};
517 ///
518 /// fn main() -> Result<()> {
519 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
520 ///
521 /// // Create and populate a table
522 /// conn.execute_command("CREATE TABLE data (id INT, value DOUBLE PRECISION)")?;
523 /// conn.execute_command("INSERT INTO data VALUES (1, 1.5), (2, 2.5)")?;
524 ///
525 /// // Get results as Arrow IPC stream
526 /// let arrow_data = conn.execute_query_to_arrow("SELECT * FROM data")?;
527 /// println!("Got {} bytes of Arrow IPC data", arrow_data.len());
528 ///
529 /// Ok(())
530 /// }
531 /// ```
532 ///
533 /// # Errors
534 ///
535 /// Propagates any [`Error::Client`] from the TCP or gRPC transport when
536 /// the query fails or the server cannot produce Arrow IPC output.
537 pub fn execute_query_to_arrow(&self, select_query: &str) -> Result<bytes::Bytes> {
538 self.transport.execute_query_to_arrow(select_query)
539 }
540
541 /// Exports an entire table to Arrow IPC stream format.
542 ///
543 /// This is a convenience method equivalent to
544 /// `execute_query_to_arrow("SELECT * FROM table_name")`.
545 ///
546 /// # Arguments
547 ///
548 /// * `table_name` - The table name
549 ///
550 /// # Returns
551 ///
552 /// Raw Arrow IPC stream bytes containing all rows from the table.
553 ///
554 /// # Example
555 ///
556 /// ```no_run
557 /// use hyperdb_api::{Connection, CreateMode, Result};
558 ///
559 /// fn main() -> Result<()> {
560 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
561 /// let arrow_data = conn.export_table_to_arrow("my_table")?;
562 /// Ok(())
563 /// }
564 /// ```
565 ///
566 /// # Errors
567 ///
568 /// Returns whatever [`execute_query_to_arrow`](Self::execute_query_to_arrow)
569 /// would return for `SELECT * FROM <table_name>` — typically
570 /// [`Error::Client`] if the table does not exist or the query is rejected.
571 pub fn export_table_to_arrow(&self, table_name: &str) -> Result<bytes::Bytes> {
572 self.execute_query_to_arrow(&format!("SELECT * FROM {table_name}"))
573 }
574
575 /// Executes a SELECT query and returns results as Arrow `RecordBatch`es.
576 ///
577 /// This is the recommended method for Arrow-native workflows (`DataFusion`,
578 /// Polars, etc.) where you want direct `RecordBatch` access without going
579 /// through the `Row` abstraction.
580 ///
581 /// # Example
582 ///
583 /// ```no_run
584 /// use hyperdb_api::{Connection, CreateMode, Result};
585 /// use arrow::record_batch::RecordBatch;
586 ///
587 /// fn main() -> Result<()> {
588 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
589 ///
590 /// let batches: Vec<RecordBatch> = conn.execute_query_to_batches("SELECT * FROM data")?;
591 /// for batch in &batches {
592 /// println!("batch: {} rows x {} cols", batch.num_rows(), batch.num_columns());
593 /// }
594 /// Ok(())
595 /// }
596 /// ```
597 ///
598 /// # Errors
599 ///
600 /// - Returns [`Error::Client`] if the query itself fails.
601 /// - Returns [`Error::Other`] if the Arrow IPC payload returned by the
602 /// server is malformed and cannot be decoded into record batches.
603 pub fn execute_query_to_batches(
604 &self,
605 select_query: &str,
606 ) -> Result<Vec<arrow::record_batch::RecordBatch>> {
607 let arrow_data = self.execute_query_to_arrow(select_query)?;
608 crate::arrow_result::parse_arrow_ipc(arrow_data)
609 }
610
611 /// Fetches a single row from a query.
612 ///
613 /// Returns an error if the query returns no rows.
614 ///
615 /// # Example
616 ///
617 /// ```no_run
618 /// use hyperdb_api::{Connection, CreateMode, Result};
619 ///
620 /// fn main() -> Result<()> {
621 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
622 /// let row = conn.fetch_one("SELECT * FROM users WHERE id = 1")?;
623 /// let id: Option<i32> = row.get(0);
624 /// let name: Option<String> = row.get(1);
625 /// Ok(())
626 /// }
627 /// ```
628 ///
629 /// # Errors
630 ///
631 /// - Returns the error from [`execute_query`](Self::execute_query) if
632 /// the query itself fails.
633 /// - Returns [`Error::Other`] with message `"Query returned no rows"` if
634 /// the query produced zero rows.
635 pub fn fetch_one<Q>(&self, query: Q) -> Result<crate::Row>
636 where
637 Q: AsRef<str>,
638 {
639 let query = query.as_ref();
640 let result = self.execute_query(query)?;
641 result.require_first_row()
642 }
643
644 /// Fetches an optional single row from a query.
645 ///
646 /// Returns `None` if the query returns no rows.
647 ///
648 /// # Example
649 ///
650 /// ```no_run
651 /// use hyperdb_api::{Connection, CreateMode, Result};
652 ///
653 /// fn main() -> Result<()> {
654 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
655 /// if let Some(row) = conn.fetch_optional("SELECT * FROM users WHERE id = 999")? {
656 /// let name: Option<String> = row.get(1);
657 /// println!("Found user: {:?}", name);
658 /// }
659 /// Ok(())
660 /// }
661 /// ```
662 ///
663 /// # Errors
664 ///
665 /// Returns the error from [`execute_query`](Self::execute_query) if the
666 /// query itself fails. An empty result set is not an error — it yields
667 /// `Ok(None)`.
668 pub fn fetch_optional<Q>(&self, query: Q) -> Result<Option<crate::Row>>
669 where
670 Q: AsRef<str>,
671 {
672 let query = query.as_ref();
673 let result = self.execute_query(query)?;
674 result.first_row()
675 }
676
677 /// Fetches all rows from a query.
678 ///
679 /// # Example
680 ///
681 /// ```no_run
682 /// use hyperdb_api::{Connection, CreateMode, Result};
683 ///
684 /// fn main() -> Result<()> {
685 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
686 /// let rows = conn.fetch_all("SELECT * FROM users WHERE active = true ORDER BY name")?;
687 /// for row in rows {
688 /// let id: Option<i32> = row.get(0);
689 /// let name: Option<String> = row.get(1);
690 /// println!("User {}: {:?}", id.unwrap_or(-1), name);
691 /// }
692 /// Ok(())
693 /// }
694 /// ```
695 ///
696 /// # Errors
697 ///
698 /// Returns the error from [`execute_query`](Self::execute_query), or a
699 /// transport error produced while draining every chunk of the streamed
700 /// result set.
701 pub fn fetch_all<Q>(&self, query: Q) -> Result<Vec<crate::Row>>
702 where
703 Q: AsRef<str>,
704 {
705 let query = query.as_ref();
706 let result = self.execute_query(query)?;
707 result.collect_rows()
708 }
709
710 /// Fetches a single row and maps it to a struct using [`FromRow`](crate::FromRow).
711 ///
712 /// Returns an error if the query returns no rows or if mapping fails.
713 ///
714 /// # Example
715 ///
716 /// ```no_run
717 /// use hyperdb_api::{Connection, CreateMode, Row, FromRow, Result};
718 ///
719 /// struct User { id: i32, name: String }
720 ///
721 /// impl FromRow for User {
722 /// fn from_row(row: &Row) -> Result<Self> {
723 /// Ok(User {
724 /// id: row.get::<i32>(0).ok_or_else(|| hyperdb_api::Error::new("NULL id"))?,
725 /// name: row.get::<String>(1).unwrap_or_default(),
726 /// })
727 /// }
728 /// }
729 ///
730 /// fn main() -> Result<()> {
731 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
732 /// let user: User = conn.fetch_one_as("SELECT id, name FROM users WHERE id = 1")?;
733 /// Ok(())
734 /// }
735 /// ```
736 ///
737 /// # Errors
738 ///
739 /// - Returns the error from [`fetch_one`](Self::fetch_one) if the query
740 /// fails or returns no rows.
741 /// - Returns whatever error [`FromRow::from_row`](crate::FromRow::from_row)
742 /// produces when the row cannot be mapped into `T`.
743 pub fn fetch_one_as<T: crate::FromRow>(&self, query: &str) -> Result<T> {
744 let row = self.fetch_one(query)?;
745 T::from_row(&row)
746 }
747
748 /// Fetches all rows and maps them to structs using [`FromRow`](crate::FromRow).
749 ///
750 /// # Example
751 ///
752 /// ```no_run
753 /// # use hyperdb_api::{Connection, CreateMode, Row, FromRow, Result};
754 /// # struct User { id: i32, name: String }
755 /// # impl FromRow for User {
756 /// # fn from_row(row: &Row) -> Result<Self> {
757 /// # Ok(User { id: row.get::<i32>(0).unwrap_or(0), name: row.get::<String>(1).unwrap_or_default() })
758 /// # }
759 /// # }
760 /// # fn example(conn: &Connection) -> Result<()> {
761 /// let users: Vec<User> = conn.fetch_all_as("SELECT id, name FROM users")?;
762 /// # Ok(())
763 /// # }
764 /// ```
765 ///
766 /// # Errors
767 ///
768 /// - Returns the error from [`fetch_all`](Self::fetch_all) if the query
769 /// fails.
770 /// - Returns the first error produced by
771 /// [`FromRow::from_row`](crate::FromRow::from_row) on any of the rows.
772 pub fn fetch_all_as<T: crate::FromRow>(&self, query: &str) -> Result<Vec<T>> {
773 let rows = self.fetch_all(query)?;
774 rows.iter().map(|r| T::from_row(r)).collect()
775 }
776
777 /// Fetches a single scalar value from a query.
778 ///
779 /// Returns an error if the query returns no rows or NULL.
780 ///
781 /// # Example
782 ///
783 /// ```no_run
784 /// use hyperdb_api::{Connection, CreateMode, Result};
785 ///
786 /// fn main() -> Result<()> {
787 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
788 /// let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM users")?;
789 /// println!("User count: {}", count);
790 /// Ok(())
791 /// }
792 /// ```
793 ///
794 /// # Errors
795 ///
796 /// - Returns the error from [`execute_query`](Self::execute_query) if
797 /// the query itself fails.
798 /// - Returns [`Error::Other`] with message `"Query returned no rows"` if
799 /// the query produced zero rows.
800 /// - Returns [`Error::Other`] with message `"Scalar query returned NULL"`
801 /// if the single cell is SQL `NULL`.
802 pub fn fetch_scalar<T, Q>(&self, query: Q) -> Result<T>
803 where
804 T: crate::connection::ScalarValue + crate::result::RowValue,
805 Q: AsRef<str>,
806 {
807 let query = query.as_ref();
808 let result = self.execute_query(query)?;
809 result.require_scalar()
810 }
811
812 /// Fetches an optional scalar value from a query.
813 ///
    /// Returns `None` if the single result cell is SQL `NULL`. Per the `# Errors` section below, an empty result set is reported as an error rather than `None`.
815 ///
816 /// # Example
817 ///
818 /// ```no_run
819 /// use hyperdb_api::{Connection, CreateMode, Result};
820 ///
821 /// fn main() -> Result<()> {
822 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
823 /// let max_id: Option<i32> = conn.fetch_optional_scalar("SELECT MAX(id) FROM users")?;
824 /// println!("Max ID: {:?}", max_id);
825 /// Ok(())
826 /// }
827 /// ```
828 ///
829 /// # Errors
830 ///
831 /// - Returns the error from [`execute_query`](Self::execute_query) if
832 /// the query itself fails.
833 /// - Returns [`Error::Other`] with message `"Query returned no rows"` if
834 /// the query produced zero rows. (An empty result is treated as an
835 /// error here because we need at least one row to inspect; SQL `NULL`
836 /// in the single cell yields `Ok(None)`.)
837 pub fn fetch_optional_scalar<T, Q>(&self, query: Q) -> Result<Option<T>>
838 where
839 T: crate::connection::ScalarValue + crate::result::RowValue,
840 Q: AsRef<str>,
841 {
842 let query = query.as_ref();
843 let result = self.execute_query(query)?;
844 result.scalar()
845 }
846
847 /// Executes a scalar query and returns a single value of type `T`.
848 ///
849 /// Alias for [`fetch_optional_scalar`](Self::fetch_optional_scalar) for C++ API compatibility.
850 ///
851 /// # Errors
852 ///
853 /// See [`fetch_optional_scalar`](Self::fetch_optional_scalar).
854 #[inline]
855 pub fn execute_scalar_query<T>(&self, query: &str) -> Result<Option<T>>
856 where
857 T: ScalarValue + crate::result::RowValue,
858 {
859 self.fetch_optional_scalar(query)
860 }
861
862 /// Queries for a count value, defaulting to 0 if NULL.
863 ///
864 /// This is optimized for COUNT queries which typically return 0
865 /// instead of NULL when there are no matching rows.
866 ///
867 /// # Example
868 ///
869 /// ```no_run
870 /// use hyperdb_api::{Connection, CreateMode, Result};
871 ///
872 /// fn main() -> Result<()> {
873 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
874 /// let count = conn.query_count("SELECT COUNT(*) FROM users WHERE active = true")?;
875 /// println!("Active users: {}", count);
876 /// Ok(())
877 /// }
878 /// ```
879 ///
880 /// # Errors
881 ///
882 /// Returns the error from [`execute_query`](Self::execute_query) if the
883 /// query fails or produces no rows. SQL `NULL` is mapped to `0`, not an
884 /// error.
885 pub fn query_count(&self, query: &str) -> Result<i64> {
886 self.fetch_optional_scalar::<i64, _>(query)
887 .map(|opt| opt.unwrap_or(0))
888 }
889
890 // =========================================================================
891 // Parameterized Queries (SQL Injection Safe)
892 // =========================================================================
893
894 /// Executes a parameterized query, returning streaming results.
895 ///
896 /// This is safe to use with untrusted user input: parameters travel
897 /// through the extended query protocol (Parse/Bind/Execute) as
898 /// binary `HyperBinary` values and are never interpolated into the
899 /// SQL string. For repeated executions of the same SQL with different
900 /// values, prefer the explicit [`prepare`](Self::prepare) API — it
901 /// returns a reusable [`PreparedStatement`](crate::PreparedStatement)
902 /// that skips the Parse round-trip on every call.
903 ///
904 /// Under the hood, `query_params` is a one-shot
905 /// prepare+execute+close: it prepares an unnamed statement, binds
906 /// the parameters, starts streaming, and closes the statement when
907 /// the returned [`Rowset`] is dropped.
908 ///
909 /// # Arguments
910 ///
911 /// * `query` - The SQL query with parameter placeholders (`$1`, `$2`, etc.)
912 /// * `params` - Parameter values matching the placeholders
913 ///
914 /// # SQL Injection Prevention
915 ///
916 /// ```no_run
917 /// use hyperdb_api::{Connection, CreateMode, Result};
918 ///
919 /// fn search_users(conn: &Connection, user_input: &str) -> Result<()> {
920 /// // DANGEROUS - vulnerable to SQL injection:
921 /// // let query = format!("SELECT * FROM users WHERE name = '{}'", user_input);
922 ///
923 /// // SAFE - parameterized query:
924 /// let mut result = conn.query_params(
925 /// "SELECT * FROM users WHERE name = $1",
926 /// &[&user_input],
927 /// )?;
928 ///
929 /// while let Some(chunk) = result.next_chunk()? {
930 /// for row in &chunk {
931 /// let id: Option<i32> = row.get(0);
932 /// let name: Option<String> = row.get(1);
933 /// println!("Found: {:?} - {:?}", id, name);
934 /// }
935 /// }
936 /// Ok(())
937 /// }
938 /// ```
939 ///
940 /// # Multiple Parameters
941 ///
942 /// ```no_run
943 /// use hyperdb_api::{Connection, CreateMode, Result};
944 ///
945 /// fn main() -> Result<()> {
946 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
947 ///
948 /// // Multiple parameters of different types
949 /// let result = conn.query_params(
950 /// "SELECT * FROM orders WHERE customer_id = $1 AND total > $2",
951 /// &[&42i32, &100.0f64],
952 /// )?;
953 /// Ok(())
954 /// }
955 /// ```
956 ///
957 /// # Errors
958 ///
959 /// - Returns [`Error::Other`] if the connection is using gRPC transport
960 /// (prepared statements are TCP-only).
961 /// - Returns [`Error::Client`] if the server rejects the statement at
962 /// `Parse`, `Bind`, or `Execute` time, including on type-mismatch
963 /// between `params` and the inferred OIDs.
964 /// - Returns [`Error::Io`] on transport-level I/O failures.
965 pub fn query_params(
966 &self,
967 query: &str,
968 params: &[&dyn crate::params::ToSqlParam],
969 ) -> Result<Rowset<'_>> {
970 // Implementation note: routes through the extended query protocol
971 // via Parse/Bind/Execute so parameters travel in HyperBinary
972 // format — no SQL escaping, full SQL-injection safety regardless of
973 // parameter content. The statement handle is stashed inside the
974 // returned Rowset so its Drop-time close_statement fires *after*
975 // the rowset releases its connection lock (otherwise the close
976 // would deadlock on the still-held mutex).
977 let client = match &self.transport {
978 Transport::Tcp(tcp) => &tcp.client,
979 Transport::Grpc(_) => {
980 return Err(Error::new(
981 "prepared statements are not supported over gRPC transport",
982 ));
983 }
984 };
985 let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
986 let stmt = client.prepare_typed(query, &oids)?;
987 let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
988 let stream =
989 client.execute_streaming(&stmt, encoded, crate::result::DEFAULT_BINARY_CHUNK_SIZE)?;
990 Ok(Rowset::from_prepared(stream).with_statement_guard(stmt))
991 }
992
993 /// Executes a parameterized command that doesn't return rows.
994 ///
995 /// Use this for INSERT, UPDATE, DELETE, or DDL statements with parameters.
996 /// Returns the number of affected rows.
997 ///
998 /// See [`query_params`](Self::query_params) for details on parameter
999 /// handling and SQL injection prevention.
1000 ///
1001 /// # Example
1002 ///
1003 /// ```no_run
1004 /// use hyperdb_api::{Connection, CreateMode, Result};
1005 ///
1006 /// fn delete_user(conn: &Connection, user_id: i32) -> Result<u64> {
1007 /// // Safe from SQL injection
1008 /// conn.command_params("DELETE FROM users WHERE id = $1", &[&user_id])
1009 /// }
1010 /// ```
1011 ///
1012 /// # Errors
1013 ///
1014 /// - Returns [`Error::Other`] if the connection is using gRPC transport.
1015 /// - Returns [`Error::Client`] if the server rejects the statement at
1016 /// `Parse`, `Bind`, or `Execute` time.
1017 /// - Returns [`Error::Io`] on transport-level I/O failures.
1018 pub fn command_params(
1019 &self,
1020 query: &str,
1021 params: &[&dyn crate::params::ToSqlParam],
1022 ) -> Result<u64> {
1023 // One-shot prepare+execute with explicit OIDs — see `query_params`
1024 // for why we collect OIDs from each parameter.
1025 let client = match &self.transport {
1026 Transport::Tcp(tcp) => &tcp.client,
1027 Transport::Grpc(_) => {
1028 return Err(Error::new(
1029 "prepared statements are not supported over gRPC transport",
1030 ));
1031 }
1032 };
1033 let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
1034 let stmt = client.prepare_typed(query, &oids)?;
1035 let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
1036 Ok(client.execute_no_result(&stmt, encoded)?)
1037 }
1038
1039 /// Executes multiple SQL statements in a single call.
1040 ///
1041 /// Each statement is executed sequentially. If any statement fails,
1042 /// execution stops and the error is returned. Returns the total number
1043 /// of affected rows across all statements.
1044 ///
/// This is a convenience over calling `execute_command` in a loop: each
/// non-blank statement is still sent to the server individually, in order,
/// and blank (whitespace-only) statements are skipped.
1047 ///
1048 /// # Example
1049 ///
1050 /// ```no_run
1051 /// use hyperdb_api::{Connection, CreateMode, Result};
1052 ///
1053 /// fn main() -> Result<()> {
1054 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1055 /// let total = conn.execute_batch(&[
1056 /// "CREATE TABLE users (id INT, name TEXT)",
1057 /// "INSERT INTO users VALUES (1, 'Alice')",
1058 /// "INSERT INTO users VALUES (2, 'Bob')",
1059 /// ])?;
1060 /// println!("Total affected: {}", total);
1061 /// Ok(())
1062 /// }
1063 /// ```
1064 ///
1065 /// # Errors
1066 ///
1067 /// Returns a wrapped [`Error::Other`] on the first statement that fails;
1068 /// its `source` is the original [`Error::Client`] from
1069 /// [`execute_command`](Self::execute_command). The error message
1070 /// includes the failing statement's ordinal and an 80-character preview
1071 /// of its SQL.
1072 pub fn execute_batch(&self, statements: &[&str]) -> Result<u64> {
1073 let mut total = 0u64;
1074 for (i, stmt) in statements.iter().enumerate() {
1075 if !stmt.trim().is_empty() {
1076 total += self.execute_command(stmt).map_err(|e| {
1077 let preview: String = stmt.chars().take(80).collect();
1078 Error::with_cause(
1079 format!(
1080 "execute_batch failed at statement {} of {}: {}",
1081 i + 1,
1082 statements.len(),
1083 preview,
1084 ),
1085 e,
1086 )
1087 })?;
1088 }
1089 }
1090 Ok(total)
1091 }
1092
1093 /// Returns the attached database path, if any.
1094 pub fn database(&self) -> Option<&str> {
1095 self.database.as_deref()
1096 }
1097
1098 /// Creates a new database file.
1099 ///
1100 /// # Example
1101 ///
1102 /// ```no_run
1103 /// use hyperdb_api::{Connection, Result};
1104 ///
1105 /// fn main() -> Result<()> {
1106 /// let conn = Connection::without_database("localhost:7483")?;
1107 /// conn.create_database("new_database.hyper")?;
1108 /// Ok(())
1109 /// }
1110 /// ```
1111 ///
1112 /// # Errors
1113 ///
1114 /// Returns [`Error::Client`] if the server rejects the
1115 /// `CREATE DATABASE IF NOT EXISTS` statement (e.g. the path is not
1116 /// writable on the server).
1117 pub fn create_database(&self, path: &str) -> Result<()> {
1118 let sql = format!("CREATE DATABASE IF NOT EXISTS {}", escape_sql_path(path));
1119 self.execute_command(&sql)?;
1120 Ok(())
1121 }
1122
1123 /// Drops (deletes) a database file.
1124 ///
1125 /// # Example
1126 ///
1127 /// ```no_run
1128 /// use hyperdb_api::{Connection, Result};
1129 ///
1130 /// fn main() -> Result<()> {
1131 /// let conn = Connection::without_database("localhost:7483")?;
1132 /// conn.drop_database("old_database.hyper")?;
1133 /// Ok(())
1134 /// }
1135 /// ```
1136 ///
1137 /// # Errors
1138 ///
1139 /// Returns [`Error::Client`] if the server rejects the
1140 /// `DROP DATABASE IF EXISTS` statement (e.g. the database is still
1141 /// attached or permissions deny deletion).
1142 pub fn drop_database(&self, path: &str) -> Result<()> {
1143 let sql = format!("DROP DATABASE IF EXISTS {}", escape_sql_path(path));
1144 self.execute_command(&sql)?;
1145 Ok(())
1146 }
1147
1148 /// Attaches a database file to the connection.
1149 ///
1150 /// Once attached, the database can be queried and modified.
1151 /// The database is identified by its alias (or by its path if no alias is provided).
1152 ///
1153 /// # Arguments
1154 ///
1155 /// * `path` - The path to the database file to attach.
1156 /// * `alias` - Optional alias for the database. If `None`, the database is
1157 /// attached without an explicit alias (typically using its filename).
1158 ///
1159 /// # Errors
1160 ///
1161 /// Returns an error if the database file doesn't exist or if attachment fails.
1162 ///
1163 /// # Example
1164 ///
1165 /// ```no_run
1166 /// use hyperdb_api::{Connection, Result};
1167 ///
1168 /// fn main() -> Result<()> {
1169 /// let conn = Connection::without_database("localhost:7483")?;
1170 ///
1171 /// // Attach with an alias
1172 /// conn.attach_database("data.hyper", Some("mydata"))?;
1173 ///
1174 /// // Attach without an alias
1175 /// conn.attach_database("other.hyper", None)?;
1176 /// Ok(())
1177 /// }
1178 /// ```
1179 pub fn attach_database(&self, path: &str, alias: Option<&str>) -> Result<()> {
1180 let sql = if let Some(alias) = alias {
1181 format!(
1182 "ATTACH DATABASE {} AS {}",
1183 escape_sql_path(path),
1184 escape_sql_path(alias)
1185 )
1186 } else {
1187 format!("ATTACH DATABASE {}", escape_sql_path(path))
1188 };
1189 self.execute_command(&sql)?;
1190 Ok(())
1191 }
1192
1193 /// Detaches a database from this connection.
1194 ///
1195 /// After detaching, the database file is released and can be accessed
1196 /// externally (e.g., copied, moved, etc.). All pending updates are
1197 /// written to disk before detaching.
1198 ///
1199 /// # Arguments
1200 ///
1201 /// * `alias` - The alias of the database to detach.
1202 ///
1203 /// # Errors
1204 ///
1205 /// Returns an error if the database is not attached or if detachment fails.
1206 ///
1207 /// # Example
1208 ///
1209 /// ```no_run
1210 /// use hyperdb_api::{Connection, Result};
1211 ///
1212 /// fn main() -> Result<()> {
1213 /// let conn = Connection::without_database("localhost:7483")?;
1214 /// conn.attach_database("data.hyper", Some("mydata"))?;
1215 /// // ... work with the database ...
1216 /// conn.detach_database("mydata")?;
1217 /// Ok(())
1218 /// }
1219 /// ```
1220 pub fn detach_database(&self, alias: &str) -> Result<()> {
1221 let sql = format!("DETACH DATABASE {}", escape_sql_path(alias));
1222 self.execute_command(&sql)?;
1223 Ok(())
1224 }
1225
1226 /// Detaches all databases from this connection.
1227 ///
1228 /// This is useful for cleanup before closing a connection or when
1229 /// you need to release all database files.
1230 ///
1231 /// # Errors
1232 ///
1233 /// Returns [`Error::Client`] if the server rejects the
1234 /// `DETACH ALL DATABASES` statement (e.g. a database is still in use by
1235 /// another session).
1236 pub fn detach_all_databases(&self) -> Result<()> {
1237 self.execute_command("DETACH ALL DATABASES")?;
1238 Ok(())
1239 }
1240
1241 /// Creates a schema in the database.
1242 ///
1243 /// # Errors
1244 ///
1245 /// - Returns an error if `schema_name` cannot be converted into a
1246 /// [`SchemaName`](crate::SchemaName) (invalid identifier).
1247 /// - Returns [`Error::Client`] if the server rejects the
1248 /// `CREATE SCHEMA` statement (e.g. the schema already exists).
1249 pub fn create_schema<T>(&self, schema_name: T) -> Result<()>
1250 where
1251 T: TryInto<crate::SchemaName>,
1252 crate::Error: From<T::Error>,
1253 {
1254 crate::catalog::Catalog::new(self).create_schema(schema_name)
1255 }
1256
1257 /// Checks whether a schema exists.
1258 ///
1259 /// # Arguments
1260 ///
1261 /// * `schema` - The schema name (can include database qualifier).
1262 ///
1263 /// # Example
1264 ///
1265 /// ```no_run
1266 /// use hyperdb_api::{Connection, Result};
1267 ///
1268 /// fn main() -> Result<()> {
1269 /// let conn = Connection::without_database("localhost:7483")?;
1270 /// if conn.has_schema("public")? {
1271 /// println!("Schema 'public' exists");
1272 /// }
1273 /// Ok(())
1274 /// }
1275 /// ```
1276 ///
1277 /// # Errors
1278 ///
1279 /// - Returns an error if `schema` cannot be converted into a
1280 /// [`SchemaName`](crate::SchemaName).
1281 /// - Returns [`Error::Client`] if the catalog lookup query fails.
1282 pub fn has_schema<T>(&self, schema: T) -> Result<bool>
1283 where
1284 T: TryInto<crate::SchemaName>,
1285 crate::Error: From<T::Error>,
1286 {
1287 use crate::catalog::Catalog;
1288 Catalog::new(self).has_schema(schema)
1289 }
1290
1291 /// Checks whether a table exists.
1292 ///
1293 /// # Arguments
1294 ///
1295 /// * `table_name` - The table name (can include database and schema qualifiers).
1296 ///
1297 /// # Example
1298 ///
1299 /// ```no_run
1300 /// use hyperdb_api::{Connection, Result};
1301 ///
1302 /// fn main() -> Result<()> {
1303 /// let conn = Connection::without_database("localhost:7483")?;
1304 /// if conn.has_table("public.users")? {
1305 /// println!("Table 'users' exists");
1306 /// }
1307 /// Ok(())
1308 /// }
1309 /// ```
1310 ///
1311 /// # Errors
1312 ///
1313 /// - Returns an error if `table_name` cannot be converted into a
1314 /// [`TableName`](crate::TableName).
1315 /// - Returns [`Error::Client`] if the catalog lookup query fails.
1316 pub fn has_table<T>(&self, table_name: T) -> Result<bool>
1317 where
1318 T: TryInto<crate::TableName>,
1319 crate::Error: From<T::Error>,
1320 {
1321 use crate::catalog::Catalog;
1322 Catalog::new(self).has_table(table_name)
1323 }
1324
1325 /// Returns the server version as a parsed struct.
1326 ///
1327 /// Returns `None` if the version cannot be determined (e.g., gRPC connection).
1328 ///
1329 /// # Example
1330 ///
1331 /// ```no_run
1332 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, ServerVersion, Result};
1333 ///
1334 /// fn main() -> Result<()> {
1335 /// let hyper = HyperProcess::new(None, None)?;
1336 /// let conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1337 /// if let Some(version) = conn.server_version() {
1338 /// println!("Hyper {}", version);
1339 /// if version >= ServerVersion::new(0, 1, 0) {
1340 /// println!("Has feature X");
1341 /// }
1342 /// }
1343 /// Ok(())
1344 /// }
1345 /// ```
1346 pub fn server_version(&self) -> Option<crate::ServerVersion> {
1347 let version_str = self.parameter_status("server_version")?;
1348 crate::ServerVersion::parse(&version_str)
1349 }
1350
1351 /// Copies a database file to a new path.
1352 ///
1353 /// The source database must be attached to this connection.
1354 ///
1355 /// # Example
1356 ///
1357 /// ```no_run
1358 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1359 ///
1360 /// fn main() -> Result<()> {
1361 /// let hyper = HyperProcess::new(None, None)?;
1362 /// let conn = Connection::new(&hyper, "source.hyper", CreateMode::DoNotCreate)?;
1363 /// conn.copy_database("source.hyper", "backup.hyper")?;
1364 /// Ok(())
1365 /// }
1366 /// ```
1367 ///
1368 /// # Errors
1369 ///
1370 /// Returns [`Error::Client`] if the server rejects the
1371 /// `COPY DATABASE` statement — e.g. the source is not attached, the
1372 /// destination path is not writable, or it already exists.
1373 pub fn copy_database(&self, source: &str, destination: &str) -> Result<()> {
1374 let sql = format!(
1375 "COPY DATABASE {} TO {}",
1376 escape_sql_path(source),
1377 escape_sql_path(destination)
1378 );
1379 self.execute_command(&sql)?;
1380 Ok(())
1381 }
1382
1383 /// Executes EXPLAIN on a query and returns the plan as a string.
1384 ///
1385 /// # Example
1386 ///
1387 /// ```no_run
1388 /// use hyperdb_api::{Connection, CreateMode, Result};
1389 ///
1390 /// fn main() -> Result<()> {
1391 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1392 /// let plan = conn.explain("SELECT * FROM users WHERE id = 1")?;
1393 /// println!("{}", plan);
1394 /// Ok(())
1395 /// }
1396 /// ```
1397 ///
1398 /// # Errors
1399 ///
1400 /// Returns [`Error::Client`] if `EXPLAIN <query>` fails to parse or
1401 /// plan, or if the streamed result cannot be consumed.
1402 pub fn explain(&self, query: &str) -> Result<String> {
1403 let explain_sql = format!("EXPLAIN {query}");
1404 let result = self.execute_query(&explain_sql)?;
1405 let mut lines = Vec::new();
1406 for row in result.rows() {
1407 let row = row?;
1408 if let Some(line) = row.get::<String>(0) {
1409 lines.push(line);
1410 }
1411 }
1412 Ok(lines.join("\n"))
1413 }
1414
1415 /// Executes EXPLAIN ANALYZE on a query and returns the plan with timing info.
1416 ///
1417 /// **Note:** This actually executes the query to collect timing information.
1418 ///
1419 /// # Errors
1420 ///
1421 /// Returns [`Error::Client`] if `EXPLAIN ANALYZE <query>` fails — this
1422 /// includes any runtime error raised by actually executing `query`.
1423 pub fn explain_analyze(&self, query: &str) -> Result<String> {
1424 let explain_sql = format!("EXPLAIN ANALYZE {query}");
1425 let result = self.execute_query(&explain_sql)?;
1426 let mut lines = Vec::new();
1427 for row in result.rows() {
1428 let row = row?;
1429 if let Some(line) = row.get::<String>(0) {
1430 lines.push(line);
1431 }
1432 }
1433 Ok(lines.join("\n"))
1434 }
1435
1436 /// Returns a reference to the underlying TCP client.
1437 ///
/// Returns `None` if the connection is using gRPC transport. This method
/// never panics.
1441 pub fn tcp_client(&self) -> Option<&Client> {
1442 match &self.transport {
1443 Transport::Tcp(tcp) => Some(&tcp.client),
1444 Transport::Grpc(_) => None,
1445 }
1446 }
1447
1448 /// Crate-internal accessor for the transport. Used by
1449 /// [`PreparedStatement`](crate::PreparedStatement) to reach the
1450 /// underlying `hyperdb_api_core::client::Client`.
1451 pub(crate) fn transport(&self) -> &Transport {
1452 &self.transport
1453 }
1454
1455 /// Prepares a SQL statement with automatic parameter type inference.
1456 ///
1457 /// The returned [`PreparedStatement`](crate::PreparedStatement) can
1458 /// be executed many times with different parameter values; the
1459 /// server caches the parsed plan. This is the preferred way to
1460 /// execute a statement repeatedly inside a loop.
1461 ///
1462 /// For explicit parameter types (necessary when `$N` placeholders
1463 /// would otherwise be ambiguous), use
1464 /// [`prepare_typed`](Self::prepare_typed).
1465 ///
1466 /// # Example
1467 ///
1468 /// ```no_run
1469 /// # use hyperdb_api::{Connection, CreateMode, Result};
1470 /// # fn example(conn: &Connection) -> Result<()> {
1471 /// let stmt = conn.prepare("SELECT name FROM users WHERE id = $1")?;
1472 /// for id in [1_i32, 2, 3] {
1473 /// let name: String = stmt.fetch_scalar(&[&id])?;
1474 /// println!("{id}: {name}");
1475 /// }
1476 /// # Ok(())
1477 /// # }
1478 /// ```
1479 ///
1480 /// # Errors
1481 ///
1482 /// See [`prepare_typed`](Self::prepare_typed) — this method delegates
1483 /// to it with an empty OID list.
1484 pub fn prepare(&self, query: &str) -> Result<crate::PreparedStatement<'_>> {
1485 self.prepare_typed(query, &[])
1486 }
1487
1488 /// Prepares a SQL statement with explicit parameter type OIDs.
1489 ///
1490 /// Use this when the server cannot infer parameter types from the
1491 /// SQL alone (e.g. a bare `$1` in a `WHERE v > $1` clause with no
1492 /// other context). Constants for common types live in
1493 /// [`hyperdb_api_core::types::oids`].
1494 ///
1495 /// # Errors
1496 ///
1497 /// - Returns [`Error::Other`] if the connection is using gRPC transport
1498 /// (prepared statements are TCP-only).
1499 /// - Returns [`Error::Client`] if the server rejects the `Parse`
1500 /// message, e.g. SQL syntax error or unknown OID.
1501 /// - Returns [`Error::Io`] on transport-level I/O failures.
1502 pub fn prepare_typed(
1503 &self,
1504 query: &str,
1505 param_types: &[crate::Oid],
1506 ) -> Result<crate::PreparedStatement<'_>> {
1507 let client = match &self.transport {
1508 Transport::Tcp(tcp) => &tcp.client,
1509 Transport::Grpc(_) => {
1510 return Err(Error::new(
1511 "prepared statements are not supported over gRPC transport",
1512 ));
1513 }
1514 };
1515 let inner = client.prepare_typed(query, param_types)?;
1516 crate::PreparedStatement::new(self, inner)
1517 }
1518
1519 /// Returns true if the connection is alive (passive check).
1520 ///
1521 /// This is a lightweight check that does not send any data to the server.
1522 /// For an active health check, use [`ping`](Self::ping).
1523 pub fn is_alive(&self) -> bool {
1524 match &self.transport {
1525 Transport::Tcp(tcp) => tcp.client.is_alive(),
1526 Transport::Grpc(_) => true, // gRPC connections are stateless
1527 }
1528 }
1529
1530 /// Actively checks that the connection is healthy by executing a trivial query.
1531 ///
1532 /// Unlike [`is_alive`](Self::is_alive) which only checks local state,
1533 /// this method sends `SELECT 1` to the server and verifies a response.
1534 ///
1535 /// # Example
1536 ///
1537 /// ```no_run
1538 /// # use hyperdb_api::{Connection, CreateMode, Result};
1539 /// # fn example(conn: &Connection) -> Result<()> {
1540 /// if conn.ping().is_ok() {
1541 /// println!("Connection is healthy");
1542 /// }
1543 /// # Ok(())
1544 /// # }
1545 /// ```
1546 ///
1547 /// # Errors
1548 ///
1549 /// Returns [`Error::Client`] or [`Error::Io`] if the `SELECT 1`
1550 /// round-trip fails — i.e. the connection is no longer usable.
1551 pub fn ping(&self) -> Result<()> {
1552 self.execute_command("SELECT 1")?;
1553 Ok(())
1554 }
1555
1556 /// Returns the process ID of the backend server connection.
1557 ///
1558 /// Returns 0 for gRPC connections (not applicable).
1559 pub fn process_id(&self) -> i32 {
1560 match &self.transport {
1561 Transport::Tcp(tcp) => tcp.client.process_id(),
1562 Transport::Grpc(_) => 0,
1563 }
1564 }
1565
1566 /// Returns the secret key for the backend server connection.
1567 ///
1568 /// This is used for cancellation requests.
1569 /// Returns 0 for gRPC connections (not applicable).
1570 pub fn secret_key(&self) -> i32 {
1571 match &self.transport {
1572 Transport::Tcp(tcp) => tcp.client.secret_key(),
1573 Transport::Grpc(_) => 0,
1574 }
1575 }
1576
1577 /// Returns a server parameter value by name.
1578 ///
1579 /// Server parameters are sent by the server during connection startup.
1580 /// Common parameters include:
1581 /// - `server_version` - The server version string
1582 /// - `server_encoding` - The server's character encoding
1583 /// - `client_encoding` - The client's character encoding
1584 /// - `DateStyle` - Date display format
1585 /// - `TimeZone` - Server timezone
1586 /// - `session_identifier` - Session ID for connection migration (if routing enabled)
1587 ///
1588 /// Returns `None` if the parameter is not known.
1589 ///
1590 /// # Example
1591 ///
1592 /// ```no_run
1593 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1594 ///
1595 /// fn main() -> Result<()> {
1596 /// let hyper = HyperProcess::new(None, None)?;
1597 /// let conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1598 ///
1599 /// if let Some(version) = conn.parameter_status("server_version") {
1600 /// println!("Connected to Hyper version: {}", version);
1601 /// }
1602 /// Ok(())
1603 /// }
1604 /// ```
1605 pub fn parameter_status(&self, name: &str) -> Option<String> {
1606 match &self.transport {
1607 Transport::Tcp(tcp) => tcp.client.parameter_status(name),
1608 Transport::Grpc(_) => None, // gRPC doesn't have server parameters
1609 }
1610 }
1611
1612 /// Sets the notice receiver for this connection.
1613 ///
1614 /// Server notices and warnings are passed to this callback instead of being
1615 /// logged. Pass `None` to restore default logging behavior.
1616 pub fn set_notice_receiver(
1617 &mut self,
1618 receiver: Option<hyperdb_api_core::client::NoticeReceiver>,
1619 ) {
1620 match &mut self.transport {
1621 Transport::Tcp(tcp) => tcp.client.set_notice_receiver(receiver),
1622 Transport::Grpc(_) => {} // gRPC doesn't support notice receivers
1623 }
1624 }
1625
1626 /// Cancels the currently executing query (thread-safe).
1627 ///
1628 /// # Errors
1629 ///
1630 /// - Returns [`Error::Other`] on gRPC connections — cancellation is not
1631 /// yet implemented for gRPC transport.
1632 /// - Returns [`Error::Client`] or [`Error::Io`] if the separate
1633 /// cancel-request connection to the server fails.
1634 pub fn cancel(&self) -> Result<()> {
1635 match &self.transport {
1636 Transport::Tcp(tcp) => tcp.client.cancel().map_err(Error::from),
1637 Transport::Grpc(_) => Err(Error::new(
1638 "Query cancellation is not yet supported for gRPC connections.",
1639 )),
1640 }
1641 }
1642
/// Closes the connection, first detaching its attached database (if any).
1644 ///
1645 /// # Errors
1646 ///
1647 /// - Returns [`Error::Other`] wrapping the underlying close failure
1648 /// (its `source` is the transport error) if the client cannot be
1649 /// shut down cleanly.
1650 /// - Returns [`Error::Other`] wrapping the detach failure if the
1651 /// attached database could not be detached but close itself
1652 /// succeeded.
1653 pub fn close(self) -> Result<()> {
1654 // Detach the attached database to ensure files are flushed and released.
1655 // Always attempt close, even if detach fails.
1656 let detach_err = if let Some(ref db_path) = self.database {
1657 let db_alias = std::path::Path::new(db_path)
1658 .file_stem()
1659 .and_then(|s| s.to_str())
1660 .unwrap_or("db");
1661 self.execute_command(&format!("DETACH DATABASE {}", escape_sql_path(db_alias)))
1662 .err()
1663 } else {
1664 None
1665 };
1666
1667 // Always attempt to close the client to release the connection.
1668 let close_result = match self.transport {
1669 Transport::Tcp(tcp) => tcp.client.close(),
1670 Transport::Grpc(_) => Ok(()), // gRPC connections are stateless
1671 };
1672
1673 if let Err(e) = close_result {
1674 return Err(Error::with_cause("Failed to close connection", e));
1675 }
1676
1677 if let Some(e) = detach_err {
1678 // Detach failed but close succeeded; surface the detach error.
1679 return Err(Error::with_cause(
1680 "Failed to detach database during close",
1681 e,
1682 ));
1683 }
1684
1685 Ok(())
1686 }
1687
1688 /// Unloads the database from memory while keeping the connection active.
1689 ///
1690 /// This executes the `UNLOAD DATABASE` command, which releases the database
1691 /// from memory but keeps the session and connection open. The database can
1692 /// be accessed again by subsequent queries that will automatically reload it.
1693 ///
1694 /// This is useful for releasing memory locks when switching between databases
1695 /// or when working with multiple database files.
1696 ///
1697 /// # Example
1698 ///
1699 /// ```no_run
1700 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1701 ///
1702 /// fn main() -> Result<()> {
1703 /// let hyper = HyperProcess::new(None, None)?;
1704 /// let conn = Connection::new(&hyper, "test.hyper", CreateMode::Create)?;
1705 ///
1706 /// // Do some work with the database
1707 /// conn.execute_command("CREATE TABLE test (id INT)")?;
1708 ///
1709 /// // Unload from memory (but keep connection)
1710 /// conn.unload_database()?;
1711 ///
1712 /// // Database can still be accessed (will be reloaded automatically)
1713 /// let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM test")?;
1714 /// println!("Count: {}", count);
1715 ///
1716 /// Ok(())
1717 /// }
1718 /// ```
1719 ///
1720 /// # Errors
1721 ///
1722 /// Returns [`Error::Client`] if the server rejects the `UNLOAD DATABASE`
1723 /// command (e.g. the database is still in use by another session).
1724 pub fn unload_database(&self) -> Result<()> {
1725 self.execute_command("UNLOAD DATABASE")?;
1726 Ok(())
1727 }
1728
1729 /// Releases the database completely from the session.
1730 ///
1731 /// This executes the `UNLOAD RELEASE` command, which completely releases
1732 /// the database from the session. After this call, the database cannot
1733 /// be accessed until a new connection is established.
1734 ///
1735 /// This is useful for completely freeing database resources when you're
1736 /// done with a database and want to ensure no locks are held.
1737 ///
1738 /// **Note:** This should only be used when the session has exactly one
1739 /// database attached. Hyper does not support `UNLOAD RELEASE` with
1740 /// multiple databases attached to the same session.
1741 ///
1742 /// # Example
1743 ///
1744 /// ```no_run
1745 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1746 ///
1747 /// fn main() -> Result<()> {
1748 /// let hyper = HyperProcess::new(None, None)?;
1749 /// let conn = Connection::new(&hyper, "test.hyper", CreateMode::Create)?;
1750 ///
1751 /// // Do some work with the database
1752 /// conn.execute_command("CREATE TABLE test (id INT)")?;
1753 ///
1754 /// // Release database completely from session
1755 /// conn.unload_release()?;
1756 ///
1757 /// // Database cannot be accessed after this point without new connection
1758 /// // conn.execute_command("SELECT * FROM test")?; // This would fail
1759 ///
1760 /// Ok(())
1761 /// }
1762 /// ```
1763 ///
1764 /// # Errors
1765 ///
1766 /// Returns [`Error::Client`] if the server rejects `UNLOAD RELEASE`, most
1767 /// commonly because multiple databases are attached to the same session
1768 /// (Hyper only supports `UNLOAD RELEASE` with exactly one attached DB).
1769 pub fn unload_release(&self) -> Result<()> {
1770 self.execute_command("UNLOAD RELEASE")?;
1771 Ok(())
1772 }
1773
1774 // =========================================================================
1775 // Query Statistics
1776 // =========================================================================
1777
1778 /// Enables query statistics collection for this connection.
1779 ///
1780 /// After enabling, each `execute_command()` or `execute_query()` call will
1781 /// capture detailed performance metrics from Hyper. Retrieve them via
1782 /// [`last_query_stats()`](Self::last_query_stats).
1783 ///
1784 /// The provider determines how stats are collected. Use
1785 /// [`LogFileStatsProvider`](crate::LogFileStatsProvider) to parse Hyper's log file (requires local
1786 /// `hyperd.log`), or implement a custom [`QueryStatsProvider`](crate::QueryStatsProvider).
1787 ///
1788 /// # Example
1789 ///
1790 /// ```no_run
1791 /// # use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1792 /// # fn main() -> Result<()> {
1793 /// # let hyper = HyperProcess::new(None, None)?;
1794 /// # let mut conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1795 /// use hyperdb_api::LogFileStatsProvider;
1796 ///
1797 /// // Auto-detect log path from HyperProcess
1798 /// conn.enable_query_stats(LogFileStatsProvider::from_process(&hyper));
1799 ///
1800 /// // Or specify an explicit log path
1801 /// // conn.enable_query_stats(LogFileStatsProvider::new("/path/to/hyperd.log"));
1802 /// # Ok(())
1803 /// # }
1804 /// ```
1805 pub fn enable_query_stats(&mut self, provider: impl QueryStatsProvider + 'static) {
1806 self.stats_provider = Some(Arc::new(provider));
1807 }
1808
1809 /// Disables query statistics collection.
1810 ///
1811 /// After calling this, `last_query_stats()` will return `None`.
1812 pub fn disable_query_stats(&mut self) {
1813 self.stats_provider = None;
1814 if let Ok(mut guard) = self.pending_stats.lock() {
1815 *guard = None;
1816 }
1817 }
1818
1819 /// Returns the query statistics from the most recent query execution.
1820 ///
1821 /// Stats are resolved **lazily** — the log file is read when this method
1822 /// is called, not when the query executes. This is important for streaming
1823 /// queries (`execute_query`), where Hyper writes the execution stats only
1824 /// after the result set is fully consumed.
1825 ///
1826 /// **Call this after consuming the result set** (e.g., after `collect_rows()`,
1827 /// iterating all chunks, or dropping the `Rowset`).
1828 ///
1829 /// Returns `None` if:
1830 /// - Query stats collection is not enabled
1831 /// - No query has been executed yet
1832 /// - Stats could not be found for the last query (e.g., log entry not matched)
1833 ///
1834 /// # Example
1835 ///
1836 /// ```no_run
1837 /// # use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1838 /// # fn main() -> Result<()> {
1839 /// # let hyper = HyperProcess::new(None, None)?;
1840 /// # let mut conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1841 /// # use hyperdb_api::LogFileStatsProvider;
1842 /// # conn.enable_query_stats(LogFileStatsProvider::from_process(&hyper));
1843 /// conn.execute_command("CREATE TABLE t (id INT)")?;
1844 ///
1845 /// if let Some(stats) = conn.last_query_stats() {
1846 /// println!("Total: {}s", stats.elapsed_s);
1847 /// if let Some(ref pre) = stats.pre_execution {
1848 /// println!(" Parse: {:?}s", pre.parsing_time_s);
1849 /// println!(" Compile: {:?}s", pre.compilation_time_s);
1850 /// }
1851 /// if let Some(ref exec) = stats.execution {
1852 /// println!(" Execute: {:?}s", exec.elapsed_s);
1853 /// println!(" Peak mem: {:?} MB", exec.peak_memory_mb);
1854 /// }
1855 /// }
1856 /// # Ok(())
1857 /// # }
1858 /// ```
1859 pub fn last_query_stats(&self) -> Option<QueryStats> {
1860 let provider = self.stats_provider.as_ref()?;
1861 let mut guard = self.pending_stats.lock().ok()?;
1862 let (token, sql) = guard.take()?;
1863 provider.after_query(token, &sql)
1864 }
1865
1866 /// Internal: call provider's `before_query` if stats are enabled.
1867 fn stats_before_query(&self, sql: &str) -> Option<Box<dyn Any + Send>> {
1868 self.stats_provider.as_ref().map(|p| p.before_query(sql))
1869 }
1870
1871 /// Internal: store the pending token+sql for lazy resolution.
1872 fn stats_store_pending(&self, token: Option<Box<dyn Any + Send>>, sql: &str) {
1873 if let Some(token) = token {
1874 if let Ok(mut guard) = self.pending_stats.lock() {
1875 *guard = Some((token, sql.to_string()));
1876 }
1877 }
1878 }
1879}
1880
1881impl Connection {
1882 // =========================================================================
1883 // Transaction Control
1884 // =========================================================================
1885
1886 /// Begins an explicit transaction.
1887 ///
1888 /// For an RAII guard that auto-rolls back on drop, use [`transaction()`](Self::transaction) instead.
1889 ///
1890 /// # Example
1891 ///
1892 /// ```no_run
1893 /// # use hyperdb_api::{Connection, CreateMode, Result};
1894 /// # fn main() -> Result<()> {
1895 /// # let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1896 /// conn.begin_transaction()?;
1897 /// conn.execute_command("INSERT INTO users VALUES (1, 'Alice')")?;
1898 /// conn.commit()?;
1899 /// # Ok(())
1900 /// # }
1901 /// ```
1902 ///
1903 /// # Errors
1904 ///
1905 /// Returns [`Error::Client`] if the server rejects `BEGIN TRANSACTION`
1906 /// (e.g. a transaction is already open on this session).
1907 pub fn begin_transaction(&self) -> Result<()> {
1908 self.execute_command("BEGIN TRANSACTION")?;
1909 Ok(())
1910 }
1911
1912 /// Commits the current transaction.
1913 ///
1914 /// # Errors
1915 ///
1916 /// Returns [`Error::Client`] if the server rejects `COMMIT` — most
1917 /// commonly because no transaction is currently open.
1918 pub fn commit(&self) -> Result<()> {
1919 self.execute_command("COMMIT")?;
1920 Ok(())
1921 }
1922
1923 /// Rolls back the current transaction.
1924 ///
1925 /// # Errors
1926 ///
1927 /// Returns [`Error::Client`] if the server rejects `ROLLBACK` — most
1928 /// commonly because no transaction is currently open.
1929 pub fn rollback(&self) -> Result<()> {
1930 self.execute_command("ROLLBACK")?;
1931 Ok(())
1932 }
1933
1934 /// Starts a transaction and returns an RAII guard that auto-rolls back on drop.
1935 ///
1936 /// The returned [`Transaction`](crate::Transaction) exclusively borrows this connection,
1937 /// preventing any other use of the connection while the transaction is active.
1938 /// This is enforced at compile time by Rust's borrow checker. The guard provides
1939 /// `commit()` and `rollback()` methods. If dropped without calling either, the
1940 /// transaction is automatically rolled back.
1941 ///
1942 /// # Example
1943 ///
1944 /// ```no_run
1945 /// # use hyperdb_api::{Connection, CreateMode, Result};
1946 /// # fn main() -> Result<()> {
1947 /// # let mut conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1948 /// let txn = conn.transaction()?;
1949 /// txn.execute_command("INSERT INTO users VALUES (1, 'Alice')")?;
1950 /// txn.commit()?; // or drop `txn` to auto-rollback
1951 /// # Ok(())
1952 /// # }
1953 /// ```
1954 ///
1955 /// # Errors
1956 ///
1957 /// Returns [`Error::Client`] if the server rejects the `BEGIN`
1958 /// statement issued internally by
1959 /// [`Transaction::new`](crate::Transaction).
1960 pub fn transaction(&mut self) -> Result<crate::Transaction<'_>> {
1961 crate::Transaction::new(self)
1962 }
1963}
1964
1965/// Checks if an error indicates an "already exists" condition based on SQLSTATE codes.
1966///
1967/// This function uses `PostgreSQL` SQLSTATE codes to reliably detect duplicate object errors
1968/// regardless of server locale or message formatting. The codes checked are:
1969/// - `42P04`: Database already exists
1970/// - `42710`: Duplicate object
1971/// - `42P06`: Duplicate schema
1972/// - `42P07`: Duplicate table
1973///
1974/// See: <https://www.postgresql.org/docs/current/errcodes-appendix.html>
1975fn is_already_exists_error(err: &Error) -> bool {
1976 err.sqlstate()
1977 .is_some_and(|code| matches!(code, "42P04" | "42710" | "42P06" | "42P07"))
1978}