hyperdb_api/connection.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Database connection management.
5//!
6//! The [`Connection`] type provides a unified interface for connecting to Hyper
7//! databases via either TCP (`PostgreSQL` wire protocol) or gRPC transport.
8//! The transport is automatically detected from the endpoint URL:
9//!
10//! - `https://` or `http://` → gRPC transport
11//! - Otherwise → TCP transport (e.g., `localhost:7483`)
12
13use std::path::Path;
14
15use hyperdb_api_core::client::Client;
16
17use crate::error::{Error, Result};
18use crate::names::escape_sql_path;
19use crate::process::HyperProcess;
20use crate::result::{Row, Rowset, DEFAULT_BINARY_CHUNK_SIZE};
21use crate::transport::Transport;
22
23use std::any::Any;
24use std::sync::{Arc, Mutex};
25
26use crate::query_stats::{QueryStats, QueryStatsProvider};
27
28/// Trait for types that can be extracted from a scalar query result.
29///
30/// This trait enables the generic [`Connection::execute_scalar_query`] method,
31/// similar to C++'s `executeScalarQuery<T>()` template.
32///
33/// # Implementing Custom Types
34///
35/// You can implement this trait for custom types to use them with `execute_scalar_query`:
36///
37/// ```no_run
38/// # use hyperdb_api::{Row, ScalarValue};
39/// # struct MyType;
40/// # impl MyType { fn parse(s: &str) -> Self { MyType } }
41/// impl ScalarValue for MyType {
42/// fn from_row(row: &Row, col: usize) -> Option<Self> {
43/// row.get_string(col).map(|s| MyType::parse(&s))
44/// }
45/// }
46/// ```
47pub trait ScalarValue: Sized {
48 /// Extracts a value of this type from a row at the given column.
49 fn from_row(row: &Row, col: usize) -> Option<Self>;
50}
51
52impl ScalarValue for i64 {
53 fn from_row(row: &Row, col: usize) -> Option<Self> {
54 row.get_i64(col)
55 }
56}
57
58impl ScalarValue for i32 {
59 fn from_row(row: &Row, col: usize) -> Option<Self> {
60 row.get_i32(col)
61 }
62}
63
64impl ScalarValue for i16 {
65 fn from_row(row: &Row, col: usize) -> Option<Self> {
66 row.get_i16(col)
67 }
68}
69
70impl ScalarValue for f64 {
71 fn from_row(row: &Row, col: usize) -> Option<Self> {
72 row.get_f64(col)
73 }
74}
75
76impl ScalarValue for bool {
77 fn from_row(row: &Row, col: usize) -> Option<Self> {
78 row.get_bool(col)
79 }
80}
81
82impl ScalarValue for String {
83 fn from_row(row: &Row, col: usize) -> Option<Self> {
84 row.get_string(col)
85 }
86}
87
/// Controls whether (and how) the database file is created while connecting.
///
/// The default is [`CreateMode::DoNotCreate`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CreateMode {
    /// Never create the database; connecting fails if it does not exist.
    #[default]
    DoNotCreate,
    /// Always create the database; connecting fails if it already exists.
    Create,
    /// Create the database only when it is not there yet.
    CreateIfNotExists,
    /// Create the database, dropping a pre-existing one first.
    CreateAndReplace,
}
101
/// A connection to a Hyper database.
///
/// This struct represents an active connection to a Hyper server and optionally
/// an attached database. The connection is automatically closed when dropped.
///
/// # Transport Auto-Detection
///
/// The transport is automatically detected from the endpoint URL:
/// - `https://` or `http://` → gRPC transport (read-only until server supports writes)
/// - Otherwise → TCP transport (full read/write support)
///
/// # CSV / Text Import & Export
///
/// For CSV, TSV, and other delimited-text formats, see the [`copy`](crate::copy)
/// module which provides [`export_csv()`](Self::export_csv),
/// [`import_csv()`](Self::import_csv), and related methods on this struct.
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::{Connection, CreateMode, Result};
///
/// fn main() -> Result<()> {
///     // TCP connection (full read/write)
///     let conn = Connection::connect("localhost:7483", "example.hyper", CreateMode::CreateIfNotExists)?;
///
///     // Execute SQL commands
///     conn.execute_command("CREATE TABLE test (id INT, name TEXT)")?;
///     conn.execute_command("INSERT INTO test VALUES (1, 'Hello')")?;
///
///     Ok(())
/// }
/// ```
///
/// ```no_run
/// # use hyperdb_api::{Connection, CreateMode, Result};
/// # fn example() -> Result<()> {
/// // gRPC connection (read-only, auto-detected from URL)
/// let conn = Connection::connect(
///     "https://hyper-server.example.com:443",
///     "example.hyper",
///     CreateMode::DoNotCreate, // Must be DoNotCreate for gRPC
/// )?;
/// # Ok(())
/// # }
/// ```
pub struct Connection {
    /// Transport (TCP or gRPC) that every command and query is sent through.
    transport: Transport,
    /// Database path handed to the constructor, or `None` when the connection
    /// was built without one.
    database: Option<String>,
    /// Optional query-statistics provider; the constructors in this file
    /// always start with `None`.
    stats_provider: Option<Arc<dyn QueryStatsProvider>>,
    /// Pending stats token + SQL from the most recent query, resolved lazily.
    /// Guarded by a `Mutex` because it is updated through `&self`.
    pending_stats: Mutex<Option<(Box<dyn Any + Send>, String)>>,
}
155
156impl std::fmt::Debug for Connection {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.debug_struct("Connection")
159 .field("database", &self.database)
160 .finish_non_exhaustive()
161 }
162}
163
164impl Connection {
165 /// Creates a new connection to a Hyper instance with a database.
166 ///
167 /// This is the primary way to connect to a running [`HyperProcess`].
168 ///
169 /// # Arguments
170 ///
171 /// * `instance` - The Hyper server instance to connect to.
172 /// * `database_path` - Path to the database file.
173 /// * `create_mode` - How to handle database creation.
174 ///
175 /// # Errors
176 ///
177 /// Returns an error if the connection could not be established.
178 ///
179 /// # Example
180 ///
181 /// ```no_run
182 /// use hyperdb_api::{HyperProcess, Connection, CreateMode, Result};
183 ///
184 /// fn main() -> Result<()> {
185 /// let hyper = HyperProcess::new(None, None)?;
186 /// let conn = Connection::new(&hyper, "database.hyper", CreateMode::CreateIfNotExists)?;
187 /// Ok(())
188 /// }
189 /// ```
190 pub fn new(
191 instance: &HyperProcess,
192 database_path: impl AsRef<Path>,
193 create_mode: CreateMode,
194 ) -> Result<Self> {
195 // Prefer using the connection_endpoint which properly handles UDS/Named Pipes
196 if let Some(conn_endpoint) = instance.connection_endpoint() {
197 return Self::connect_with_endpoint(
198 conn_endpoint,
199 &database_path.as_ref().to_string_lossy(),
200 create_mode,
201 );
202 }
203
204 // Fall back to string endpoint (TCP)
205 let endpoint = instance.require_endpoint()?;
206 Self::connect(
207 endpoint,
208 &database_path.as_ref().to_string_lossy(),
209 create_mode,
210 )
211 }
212
213 /// Connects using a `ConnectionEndpoint` (supports TCP, UDS, and Named Pipes).
214 fn connect_with_endpoint(
215 endpoint: &hyperdb_api_core::client::ConnectionEndpoint,
216 database_path: &str,
217 create_mode: CreateMode,
218 ) -> Result<Self> {
219 let db_path_str = Some(database_path.to_string());
220
221 let config = hyperdb_api_core::client::Config::new().with_user("tableau_internal_user");
222
223 let client = hyperdb_api_core::client::Client::connect_endpoint(endpoint, &config)?;
224
225 let conn = Connection::from_client(client, db_path_str.clone());
226
227 // Handle database creation
228 if let Some(db_path) = db_path_str {
229 conn.handle_creation_mode(&db_path, create_mode)?;
230 conn.attach_and_set_path(&db_path)?;
231 }
232
233 Ok(conn)
234 }
235
236 /// Connects to a Hyper server and optionally attaches a database.
237 ///
238 /// # Arguments
239 ///
240 /// * `endpoint` - The server endpoint (host:port).
241 /// * `database_path` - Path to the database file.
242 /// * `create_mode` - How to handle database creation.
243 ///
244 /// # Errors
245 ///
246 /// Returns an error if the connection could not be established.
247 pub fn connect(endpoint: &str, database_path: &str, create_mode: CreateMode) -> Result<Self> {
248 crate::ConnectionBuilder::new(endpoint)
249 .database(database_path)
250 .create_mode(create_mode)
251 .build()
252 }
253
    /// Returns a connection builder for advanced configuration.
    ///
    /// This is useful when you need to set authentication, timeouts, or
    /// other advanced options before connecting. Equivalent to calling
    /// `ConnectionBuilder::new(endpoint)` directly.
    ///
    /// # Arguments
    ///
    /// * `endpoint` - The server endpoint the builder will connect to.
    #[must_use]
    pub fn builder(endpoint: &str) -> crate::ConnectionBuilder {
        crate::ConnectionBuilder::new(endpoint)
    }
262
263 /// Creates a Connection from a low-level Client (internal use, TCP only).
264 pub(crate) fn from_client(client: Client, database: Option<String>) -> Self {
265 Connection {
266 transport: Transport::Tcp(Box::new(crate::transport::TcpTransport { client })),
267 database,
268 stats_provider: None,
269 pending_stats: Mutex::new(None),
270 }
271 }
272
273 /// Creates a Connection from a Transport (internal use).
274 #[allow(
275 dead_code,
276 reason = "used by ConnectionBuilder for the gRPC path; not reached under non-gRPC feature builds"
277 )]
278 pub(crate) fn from_transport(transport: Transport, database: Option<String>) -> Self {
279 Connection {
280 transport,
281 database,
282 stats_provider: None,
283 pending_stats: Mutex::new(None),
284 }
285 }
286
287 /// Returns the transport type name (e.g., "TCP", "gRPC", "Unix Socket").
288 pub fn transport_type(&self) -> &'static str {
289 self.transport.transport_type().as_str()
290 }
291
292 /// Returns true if this connection supports write operations.
293 ///
294 /// Currently, only TCP connections support writes. gRPC connections are
295 /// read-only until the server supports write operations over gRPC.
296 pub fn supports_writes(&self) -> bool {
297 self.transport.supports_writes()
298 }
299
300 /// Handles database creation logic (internal use).
301 pub(crate) fn handle_creation_mode(
302 &self,
303 database_path: &str,
304 create_mode: CreateMode,
305 ) -> Result<()> {
306 match create_mode {
307 CreateMode::DoNotCreate => {}
308 CreateMode::Create => {
309 self.execute_command(&format!(
310 "CREATE DATABASE {}",
311 escape_sql_path(database_path)
312 ))?;
313 }
314 CreateMode::CreateIfNotExists => {
315 if let Err(e) = self.execute_command(&format!(
316 "CREATE DATABASE IF NOT EXISTS {}",
317 escape_sql_path(database_path)
318 )) {
319 if !is_already_exists_error(&e) {
320 return Err(Error::internal(format!(
321 "Failed to create database '{database_path}': {e}"
322 )));
323 }
324 }
325 }
326 CreateMode::CreateAndReplace => {
327 let _ = self.execute_command(&format!(
328 "DROP DATABASE IF EXISTS {}",
329 escape_sql_path(database_path)
330 ));
331 self.execute_command(&format!(
332 "CREATE DATABASE {}",
333 escape_sql_path(database_path)
334 ))?;
335 }
336 }
337 Ok(())
338 }
339
340 /// Attaches and sets the database path (internal use).
341 pub(crate) fn attach_and_set_path(&self, database_path: &str) -> Result<()> {
342 let db_alias = std::path::Path::new(database_path)
343 .file_stem()
344 .and_then(|s| s.to_str())
345 .unwrap_or("db");
346
347 self.execute_command(&format!(
348 "ATTACH DATABASE {} AS {}",
349 escape_sql_path(database_path),
350 escape_sql_path(db_alias)
351 ))?;
352
353 self.execute_command(&format!(
354 "SET search_path TO {}, public",
355 escape_sql_path(db_alias)
356 ))?;
357
358 Ok(())
359 }
360
361 /// Connects to a Hyper server with authentication.
362 ///
363 /// # Arguments
364 ///
365 /// * `endpoint` - The server endpoint (host:port).
366 /// * `database_path` - Path to the database file.
367 /// * `create_mode` - How to handle database creation.
368 /// * `user` - Username for authentication.
369 /// * `password` - Password for authentication.
370 ///
371 /// # Errors
372 ///
373 /// Returns an error if the connection or authentication fails.
374 pub fn connect_with_auth(
375 endpoint: &str,
376 database_path: &str,
377 create_mode: CreateMode,
378 user: &str,
379 password: &str,
380 ) -> Result<Self> {
381 crate::ConnectionBuilder::new(endpoint)
382 .database(database_path)
383 .create_mode(create_mode)
384 .user(user.to_string())
385 .password(password)
386 .build()
387 }
388
389 /// Creates a connection to a Hyper server without attaching a database.
390 ///
391 /// # Errors
392 ///
393 /// Returns [`Error::Connection`] if the TCP or gRPC handshake fails, and
394 /// [`Error::Io`] if the endpoint cannot be reached.
395 pub fn without_database(endpoint: &str) -> Result<Self> {
396 crate::ConnectionBuilder::new(endpoint).build()
397 }
398
399 /// Executes a SQL command that doesn't return results.
400 ///
401 /// Use this for DDL statements (CREATE, ALTER, DROP) and DML statements
402 /// (INSERT, UPDATE, DELETE).
403 ///
404 /// # Arguments
405 ///
406 /// * `command` - The SQL command to execute.
407 ///
408 /// # Returns
409 ///
410 /// The number of affected rows, or 0 if not applicable.
411 ///
412 /// # Errors
413 ///
414 /// Returns an error if:
415 /// - The connection is using gRPC transport (write operations not yet supported)
416 /// - The command fails to execute
417 pub fn execute_command(&self, command: &str) -> Result<u64> {
418 let token = self.stats_before_query(command);
419
420 let result = self.transport.execute_command(command);
421
422 // For commands, the query is fully executed synchronously, so we
423 // can store the pending token immediately for lazy resolution.
424 self.stats_store_pending(token, command);
425
426 result
427 }
428
429 /// Executes a SQL query and returns a streaming result set.
430 ///
431 /// Results are streamed in chunks (default 64K rows), keeping memory usage
432 /// constant regardless of result set size. This makes it safe for any
433 /// result size, from a single row to billions of rows.
434 ///
435 /// # Example
436 ///
437 /// ```no_run
438 /// # use hyperdb_api::{Connection, Result};
439 /// # fn example(conn: &Connection) -> Result<()> {
440 /// let mut result = conn.execute_query("SELECT id, value FROM measurements")?;
441 /// while let Some(chunk) = result.next_chunk()? {
442 /// for row in &chunk {
443 /// // Generic typed access (like C++ row.get<T>())
444 /// let id: Option<i32> = row.get(0);
445 /// let value: Option<f64> = row.get(1);
446 ///
447 /// // Or direct accessors
448 /// let id = row.get_i32(0);
449 /// let value = row.get_f64(1);
450 /// }
451 /// }
452 /// # Ok(())
453 /// # }
454 /// ```
455 ///
456 /// # Memory Behavior
457 ///
458 /// - Only one chunk is held in memory at a time (~few MB for 64K rows)
459 /// - Safe for result sets of any size (millions/billions of rows)
460 /// - Memory usage is `O(chunk_size)`, not `O(total_rows)`
461 ///
462 /// # Errors
463 ///
464 /// - Returns [`Error::Server`] wrapping a `hyperdb_api_core::client::Error` if the
465 /// SQL fails to parse, execute, or if the server reports an error
466 /// while streaming.
467 /// - Returns [`Error::Io`] on transport-level I/O failures.
468 pub fn execute_query(&self, query: &str) -> Result<Rowset<'_>> {
469 let token = self.stats_before_query(query);
470
471 let result = match &self.transport {
472 Transport::Tcp(tcp) => {
473 let stream = tcp
474 .client
475 .query_streaming(query, DEFAULT_BINARY_CHUNK_SIZE)?;
476 Ok(Rowset::new(stream))
477 }
478 Transport::Grpc(grpc) => {
479 // gRPC streaming: pull chunks lazily so peak memory is
480 // bounded by one gRPC message (tonic default 64 MB), not
481 // by the full result size. Matches TCP's
482 // constant-memory streaming shape.
483 //
484 // The transport module already creates a fresh gRPC client
485 // per query (gRPC client needs &mut self to execute), so we
486 // do the same here: connect, start the stream, wrap as a
487 // `ChunkSource`. The stream keeps the channel and runtime
488 // alive via refcounted handles inside `GrpcChunkStreamSync`.
489 let mut client =
490 hyperdb_api_core::client::grpc::GrpcClientSync::connect(grpc.config.clone())?;
491 let stream = client.execute_query_stream(query)?;
492 let source = Box::new(crate::grpc_connection::GrpcChunkStreamSource::new(stream));
493 let arrow_rowset = crate::arrow_result::ArrowRowset::from_stream(source)?;
494 Ok(Rowset::from_arrow(arrow_rowset))
495 }
496 };
497
498 // Store the pending token — Hyper logs the execution stats after the
499 // result is consumed (streamed), so we defer resolution until
500 // last_query_stats() is called.
501 self.stats_store_pending(token, query);
502
503 result
504 }
505
506 // =========================================================================
507 // Arrow Format Queries
508 // =========================================================================
509
510 /// Executes a SELECT query and returns results as Arrow IPC stream bytes.
511 ///
512 /// # Example
513 ///
514 /// ```no_run
515 /// use hyperdb_api::{Connection, CreateMode, Result};
516 ///
517 /// fn main() -> Result<()> {
518 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
519 ///
520 /// // Create and populate a table
521 /// conn.execute_command("CREATE TABLE data (id INT, value DOUBLE PRECISION)")?;
522 /// conn.execute_command("INSERT INTO data VALUES (1, 1.5), (2, 2.5)")?;
523 ///
524 /// // Get results as Arrow IPC stream
525 /// let arrow_data = conn.execute_query_to_arrow("SELECT * FROM data")?;
526 /// println!("Got {} bytes of Arrow IPC data", arrow_data.len());
527 ///
528 /// Ok(())
529 /// }
530 /// ```
531 ///
532 /// # Errors
533 ///
534 /// Propagates any [`Error::Server`] from the TCP or gRPC transport when
535 /// the query fails or the server cannot produce Arrow IPC output.
536 pub fn execute_query_to_arrow(&self, select_query: &str) -> Result<bytes::Bytes> {
537 self.transport.execute_query_to_arrow(select_query)
538 }
539
540 /// Exports an entire table to Arrow IPC stream format.
541 ///
542 /// This is a convenience method equivalent to
543 /// `execute_query_to_arrow("SELECT * FROM table_name")`.
544 ///
545 /// # Arguments
546 ///
547 /// * `table_name` - The table name
548 ///
549 /// # Returns
550 ///
551 /// Raw Arrow IPC stream bytes containing all rows from the table.
552 ///
553 /// # Example
554 ///
555 /// ```no_run
556 /// use hyperdb_api::{Connection, CreateMode, Result};
557 ///
558 /// fn main() -> Result<()> {
559 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
560 /// let arrow_data = conn.export_table_to_arrow("my_table")?;
561 /// Ok(())
562 /// }
563 /// ```
564 ///
565 /// # Errors
566 ///
567 /// Returns whatever [`execute_query_to_arrow`](Self::execute_query_to_arrow)
568 /// would return for `SELECT * FROM <table_name>` — typically
569 /// [`Error::Server`] if the table does not exist or the query is rejected.
570 pub fn export_table_to_arrow(&self, table_name: &str) -> Result<bytes::Bytes> {
571 self.execute_query_to_arrow(&format!("SELECT * FROM {table_name}"))
572 }
573
574 /// Executes a SELECT query and returns results as Arrow `RecordBatch`es.
575 ///
576 /// This is the recommended method for Arrow-native workflows (`DataFusion`,
577 /// Polars, etc.) where you want direct `RecordBatch` access without going
578 /// through the `Row` abstraction.
579 ///
580 /// # Example
581 ///
582 /// ```no_run
583 /// use hyperdb_api::{Connection, CreateMode, Result};
584 /// use arrow::record_batch::RecordBatch;
585 ///
586 /// fn main() -> Result<()> {
587 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
588 ///
589 /// let batches: Vec<RecordBatch> = conn.execute_query_to_batches("SELECT * FROM data")?;
590 /// for batch in &batches {
591 /// println!("batch: {} rows x {} cols", batch.num_rows(), batch.num_columns());
592 /// }
593 /// Ok(())
594 /// }
595 /// ```
596 ///
597 /// # Errors
598 ///
599 /// - Returns [`Error::Server`] if the query itself fails.
600 /// - Returns [`Error::Conversion`] if the Arrow IPC payload returned by the
601 /// server is malformed and cannot be decoded into record batches.
602 pub fn execute_query_to_batches(
603 &self,
604 select_query: &str,
605 ) -> Result<Vec<arrow::record_batch::RecordBatch>> {
606 let arrow_data = self.execute_query_to_arrow(select_query)?;
607 crate::arrow_result::parse_arrow_ipc(arrow_data)
608 }
609
610 /// Fetches a single row from a query.
611 ///
612 /// Returns an error if the query returns no rows.
613 ///
614 /// # Example
615 ///
616 /// ```no_run
617 /// use hyperdb_api::{Connection, CreateMode, Result};
618 ///
619 /// fn main() -> Result<()> {
620 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
621 /// let row = conn.fetch_one("SELECT * FROM users WHERE id = 1")?;
622 /// let id: Option<i32> = row.get(0);
623 /// let name: Option<String> = row.get(1);
624 /// Ok(())
625 /// }
626 /// ```
627 ///
628 /// # Errors
629 ///
630 /// - Returns the error from [`execute_query`](Self::execute_query) if
631 /// the query itself fails.
632 /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
633 /// the query produced zero rows.
634 pub fn fetch_one<Q>(&self, query: Q) -> Result<crate::Row>
635 where
636 Q: AsRef<str>,
637 {
638 let query = query.as_ref();
639 let result = self.execute_query(query)?;
640 result.require_first_row()
641 }
642
643 /// Fetches an optional single row from a query.
644 ///
645 /// Returns `None` if the query returns no rows.
646 ///
647 /// # Example
648 ///
649 /// ```no_run
650 /// use hyperdb_api::{Connection, CreateMode, Result};
651 ///
652 /// fn main() -> Result<()> {
653 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
654 /// if let Some(row) = conn.fetch_optional("SELECT * FROM users WHERE id = 999")? {
655 /// let name: Option<String> = row.get(1);
656 /// println!("Found user: {:?}", name);
657 /// }
658 /// Ok(())
659 /// }
660 /// ```
661 ///
662 /// # Errors
663 ///
664 /// Returns the error from [`execute_query`](Self::execute_query) if the
665 /// query itself fails. An empty result set is not an error — it yields
666 /// `Ok(None)`.
667 pub fn fetch_optional<Q>(&self, query: Q) -> Result<Option<crate::Row>>
668 where
669 Q: AsRef<str>,
670 {
671 let query = query.as_ref();
672 let result = self.execute_query(query)?;
673 result.first_row()
674 }
675
676 /// Fetches all rows from a query.
677 ///
678 /// # Example
679 ///
680 /// ```no_run
681 /// use hyperdb_api::{Connection, CreateMode, Result};
682 ///
683 /// fn main() -> Result<()> {
684 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
685 /// let rows = conn.fetch_all("SELECT * FROM users WHERE active = true ORDER BY name")?;
686 /// for row in rows {
687 /// let id: Option<i32> = row.get(0);
688 /// let name: Option<String> = row.get(1);
689 /// println!("User {}: {:?}", id.unwrap_or(-1), name);
690 /// }
691 /// Ok(())
692 /// }
693 /// ```
694 ///
695 /// # Errors
696 ///
697 /// Returns the error from [`execute_query`](Self::execute_query), or a
698 /// transport error produced while draining every chunk of the streamed
699 /// result set.
700 pub fn fetch_all<Q>(&self, query: Q) -> Result<Vec<crate::Row>>
701 where
702 Q: AsRef<str>,
703 {
704 let query = query.as_ref();
705 let result = self.execute_query(query)?;
706 result.collect_rows()
707 }
708
709 /// Fetches a single row and maps it to a struct using [`FromRow`](crate::FromRow).
710 ///
711 /// Returns an error if the query returns no rows or if mapping fails.
712 ///
713 /// # Example
714 ///
715 /// ```no_run
716 /// use hyperdb_api::{Connection, CreateMode, FromRow, RowAccessor, Result};
717 ///
718 /// struct User { id: i32, name: String }
719 ///
720 /// impl FromRow for User {
721 /// fn from_row(row: RowAccessor<'_>) -> Result<Self> {
722 /// Ok(User {
723 /// id: row.get("id")?,
724 /// name: row.get_opt("name")?.unwrap_or_default(),
725 /// })
726 /// }
727 /// }
728 ///
729 /// fn main() -> Result<()> {
730 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
731 /// let user: User = conn.fetch_one_as("SELECT id, name FROM users WHERE id = 1")?;
732 /// Ok(())
733 /// }
734 /// ```
735 ///
736 /// # Errors
737 ///
738 /// - Returns the error from [`fetch_one`](Self::fetch_one) if the query
739 /// fails or returns no rows.
740 /// - Returns whatever error [`FromRow::from_row`](crate::FromRow::from_row)
741 /// produces when the row cannot be mapped into `T`.
742 pub fn fetch_one_as<T: crate::FromRow>(&self, query: &str) -> Result<T> {
743 let row = self.fetch_one(query)?;
744 let indices = row
745 .schema()
746 .map(crate::row_accessor::RowAccessor::build_indices)
747 .unwrap_or_default();
748 T::from_row(crate::RowAccessor::new(&row, &indices))
749 }
750
751 /// Fetches all rows and maps them to structs using [`FromRow`](crate::FromRow).
752 ///
753 /// # Example
754 ///
755 /// ```no_run
756 /// # use hyperdb_api::{Connection, FromRow, RowAccessor, Result};
757 /// # struct User { id: i32, name: String }
758 /// # impl FromRow for User {
759 /// # fn from_row(row: RowAccessor<'_>) -> Result<Self> {
760 /// # Ok(User { id: row.get("id")?, name: row.get_opt("name")?.unwrap_or_default() })
761 /// # }
762 /// # }
763 /// # fn example(conn: &Connection) -> Result<()> {
764 /// let users: Vec<User> = conn.fetch_all_as("SELECT id, name FROM users")?;
765 /// # Ok(())
766 /// # }
767 /// ```
768 ///
769 /// # Errors
770 ///
771 /// - Returns the error from [`fetch_all`](Self::fetch_all) if the query
772 /// fails.
773 /// - Returns the first error produced by
774 /// [`FromRow::from_row`](crate::FromRow::from_row) on any of the rows.
775 pub fn fetch_all_as<T: crate::FromRow>(&self, query: &str) -> Result<Vec<T>> {
776 let rows = self.fetch_all(query)?;
777 // Build the column-name → index lookup once from the first
778 // row's schema; reuse for every row. All rows in a result set
779 // share the same `Arc<ResultSchema>`, so this is safe.
780 let indices = rows
781 .first()
782 .and_then(crate::result::Row::schema)
783 .map(crate::row_accessor::RowAccessor::build_indices)
784 .unwrap_or_default();
785 rows.iter()
786 .map(|r| T::from_row(crate::RowAccessor::new(r, &indices)))
787 .collect()
788 }
789
790 /// Fetches a single scalar value from a query.
791 ///
792 /// Returns an error if the query returns no rows or NULL.
793 ///
794 /// # Example
795 ///
796 /// ```no_run
797 /// use hyperdb_api::{Connection, CreateMode, Result};
798 ///
799 /// fn main() -> Result<()> {
800 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
801 /// let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM users")?;
802 /// println!("User count: {}", count);
803 /// Ok(())
804 /// }
805 /// ```
806 ///
807 /// # Errors
808 ///
809 /// - Returns the error from [`execute_query`](Self::execute_query) if
810 /// the query itself fails.
811 /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
812 /// the query produced zero rows.
813 /// - Returns [`Error::Conversion`] with message `"Scalar query returned NULL"`
814 /// if the single cell is SQL `NULL`.
815 pub fn fetch_scalar<T, Q>(&self, query: Q) -> Result<T>
816 where
817 T: crate::connection::ScalarValue + crate::result::RowValue,
818 Q: AsRef<str>,
819 {
820 let query = query.as_ref();
821 let result = self.execute_query(query)?;
822 result.require_scalar()
823 }
824
825 /// Fetches an optional scalar value from a query.
826 ///
827 /// Returns `None` if the query returns no rows or NULL.
828 ///
829 /// # Example
830 ///
831 /// ```no_run
832 /// use hyperdb_api::{Connection, CreateMode, Result};
833 ///
834 /// fn main() -> Result<()> {
835 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
836 /// let max_id: Option<i32> = conn.fetch_optional_scalar("SELECT MAX(id) FROM users")?;
837 /// println!("Max ID: {:?}", max_id);
838 /// Ok(())
839 /// }
840 /// ```
841 ///
842 /// # Errors
843 ///
844 /// - Returns the error from [`execute_query`](Self::execute_query) if
845 /// the query itself fails.
846 /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
847 /// the query produced zero rows. (An empty result is treated as an
848 /// error here because we need at least one row to inspect; SQL `NULL`
849 /// in the single cell yields `Ok(None)`.)
850 pub fn fetch_optional_scalar<T, Q>(&self, query: Q) -> Result<Option<T>>
851 where
852 T: crate::connection::ScalarValue + crate::result::RowValue,
853 Q: AsRef<str>,
854 {
855 let query = query.as_ref();
856 let result = self.execute_query(query)?;
857 result.scalar()
858 }
859
860 /// Executes a scalar query and returns a single value of type `T`.
861 ///
862 /// Alias for [`fetch_optional_scalar`](Self::fetch_optional_scalar) for C++ API compatibility.
863 ///
864 /// # Errors
865 ///
866 /// See [`fetch_optional_scalar`](Self::fetch_optional_scalar).
867 #[inline]
868 pub fn execute_scalar_query<T>(&self, query: &str) -> Result<Option<T>>
869 where
870 T: ScalarValue + crate::result::RowValue,
871 {
872 self.fetch_optional_scalar(query)
873 }
874
875 /// Queries for a count value, defaulting to 0 if NULL.
876 ///
877 /// This is optimized for COUNT queries which typically return 0
878 /// instead of NULL when there are no matching rows.
879 ///
880 /// # Example
881 ///
882 /// ```no_run
883 /// use hyperdb_api::{Connection, CreateMode, Result};
884 ///
885 /// fn main() -> Result<()> {
886 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
887 /// let count = conn.query_count("SELECT COUNT(*) FROM users WHERE active = true")?;
888 /// println!("Active users: {}", count);
889 /// Ok(())
890 /// }
891 /// ```
892 ///
893 /// # Errors
894 ///
895 /// Returns the error from [`execute_query`](Self::execute_query) if the
896 /// query fails or produces no rows. SQL `NULL` is mapped to `0`, not an
897 /// error.
898 pub fn query_count(&self, query: &str) -> Result<i64> {
899 self.fetch_optional_scalar::<i64, _>(query)
900 .map(|opt| opt.unwrap_or(0))
901 }
902
903 // =========================================================================
904 // Parameterized Queries (SQL Injection Safe)
905 // =========================================================================
906
907 /// Executes a parameterized query, returning streaming results.
908 ///
909 /// This is safe to use with untrusted user input: parameters travel
910 /// through the extended query protocol (Parse/Bind/Execute) as
911 /// binary `HyperBinary` values and are never interpolated into the
912 /// SQL string. For repeated executions of the same SQL with different
913 /// values, prefer the explicit [`prepare`](Self::prepare) API — it
914 /// returns a reusable [`PreparedStatement`](crate::PreparedStatement)
915 /// that skips the Parse round-trip on every call.
916 ///
917 /// Under the hood, `query_params` is a one-shot
918 /// prepare+execute+close: it prepares an unnamed statement, binds
919 /// the parameters, starts streaming, and closes the statement when
920 /// the returned [`Rowset`] is dropped.
921 ///
922 /// # Arguments
923 ///
924 /// * `query` - The SQL query with parameter placeholders (`$1`, `$2`, etc.)
925 /// * `params` - Parameter values matching the placeholders
926 ///
927 /// # SQL Injection Prevention
928 ///
929 /// ```no_run
930 /// use hyperdb_api::{Connection, CreateMode, Result};
931 ///
932 /// fn search_users(conn: &Connection, user_input: &str) -> Result<()> {
933 /// // DANGEROUS - vulnerable to SQL injection:
934 /// // let query = format!("SELECT * FROM users WHERE name = '{}'", user_input);
935 ///
936 /// // SAFE - parameterized query:
937 /// let mut result = conn.query_params(
938 /// "SELECT * FROM users WHERE name = $1",
939 /// &[&user_input],
940 /// )?;
941 ///
942 /// while let Some(chunk) = result.next_chunk()? {
943 /// for row in &chunk {
944 /// let id: Option<i32> = row.get(0);
945 /// let name: Option<String> = row.get(1);
946 /// println!("Found: {:?} - {:?}", id, name);
947 /// }
948 /// }
949 /// Ok(())
950 /// }
951 /// ```
952 ///
953 /// # Multiple Parameters
954 ///
955 /// ```no_run
956 /// use hyperdb_api::{Connection, CreateMode, Result};
957 ///
958 /// fn main() -> Result<()> {
959 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
960 ///
961 /// // Multiple parameters of different types
962 /// let result = conn.query_params(
963 /// "SELECT * FROM orders WHERE customer_id = $1 AND total > $2",
964 /// &[&42i32, &100.0f64],
965 /// )?;
966 /// Ok(())
967 /// }
968 /// ```
969 ///
970 /// # Errors
971 ///
972 /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport
973 /// (prepared statements are TCP-only).
974 /// - Returns [`Error::Server`] if the server rejects the statement at
975 /// `Parse`, `Bind`, or `Execute` time, including on type-mismatch
976 /// between `params` and the inferred OIDs.
977 /// - Returns [`Error::Io`] on transport-level I/O failures.
978 pub fn query_params(
979 &self,
980 query: &str,
981 params: &[&dyn crate::params::ToSqlParam],
982 ) -> Result<Rowset<'_>> {
983 // Implementation note: routes through the extended query protocol
984 // via Parse/Bind/Execute so parameters travel in HyperBinary
985 // format — no SQL escaping, full SQL-injection safety regardless of
986 // parameter content. The statement handle is stashed inside the
987 // returned Rowset so its Drop-time close_statement fires *after*
988 // the rowset releases its connection lock (otherwise the close
989 // would deadlock on the still-held mutex).
990 let client = match &self.transport {
991 Transport::Tcp(tcp) => &tcp.client,
992 Transport::Grpc(_) => {
993 return Err(Error::feature_not_supported(
994 "prepared statements are not supported over gRPC transport",
995 ));
996 }
997 };
998 let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
999 let stmt = client.prepare_typed(query, &oids)?;
1000 let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
1001 let stream =
1002 client.execute_streaming(&stmt, encoded, crate::result::DEFAULT_BINARY_CHUNK_SIZE)?;
1003 Ok(Rowset::from_prepared(stream).with_statement_guard(stmt))
1004 }
1005
1006 /// Executes a parameterized command that doesn't return rows.
1007 ///
1008 /// Use this for INSERT, UPDATE, DELETE, or DDL statements with parameters.
1009 /// Returns the number of affected rows.
1010 ///
1011 /// See [`query_params`](Self::query_params) for details on parameter
1012 /// handling and SQL injection prevention.
1013 ///
1014 /// # Example
1015 ///
1016 /// ```no_run
1017 /// use hyperdb_api::{Connection, CreateMode, Result};
1018 ///
1019 /// fn delete_user(conn: &Connection, user_id: i32) -> Result<u64> {
1020 /// // Safe from SQL injection
1021 /// conn.command_params("DELETE FROM users WHERE id = $1", &[&user_id])
1022 /// }
1023 /// ```
1024 ///
1025 /// # Errors
1026 ///
1027 /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport.
1028 /// - Returns [`Error::Server`] if the server rejects the statement at
1029 /// `Parse`, `Bind`, or `Execute` time.
1030 /// - Returns [`Error::Io`] on transport-level I/O failures.
1031 pub fn command_params(
1032 &self,
1033 query: &str,
1034 params: &[&dyn crate::params::ToSqlParam],
1035 ) -> Result<u64> {
1036 // One-shot prepare+execute with explicit OIDs — see `query_params`
1037 // for why we collect OIDs from each parameter.
1038 let client = match &self.transport {
1039 Transport::Tcp(tcp) => &tcp.client,
1040 Transport::Grpc(_) => {
1041 return Err(Error::feature_not_supported(
1042 "prepared statements are not supported over gRPC transport",
1043 ));
1044 }
1045 };
1046 let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
1047 let stmt = client.prepare_typed(query, &oids)?;
1048 let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
1049 Ok(client.execute_no_result(&stmt, encoded)?)
1050 }
1051
1052 /// Executes multiple SQL statements in a single call.
1053 ///
1054 /// Each statement is executed sequentially. If any statement fails,
1055 /// execution stops and the error is returned. Returns the total number
1056 /// of affected rows across all statements.
1057 ///
1058 /// This is more efficient than calling `execute_command` in a loop
1059 /// because it reduces round-trips for DDL scripts and multi-statement setup.
1060 ///
1061 /// # Example
1062 ///
1063 /// ```no_run
1064 /// use hyperdb_api::{Connection, CreateMode, Result};
1065 ///
1066 /// fn main() -> Result<()> {
1067 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1068 /// let total = conn.execute_batch(&[
1069 /// "CREATE TABLE users (id INT, name TEXT)",
1070 /// "INSERT INTO users VALUES (1, 'Alice')",
1071 /// "INSERT INTO users VALUES (2, 'Bob')",
1072 /// ])?;
1073 /// println!("Total affected: {}", total);
1074 /// Ok(())
1075 /// }
1076 /// ```
1077 ///
1078 /// # Errors
1079 ///
1080 /// Returns a wrapped [`Error::Internal`] on the first statement that fails;
1081 /// its `source` is the original [`Error::Server`] from
1082 /// [`execute_command`](Self::execute_command). The error message
1083 /// includes the failing statement's ordinal and an 80-character preview
1084 /// of its SQL.
1085 pub fn execute_batch(&self, statements: &[&str]) -> Result<u64> {
1086 let mut total = 0u64;
1087 for (i, stmt) in statements.iter().enumerate() {
1088 if !stmt.trim().is_empty() {
1089 total += self.execute_command(stmt).map_err(|e| {
1090 let preview: String = stmt.chars().take(80).collect();
1091 Error::internal(format!(
1092 "execute_batch failed at statement {} of {}: {}: {}",
1093 i + 1,
1094 statements.len(),
1095 preview,
1096 e,
1097 ))
1098 })?;
1099 }
1100 }
1101 Ok(total)
1102 }
1103
1104 /// Returns the attached database path, if any.
1105 pub fn database(&self) -> Option<&str> {
1106 self.database.as_deref()
1107 }
1108
1109 /// Creates a new database file.
1110 ///
1111 /// # Example
1112 ///
1113 /// ```no_run
1114 /// use hyperdb_api::{Connection, Result};
1115 ///
1116 /// fn main() -> Result<()> {
1117 /// let conn = Connection::without_database("localhost:7483")?;
1118 /// conn.create_database("new_database.hyper")?;
1119 /// Ok(())
1120 /// }
1121 /// ```
1122 ///
1123 /// # Errors
1124 ///
1125 /// Returns [`Error::Server`] if the server rejects the
1126 /// `CREATE DATABASE IF NOT EXISTS` statement (e.g. the path is not
1127 /// writable on the server).
1128 pub fn create_database(&self, path: &str) -> Result<()> {
1129 let sql = format!("CREATE DATABASE IF NOT EXISTS {}", escape_sql_path(path));
1130 self.execute_command(&sql)?;
1131 Ok(())
1132 }
1133
1134 /// Drops (deletes) a database file.
1135 ///
1136 /// # Example
1137 ///
1138 /// ```no_run
1139 /// use hyperdb_api::{Connection, Result};
1140 ///
1141 /// fn main() -> Result<()> {
1142 /// let conn = Connection::without_database("localhost:7483")?;
1143 /// conn.drop_database("old_database.hyper")?;
1144 /// Ok(())
1145 /// }
1146 /// ```
1147 ///
1148 /// # Errors
1149 ///
1150 /// Returns [`Error::Server`] if the server rejects the
1151 /// `DROP DATABASE IF EXISTS` statement (e.g. the database is still
1152 /// attached or permissions deny deletion).
1153 pub fn drop_database(&self, path: &str) -> Result<()> {
1154 let sql = format!("DROP DATABASE IF EXISTS {}", escape_sql_path(path));
1155 self.execute_command(&sql)?;
1156 Ok(())
1157 }
1158
1159 /// Attaches a database file to the connection.
1160 ///
1161 /// Once attached, the database can be queried and modified.
1162 /// The database is identified by its alias (or by its path if no alias is provided).
1163 ///
1164 /// # Arguments
1165 ///
1166 /// * `path` - The path to the database file to attach.
1167 /// * `alias` - Optional alias for the database. If `None`, the database is
1168 /// attached without an explicit alias (typically using its filename).
1169 ///
1170 /// # Errors
1171 ///
1172 /// Returns an error if the database file doesn't exist or if attachment fails.
1173 ///
1174 /// # Example
1175 ///
1176 /// ```no_run
1177 /// use hyperdb_api::{Connection, Result};
1178 ///
1179 /// fn main() -> Result<()> {
1180 /// let conn = Connection::without_database("localhost:7483")?;
1181 ///
1182 /// // Attach with an alias
1183 /// conn.attach_database("data.hyper", Some("mydata"))?;
1184 ///
1185 /// // Attach without an alias
1186 /// conn.attach_database("other.hyper", None)?;
1187 /// Ok(())
1188 /// }
1189 /// ```
1190 pub fn attach_database(&self, path: &str, alias: Option<&str>) -> Result<()> {
1191 let sql = if let Some(alias) = alias {
1192 format!(
1193 "ATTACH DATABASE {} AS {}",
1194 escape_sql_path(path),
1195 escape_sql_path(alias)
1196 )
1197 } else {
1198 format!("ATTACH DATABASE {}", escape_sql_path(path))
1199 };
1200 self.execute_command(&sql)?;
1201 Ok(())
1202 }
1203
1204 /// Detaches a database from this connection.
1205 ///
1206 /// After detaching, the database file is released and can be accessed
1207 /// externally (e.g., copied, moved, etc.). All pending updates are
1208 /// written to disk before detaching.
1209 ///
1210 /// # Arguments
1211 ///
1212 /// * `alias` - The alias of the database to detach.
1213 ///
1214 /// # Errors
1215 ///
1216 /// Returns an error if the database is not attached or if detachment fails.
1217 ///
1218 /// # Example
1219 ///
1220 /// ```no_run
1221 /// use hyperdb_api::{Connection, Result};
1222 ///
1223 /// fn main() -> Result<()> {
1224 /// let conn = Connection::without_database("localhost:7483")?;
1225 /// conn.attach_database("data.hyper", Some("mydata"))?;
1226 /// // ... work with the database ...
1227 /// conn.detach_database("mydata")?;
1228 /// Ok(())
1229 /// }
1230 /// ```
1231 pub fn detach_database(&self, alias: &str) -> Result<()> {
1232 let sql = format!("DETACH DATABASE {}", escape_sql_path(alias));
1233 self.execute_command(&sql)?;
1234 Ok(())
1235 }
1236
1237 /// Detaches all databases from this connection.
1238 ///
1239 /// This is useful for cleanup before closing a connection or when
1240 /// you need to release all database files.
1241 ///
1242 /// # Errors
1243 ///
1244 /// Returns [`Error::Server`] if the server rejects the
1245 /// `DETACH ALL DATABASES` statement (e.g. a database is still in use by
1246 /// another session).
1247 pub fn detach_all_databases(&self) -> Result<()> {
1248 self.execute_command("DETACH ALL DATABASES")?;
1249 Ok(())
1250 }
1251
1252 /// Creates a schema in the database.
1253 ///
1254 /// # Errors
1255 ///
1256 /// - Returns an error if `schema_name` cannot be converted into a
1257 /// [`SchemaName`](crate::SchemaName) (invalid identifier).
1258 /// - Returns [`Error::Server`] if the server rejects the
1259 /// `CREATE SCHEMA` statement (e.g. the schema already exists).
1260 pub fn create_schema<T>(&self, schema_name: T) -> Result<()>
1261 where
1262 T: TryInto<crate::SchemaName>,
1263 crate::Error: From<T::Error>,
1264 {
1265 crate::catalog::Catalog::new(self).create_schema(schema_name)
1266 }
1267
1268 /// Checks whether a schema exists.
1269 ///
1270 /// # Arguments
1271 ///
1272 /// * `schema` - The schema name (can include database qualifier).
1273 ///
1274 /// # Example
1275 ///
1276 /// ```no_run
1277 /// use hyperdb_api::{Connection, Result};
1278 ///
1279 /// fn main() -> Result<()> {
1280 /// let conn = Connection::without_database("localhost:7483")?;
1281 /// if conn.has_schema("public")? {
1282 /// println!("Schema 'public' exists");
1283 /// }
1284 /// Ok(())
1285 /// }
1286 /// ```
1287 ///
1288 /// # Errors
1289 ///
1290 /// - Returns an error if `schema` cannot be converted into a
1291 /// [`SchemaName`](crate::SchemaName).
1292 /// - Returns [`Error::Server`] if the catalog lookup query fails.
1293 pub fn has_schema<T>(&self, schema: T) -> Result<bool>
1294 where
1295 T: TryInto<crate::SchemaName>,
1296 crate::Error: From<T::Error>,
1297 {
1298 use crate::catalog::Catalog;
1299 Catalog::new(self).has_schema(schema)
1300 }
1301
1302 /// Checks whether a table exists.
1303 ///
1304 /// # Arguments
1305 ///
1306 /// * `table_name` - The table name (can include database and schema qualifiers).
1307 ///
1308 /// # Example
1309 ///
1310 /// ```no_run
1311 /// use hyperdb_api::{Connection, Result};
1312 ///
1313 /// fn main() -> Result<()> {
1314 /// let conn = Connection::without_database("localhost:7483")?;
1315 /// if conn.has_table("public.users")? {
1316 /// println!("Table 'users' exists");
1317 /// }
1318 /// Ok(())
1319 /// }
1320 /// ```
1321 ///
1322 /// # Errors
1323 ///
1324 /// - Returns an error if `table_name` cannot be converted into a
1325 /// [`TableName`](crate::TableName).
1326 /// - Returns [`Error::Server`] if the catalog lookup query fails.
1327 pub fn has_table<T>(&self, table_name: T) -> Result<bool>
1328 where
1329 T: TryInto<crate::TableName>,
1330 crate::Error: From<T::Error>,
1331 {
1332 use crate::catalog::Catalog;
1333 Catalog::new(self).has_table(table_name)
1334 }
1335
1336 /// Returns the server version as a parsed struct.
1337 ///
1338 /// Returns `None` if the version cannot be determined (e.g., gRPC connection).
1339 ///
1340 /// # Example
1341 ///
1342 /// ```no_run
1343 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, ServerVersion, Result};
1344 ///
1345 /// fn main() -> Result<()> {
1346 /// let hyper = HyperProcess::new(None, None)?;
1347 /// let conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1348 /// if let Some(version) = conn.server_version() {
1349 /// println!("Hyper {}", version);
1350 /// if version >= ServerVersion::new(0, 1, 0) {
1351 /// println!("Has feature X");
1352 /// }
1353 /// }
1354 /// Ok(())
1355 /// }
1356 /// ```
1357 pub fn server_version(&self) -> Option<crate::ServerVersion> {
1358 let version_str = self.parameter_status("server_version")?;
1359 crate::ServerVersion::parse(&version_str)
1360 }
1361
1362 /// Copies a database file to a new path.
1363 ///
1364 /// The source database must be attached to this connection.
1365 ///
1366 /// # Example
1367 ///
1368 /// ```no_run
1369 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1370 ///
1371 /// fn main() -> Result<()> {
1372 /// let hyper = HyperProcess::new(None, None)?;
1373 /// let conn = Connection::new(&hyper, "source.hyper", CreateMode::DoNotCreate)?;
1374 /// conn.copy_database("source.hyper", "backup.hyper")?;
1375 /// Ok(())
1376 /// }
1377 /// ```
1378 ///
1379 /// # Errors
1380 ///
1381 /// Returns [`Error::Server`] if the server rejects the
1382 /// `COPY DATABASE` statement — e.g. the source is not attached, the
1383 /// destination path is not writable, or it already exists.
1384 pub fn copy_database(&self, source: &str, destination: &str) -> Result<()> {
1385 let sql = format!(
1386 "COPY DATABASE {} TO {}",
1387 escape_sql_path(source),
1388 escape_sql_path(destination)
1389 );
1390 self.execute_command(&sql)?;
1391 Ok(())
1392 }
1393
1394 /// Executes EXPLAIN on a query and returns the plan as a string.
1395 ///
1396 /// # Example
1397 ///
1398 /// ```no_run
1399 /// use hyperdb_api::{Connection, CreateMode, Result};
1400 ///
1401 /// fn main() -> Result<()> {
1402 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1403 /// let plan = conn.explain("SELECT * FROM users WHERE id = 1")?;
1404 /// println!("{}", plan);
1405 /// Ok(())
1406 /// }
1407 /// ```
1408 ///
1409 /// # Errors
1410 ///
1411 /// Returns [`Error::Server`] if `EXPLAIN <query>` fails to parse or
1412 /// plan, or if the streamed result cannot be consumed.
1413 pub fn explain(&self, query: &str) -> Result<String> {
1414 let explain_sql = format!("EXPLAIN {query}");
1415 let result = self.execute_query(&explain_sql)?;
1416 let mut lines = Vec::new();
1417 for row in result.rows() {
1418 let row = row?;
1419 if let Some(line) = row.get::<String>(0) {
1420 lines.push(line);
1421 }
1422 }
1423 Ok(lines.join("\n"))
1424 }
1425
1426 /// Executes EXPLAIN ANALYZE on a query and returns the plan with timing info.
1427 ///
1428 /// **Note:** This actually executes the query to collect timing information.
1429 ///
1430 /// # Errors
1431 ///
1432 /// Returns [`Error::Server`] if `EXPLAIN ANALYZE <query>` fails — this
1433 /// includes any runtime error raised by actually executing `query`.
1434 pub fn explain_analyze(&self, query: &str) -> Result<String> {
1435 let explain_sql = format!("EXPLAIN ANALYZE {query}");
1436 let result = self.execute_query(&explain_sql)?;
1437 let mut lines = Vec::new();
1438 for row in result.rows() {
1439 let row = row?;
1440 if let Some(line) = row.get::<String>(0) {
1441 lines.push(line);
1442 }
1443 }
1444 Ok(lines.join("\n"))
1445 }
1446
1447 /// Returns a reference to the underlying TCP client.
1448 ///
1449 /// # Panics
1450 ///
1451 /// This method returns `None` if the connection is using gRPC transport.
1452 pub fn tcp_client(&self) -> Option<&Client> {
1453 match &self.transport {
1454 Transport::Tcp(tcp) => Some(&tcp.client),
1455 Transport::Grpc(_) => None,
1456 }
1457 }
1458
1459 /// Crate-internal accessor for the transport. Used by
1460 /// [`PreparedStatement`](crate::PreparedStatement) to reach the
1461 /// underlying `hyperdb_api_core::client::Client`.
1462 pub(crate) fn transport(&self) -> &Transport {
1463 &self.transport
1464 }
1465
1466 /// Prepares a SQL statement with automatic parameter type inference.
1467 ///
1468 /// The returned [`PreparedStatement`](crate::PreparedStatement) can
1469 /// be executed many times with different parameter values; the
1470 /// server caches the parsed plan. This is the preferred way to
1471 /// execute a statement repeatedly inside a loop.
1472 ///
1473 /// For explicit parameter types (necessary when `$N` placeholders
1474 /// would otherwise be ambiguous), use
1475 /// [`prepare_typed`](Self::prepare_typed).
1476 ///
1477 /// # Example
1478 ///
1479 /// ```no_run
1480 /// # use hyperdb_api::{Connection, CreateMode, Result};
1481 /// # fn example(conn: &Connection) -> Result<()> {
1482 /// let stmt = conn.prepare("SELECT name FROM users WHERE id = $1")?;
1483 /// for id in [1_i32, 2, 3] {
1484 /// let name: String = stmt.fetch_scalar(&[&id])?;
1485 /// println!("{id}: {name}");
1486 /// }
1487 /// # Ok(())
1488 /// # }
1489 /// ```
1490 ///
1491 /// # Errors
1492 ///
1493 /// See [`prepare_typed`](Self::prepare_typed) — this method delegates
1494 /// to it with an empty OID list.
1495 pub fn prepare(&self, query: &str) -> Result<crate::PreparedStatement<'_>> {
1496 self.prepare_typed(query, &[])
1497 }
1498
1499 /// Prepares a SQL statement with explicit parameter type OIDs.
1500 ///
1501 /// Use this when the server cannot infer parameter types from the
1502 /// SQL alone (e.g. a bare `$1` in a `WHERE v > $1` clause with no
1503 /// other context). Constants for common types live in
1504 /// [`hyperdb_api_core::types::oids`].
1505 ///
1506 /// # Errors
1507 ///
1508 /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport
1509 /// (prepared statements are TCP-only).
1510 /// - Returns [`Error::Server`] if the server rejects the `Parse`
1511 /// message, e.g. SQL syntax error or unknown OID.
1512 /// - Returns [`Error::Io`] on transport-level I/O failures.
1513 pub fn prepare_typed(
1514 &self,
1515 query: &str,
1516 param_types: &[crate::Oid],
1517 ) -> Result<crate::PreparedStatement<'_>> {
1518 let client = match &self.transport {
1519 Transport::Tcp(tcp) => &tcp.client,
1520 Transport::Grpc(_) => {
1521 return Err(Error::feature_not_supported(
1522 "prepared statements are not supported over gRPC transport",
1523 ));
1524 }
1525 };
1526 let inner = client.prepare_typed(query, param_types)?;
1527 crate::PreparedStatement::new(self, inner)
1528 }
1529
1530 /// Returns true if the connection is alive (passive check).
1531 ///
1532 /// This is a lightweight check that does not send any data to the server.
1533 /// For an active health check, use [`ping`](Self::ping).
1534 pub fn is_alive(&self) -> bool {
1535 match &self.transport {
1536 Transport::Tcp(tcp) => tcp.client.is_alive(),
1537 Transport::Grpc(_) => true, // gRPC connections are stateless
1538 }
1539 }
1540
1541 /// Actively checks that the connection is healthy by executing a trivial query.
1542 ///
1543 /// Unlike [`is_alive`](Self::is_alive) which only checks local state,
1544 /// this method sends `SELECT 1` to the server and verifies a response.
1545 ///
1546 /// # Example
1547 ///
1548 /// ```no_run
1549 /// # use hyperdb_api::{Connection, CreateMode, Result};
1550 /// # fn example(conn: &Connection) -> Result<()> {
1551 /// if conn.ping().is_ok() {
1552 /// println!("Connection is healthy");
1553 /// }
1554 /// # Ok(())
1555 /// # }
1556 /// ```
1557 ///
1558 /// # Errors
1559 ///
1560 /// Returns [`Error::Server`] or [`Error::Io`] if the `SELECT 1`
1561 /// round-trip fails — i.e. the connection is no longer usable.
1562 pub fn ping(&self) -> Result<()> {
1563 self.execute_command("SELECT 1")?;
1564 Ok(())
1565 }
1566
1567 /// Returns the process ID of the backend server connection.
1568 ///
1569 /// Returns 0 for gRPC connections (not applicable).
1570 pub fn process_id(&self) -> i32 {
1571 match &self.transport {
1572 Transport::Tcp(tcp) => tcp.client.process_id(),
1573 Transport::Grpc(_) => 0,
1574 }
1575 }
1576
1577 /// Returns the secret key for the backend server connection.
1578 ///
1579 /// This is used for cancellation requests.
1580 /// Returns 0 for gRPC connections (not applicable).
1581 pub fn secret_key(&self) -> i32 {
1582 match &self.transport {
1583 Transport::Tcp(tcp) => tcp.client.secret_key(),
1584 Transport::Grpc(_) => 0,
1585 }
1586 }
1587
1588 /// Returns a server parameter value by name.
1589 ///
1590 /// Server parameters are sent by the server during connection startup.
1591 /// Common parameters include:
1592 /// - `server_version` - The server version string
1593 /// - `server_encoding` - The server's character encoding
1594 /// - `client_encoding` - The client's character encoding
1595 /// - `DateStyle` - Date display format
1596 /// - `TimeZone` - Server timezone
1597 /// - `session_identifier` - Session ID for connection migration (if routing enabled)
1598 ///
1599 /// Returns `None` if the parameter is not known.
1600 ///
1601 /// # Example
1602 ///
1603 /// ```no_run
1604 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1605 ///
1606 /// fn main() -> Result<()> {
1607 /// let hyper = HyperProcess::new(None, None)?;
1608 /// let conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1609 ///
1610 /// if let Some(version) = conn.parameter_status("server_version") {
1611 /// println!("Connected to Hyper version: {}", version);
1612 /// }
1613 /// Ok(())
1614 /// }
1615 /// ```
1616 pub fn parameter_status(&self, name: &str) -> Option<String> {
1617 match &self.transport {
1618 Transport::Tcp(tcp) => tcp.client.parameter_status(name),
1619 Transport::Grpc(_) => None, // gRPC doesn't have server parameters
1620 }
1621 }
1622
1623 /// Sets the notice receiver for this connection.
1624 ///
1625 /// Server notices and warnings are passed to this callback instead of being
1626 /// logged. Pass `None` to restore default logging behavior.
1627 pub fn set_notice_receiver(
1628 &mut self,
1629 receiver: Option<hyperdb_api_core::client::NoticeReceiver>,
1630 ) {
1631 match &mut self.transport {
1632 Transport::Tcp(tcp) => tcp.client.set_notice_receiver(receiver),
1633 Transport::Grpc(_) => {} // gRPC doesn't support notice receivers
1634 }
1635 }
1636
1637 /// Cancels the currently executing query (thread-safe).
1638 ///
1639 /// # Errors
1640 ///
1641 /// - Returns [`Error::FeatureNotSupported`] on gRPC connections — cancellation is not
1642 /// yet implemented for gRPC transport.
1643 /// - Returns [`Error::Connection`] or [`Error::Io`] if the separate
1644 /// cancel-request connection to the server fails.
1645 pub fn cancel(&self) -> Result<()> {
1646 match &self.transport {
1647 Transport::Tcp(tcp) => tcp.client.cancel().map_err(Error::from),
1648 Transport::Grpc(_) => Err(Error::feature_not_supported(
1649 "Query cancellation is not yet supported for gRPC connections.",
1650 )),
1651 }
1652 }
1653
1654 /// Closes the connection, detaching all databases first.
1655 ///
1656 /// # Errors
1657 ///
1658 /// - Returns [`Error::Internal`] wrapping the underlying close failure
1659 /// (its `source` is the transport error) if the client cannot be
1660 /// shut down cleanly.
1661 /// - Returns [`Error::Internal`] wrapping the detach failure if the
1662 /// attached database could not be detached but close itself
1663 /// succeeded.
1664 pub fn close(self) -> Result<()> {
1665 // Detach the attached database to ensure files are flushed and released.
1666 // Always attempt close, even if detach fails.
1667 let detach_err = if let Some(ref db_path) = self.database {
1668 let db_alias = std::path::Path::new(db_path)
1669 .file_stem()
1670 .and_then(|s| s.to_str())
1671 .unwrap_or("db");
1672 self.execute_command(&format!("DETACH DATABASE {}", escape_sql_path(db_alias)))
1673 .err()
1674 } else {
1675 None
1676 };
1677
1678 // Always attempt to close the client to release the connection.
1679 let close_result = match self.transport {
1680 Transport::Tcp(tcp) => tcp.client.close(),
1681 Transport::Grpc(_) => Ok(()), // gRPC connections are stateless
1682 };
1683
1684 if let Err(e) = close_result {
1685 return Err(Error::internal(format!("Failed to close connection: {e}")));
1686 }
1687
1688 if let Some(e) = detach_err {
1689 // Detach failed but close succeeded; surface the detach error.
1690 return Err(Error::internal(format!(
1691 "Failed to detach database during close: {e}"
1692 )));
1693 }
1694
1695 Ok(())
1696 }
1697
1698 /// Unloads the database from memory while keeping the connection active.
1699 ///
1700 /// This executes the `UNLOAD DATABASE` command, which releases the database
1701 /// from memory but keeps the session and connection open. The database can
1702 /// be accessed again by subsequent queries that will automatically reload it.
1703 ///
1704 /// This is useful for releasing memory locks when switching between databases
1705 /// or when working with multiple database files.
1706 ///
1707 /// # Example
1708 ///
1709 /// ```no_run
1710 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1711 ///
1712 /// fn main() -> Result<()> {
1713 /// let hyper = HyperProcess::new(None, None)?;
1714 /// let conn = Connection::new(&hyper, "test.hyper", CreateMode::Create)?;
1715 ///
1716 /// // Do some work with the database
1717 /// conn.execute_command("CREATE TABLE test (id INT)")?;
1718 ///
1719 /// // Unload from memory (but keep connection)
1720 /// conn.unload_database()?;
1721 ///
1722 /// // Database can still be accessed (will be reloaded automatically)
1723 /// let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM test")?;
1724 /// println!("Count: {}", count);
1725 ///
1726 /// Ok(())
1727 /// }
1728 /// ```
1729 ///
1730 /// # Errors
1731 ///
1732 /// Returns [`Error::Server`] if the server rejects the `UNLOAD DATABASE`
1733 /// command (e.g. the database is still in use by another session).
1734 pub fn unload_database(&self) -> Result<()> {
1735 self.execute_command("UNLOAD DATABASE")?;
1736 Ok(())
1737 }
1738
1739 /// Releases the database completely from the session.
1740 ///
1741 /// This executes the `UNLOAD RELEASE` command, which completely releases
1742 /// the database from the session. After this call, the database cannot
1743 /// be accessed until a new connection is established.
1744 ///
1745 /// This is useful for completely freeing database resources when you're
1746 /// done with a database and want to ensure no locks are held.
1747 ///
1748 /// **Note:** This should only be used when the session has exactly one
1749 /// database attached. Hyper does not support `UNLOAD RELEASE` with
1750 /// multiple databases attached to the same session.
1751 ///
1752 /// # Example
1753 ///
1754 /// ```no_run
1755 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1756 ///
1757 /// fn main() -> Result<()> {
1758 /// let hyper = HyperProcess::new(None, None)?;
1759 /// let conn = Connection::new(&hyper, "test.hyper", CreateMode::Create)?;
1760 ///
1761 /// // Do some work with the database
1762 /// conn.execute_command("CREATE TABLE test (id INT)")?;
1763 ///
1764 /// // Release database completely from session
1765 /// conn.unload_release()?;
1766 ///
1767 /// // Database cannot be accessed after this point without new connection
1768 /// // conn.execute_command("SELECT * FROM test")?; // This would fail
1769 ///
1770 /// Ok(())
1771 /// }
1772 /// ```
1773 ///
1774 /// # Errors
1775 ///
1776 /// Returns [`Error::Server`] if the server rejects `UNLOAD RELEASE`, most
1777 /// commonly because multiple databases are attached to the same session
1778 /// (Hyper only supports `UNLOAD RELEASE` with exactly one attached DB).
1779 pub fn unload_release(&self) -> Result<()> {
1780 self.execute_command("UNLOAD RELEASE")?;
1781 Ok(())
1782 }
1783
1784 // =========================================================================
1785 // Query Statistics
1786 // =========================================================================
1787
1788 /// Enables query statistics collection for this connection.
1789 ///
1790 /// After enabling, each `execute_command()` or `execute_query()` call will
1791 /// capture detailed performance metrics from Hyper. Retrieve them via
1792 /// [`last_query_stats()`](Self::last_query_stats).
1793 ///
1794 /// The provider determines how stats are collected. Use
1795 /// [`LogFileStatsProvider`](crate::LogFileStatsProvider) to parse Hyper's log file (requires local
1796 /// `hyperd.log`), or implement a custom [`QueryStatsProvider`](crate::QueryStatsProvider).
1797 ///
1798 /// # Example
1799 ///
1800 /// ```no_run
1801 /// # use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1802 /// # fn main() -> Result<()> {
1803 /// # let hyper = HyperProcess::new(None, None)?;
1804 /// # let mut conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1805 /// use hyperdb_api::LogFileStatsProvider;
1806 ///
1807 /// // Auto-detect log path from HyperProcess
1808 /// conn.enable_query_stats(LogFileStatsProvider::from_process(&hyper));
1809 ///
1810 /// // Or specify an explicit log path
1811 /// // conn.enable_query_stats(LogFileStatsProvider::new("/path/to/hyperd.log"));
1812 /// # Ok(())
1813 /// # }
1814 /// ```
1815 pub fn enable_query_stats(&mut self, provider: impl QueryStatsProvider + 'static) {
1816 self.stats_provider = Some(Arc::new(provider));
1817 }
1818
1819 /// Disables query statistics collection.
1820 ///
1821 /// After calling this, `last_query_stats()` will return `None`.
1822 pub fn disable_query_stats(&mut self) {
1823 self.stats_provider = None;
1824 if let Ok(mut guard) = self.pending_stats.lock() {
1825 *guard = None;
1826 }
1827 }
1828
1829 /// Returns the query statistics from the most recent query execution.
1830 ///
1831 /// Stats are resolved **lazily** — the log file is read when this method
1832 /// is called, not when the query executes. This is important for streaming
1833 /// queries (`execute_query`), where Hyper writes the execution stats only
1834 /// after the result set is fully consumed.
1835 ///
1836 /// **Call this after consuming the result set** (e.g., after `collect_rows()`,
1837 /// iterating all chunks, or dropping the `Rowset`).
1838 ///
1839 /// Returns `None` if:
1840 /// - Query stats collection is not enabled
1841 /// - No query has been executed yet
1842 /// - Stats could not be found for the last query (e.g., log entry not matched)
1843 ///
1844 /// # Example
1845 ///
1846 /// ```no_run
1847 /// # use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1848 /// # fn main() -> Result<()> {
1849 /// # let hyper = HyperProcess::new(None, None)?;
1850 /// # let mut conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1851 /// # use hyperdb_api::LogFileStatsProvider;
1852 /// # conn.enable_query_stats(LogFileStatsProvider::from_process(&hyper));
1853 /// conn.execute_command("CREATE TABLE t (id INT)")?;
1854 ///
1855 /// if let Some(stats) = conn.last_query_stats() {
1856 /// println!("Total: {}s", stats.elapsed_s);
1857 /// if let Some(ref pre) = stats.pre_execution {
1858 /// println!(" Parse: {:?}s", pre.parsing_time_s);
1859 /// println!(" Compile: {:?}s", pre.compilation_time_s);
1860 /// }
1861 /// if let Some(ref exec) = stats.execution {
1862 /// println!(" Execute: {:?}s", exec.elapsed_s);
1863 /// println!(" Peak mem: {:?} MB", exec.peak_memory_mb);
1864 /// }
1865 /// }
1866 /// # Ok(())
1867 /// # }
1868 /// ```
1869 pub fn last_query_stats(&self) -> Option<QueryStats> {
1870 let provider = self.stats_provider.as_ref()?;
1871 let mut guard = self.pending_stats.lock().ok()?;
1872 let (token, sql) = guard.take()?;
1873 provider.after_query(token, &sql)
1874 }
1875
1876 /// Internal: call provider's `before_query` if stats are enabled.
1877 fn stats_before_query(&self, sql: &str) -> Option<Box<dyn Any + Send>> {
1878 self.stats_provider.as_ref().map(|p| p.before_query(sql))
1879 }
1880
1881 /// Internal: store the pending token+sql for lazy resolution.
1882 fn stats_store_pending(&self, token: Option<Box<dyn Any + Send>>, sql: &str) {
1883 if let Some(token) = token {
1884 if let Ok(mut guard) = self.pending_stats.lock() {
1885 *guard = Some((token, sql.to_string()));
1886 }
1887 }
1888 }
1889}
1890
1891impl Connection {
1892 // =========================================================================
1893 // Transaction Control
1894 // =========================================================================
1895
1896 // -------------------------------------------------------------------
1897 // Raw transaction control (internal)
1898 // -------------------------------------------------------------------
1899 //
1900 // The `*_raw` methods below are `pub(crate)` and form the canonical
1901 // implementation of session-level transaction control. The RAII
1902 // guard at `crate::Transaction` and any internal helper that
1903 // genuinely needs `&self` (rather than the guard's `&mut self`)
1904 // delegate to these.
1905 //
1906 // The matching `pub` methods (`begin_transaction`, `commit`,
1907 // `rollback`) are thin `#[doc(hidden)] #[deprecated]` wrappers
1908 // retained only so any pre-existing downstream caller sees a
1909 // compiler warning rather than a hard break. They will be deleted
1910 // in a future release; the `_raw` methods stay.
1911
1912 /// Issues `BEGIN TRANSACTION`. Crate-internal use only.
1913 pub(crate) fn begin_transaction_raw(&self) -> Result<()> {
1914 self.execute_command("BEGIN TRANSACTION")?;
1915 Ok(())
1916 }
1917
1918 /// Issues `COMMIT`. Crate-internal use only.
1919 pub(crate) fn commit_raw(&self) -> Result<()> {
1920 self.execute_command("COMMIT")?;
1921 Ok(())
1922 }
1923
1924 /// Issues `ROLLBACK`. Crate-internal use only.
1925 pub(crate) fn rollback_raw(&self) -> Result<()> {
1926 self.execute_command("ROLLBACK")?;
1927 Ok(())
1928 }
1929
1930 /// Begins an explicit transaction.
1931 ///
1932 /// **Prefer [`transaction()`](Self::transaction)** — the RAII guard
1933 /// auto-rolls back on drop and cannot leak a half-open transaction
1934 /// across error paths. This method is hidden from generated
1935 /// rustdoc and marked deprecated; it will be removed in a future
1936 /// release.
1937 ///
1938 /// # Errors
1939 ///
1940 /// Returns [`Error::Server`] if the server rejects `BEGIN TRANSACTION`
1941 /// (e.g. a transaction is already open on this session).
1942 #[doc(hidden)]
1943 #[deprecated(
1944 note = "Use `Connection::transaction()` for an RAII guard. This method will be removed \
1945 in a future release."
1946 )]
1947 pub fn begin_transaction(&self) -> Result<()> {
1948 self.begin_transaction_raw()
1949 }
1950
1951 /// Commits the current transaction.
1952 ///
1953 /// **Prefer [`Transaction::commit`](crate::Transaction::commit)** on
1954 /// the RAII guard returned by [`transaction()`](Self::transaction).
1955 /// Hidden from generated rustdoc and deprecated; slated for removal.
1956 ///
1957 /// # Errors
1958 ///
1959 /// Returns [`Error::Server`] if the server rejects `COMMIT`.
1960 #[doc(hidden)]
1961 #[deprecated(
1962 note = "Use `Transaction::commit()` on the RAII guard from `Connection::transaction()`. \
1963 This method will be removed in a future release."
1964 )]
1965 pub fn commit(&self) -> Result<()> {
1966 self.commit_raw()
1967 }
1968
1969 /// Rolls back the current transaction.
1970 ///
1971 /// **Prefer [`Transaction::rollback`](crate::Transaction::rollback)**
1972 /// on the RAII guard returned by [`transaction()`](Self::transaction).
1973 /// Hidden from generated rustdoc and deprecated; slated for removal.
1974 ///
1975 /// # Errors
1976 ///
1977 /// Returns [`Error::Server`] if the server rejects `ROLLBACK`.
1978 #[doc(hidden)]
1979 #[deprecated(
1980 note = "Use `Transaction::rollback()` on the RAII guard from `Connection::transaction()`. \
1981 This method will be removed in a future release."
1982 )]
1983 pub fn rollback(&self) -> Result<()> {
1984 self.rollback_raw()
1985 }
1986
1987 /// Starts a transaction and returns an RAII guard that auto-rolls back on drop.
1988 ///
1989 /// The returned [`Transaction`](crate::Transaction) exclusively borrows this connection,
1990 /// preventing any other use of the connection while the transaction is active.
1991 /// This is enforced at compile time by Rust's borrow checker. The guard provides
1992 /// `commit()` and `rollback()` methods. If dropped without calling either, the
1993 /// transaction is automatically rolled back.
1994 ///
1995 /// # Example
1996 ///
1997 /// ```no_run
1998 /// # use hyperdb_api::{Connection, CreateMode, Result};
1999 /// # fn main() -> Result<()> {
2000 /// # let mut conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
2001 /// let txn = conn.transaction()?;
2002 /// txn.execute_command("INSERT INTO users VALUES (1, 'Alice')")?;
2003 /// txn.commit()?; // or drop `txn` to auto-rollback
2004 /// # Ok(())
2005 /// # }
2006 /// ```
2007 ///
2008 /// # Errors
2009 ///
2010 /// Returns [`Error::Server`] if the server rejects the `BEGIN`
2011 /// statement issued internally by
2012 /// [`Transaction::new`](crate::Transaction).
2013 pub fn transaction(&mut self) -> Result<crate::Transaction<'_>> {
2014 crate::Transaction::new(self)
2015 }
2016}
2017
2018/// Checks if an error indicates an "already exists" condition based on SQLSTATE codes.
2019///
2020/// This function uses `PostgreSQL` SQLSTATE codes to reliably detect duplicate object errors
2021/// regardless of server locale or message formatting. The codes checked are:
2022/// - `42P04`: Database already exists
2023/// - `42710`: Duplicate object
2024/// - `42P06`: Duplicate schema
2025/// - `42P07`: Duplicate table
2026///
2027/// See: <https://www.postgresql.org/docs/current/errcodes-appendix.html>
2028fn is_already_exists_error(err: &Error) -> bool {
2029 err.sqlstate()
2030 .is_some_and(|code| matches!(code, "42P04" | "42710" | "42P06" | "42P07"))
2031}