hyperdb_api/connection.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Database connection management.
5//!
6//! The [`Connection`] type provides a unified interface for connecting to Hyper
7//! databases via either TCP (`PostgreSQL` wire protocol) or gRPC transport.
8//! The transport is automatically detected from the endpoint URL:
9//!
10//! - `https://` or `http://` → gRPC transport
11//! - Otherwise → TCP transport (e.g., `localhost:7483`)
12
13use std::path::Path;
14
15use hyperdb_api_core::client::Client;
16
17use crate::error::{Error, Result};
18use crate::names::escape_sql_path;
19use crate::process::HyperProcess;
20use crate::result::{Row, Rowset, DEFAULT_BINARY_CHUNK_SIZE};
21use crate::transport::Transport;
22
23use std::any::Any;
24use std::sync::{Arc, Mutex};
25
26use crate::query_stats::{QueryStats, QueryStatsProvider};
27
28/// Trait for types that can be extracted from a scalar query result.
29///
30/// This trait enables the generic [`Connection::execute_scalar_query`] method,
31/// similar to C++'s `executeScalarQuery<T>()` template.
32///
33/// # Implementing Custom Types
34///
35/// You can implement this trait for custom types to use them with `execute_scalar_query`:
36///
37/// ```no_run
38/// # use hyperdb_api::{Row, ScalarValue};
39/// # struct MyType;
40/// # impl MyType { fn parse(s: &str) -> Self { MyType } }
41/// impl ScalarValue for MyType {
42/// fn from_row(row: &Row, col: usize) -> Option<Self> {
43/// row.get_string(col).map(|s| MyType::parse(&s))
44/// }
45/// }
46/// ```
47pub trait ScalarValue: Sized {
48 /// Extracts a value of this type from a row at the given column.
49 fn from_row(row: &Row, col: usize) -> Option<Self>;
50}
51
52impl ScalarValue for i64 {
53 fn from_row(row: &Row, col: usize) -> Option<Self> {
54 row.get_i64(col)
55 }
56}
57
58impl ScalarValue for i32 {
59 fn from_row(row: &Row, col: usize) -> Option<Self> {
60 row.get_i32(col)
61 }
62}
63
64impl ScalarValue for i16 {
65 fn from_row(row: &Row, col: usize) -> Option<Self> {
66 row.get_i16(col)
67 }
68}
69
70impl ScalarValue for f64 {
71 fn from_row(row: &Row, col: usize) -> Option<Self> {
72 row.get_f64(col)
73 }
74}
75
76impl ScalarValue for bool {
77 fn from_row(row: &Row, col: usize) -> Option<Self> {
78 row.get_bool(col)
79 }
80}
81
82impl ScalarValue for String {
83 fn from_row(row: &Row, col: usize) -> Option<Self> {
84 row.get_string(col)
85 }
86}
87
88/// Database creation mode when connecting.
89#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
90pub enum CreateMode {
91 /// Do not create the database. Method will fail if database doesn't exist.
92 #[default]
93 DoNotCreate,
94 /// Create the database. Method will fail if the database already exists.
95 Create,
96 /// Create the database if it doesn't exist.
97 CreateIfNotExists,
98 /// Create the database. If it already exists, drop the old one first.
99 CreateAndReplace,
100}
101
102/// A connection to a Hyper database.
103///
104/// This struct represents an active connection to a Hyper server and optionally
105/// an attached database. The connection is automatically closed when dropped.
106///
107/// # Transport Auto-Detection
108///
109/// The transport is automatically detected from the endpoint URL:
110/// - `https://` or `http://` → gRPC transport (read-only until server supports writes)
111/// - Otherwise → TCP transport (full read/write support)
112///
113/// # CSV / Text Import & Export
114///
115/// For CSV, TSV, and other delimited-text formats, see the [`copy`](crate::copy)
116/// module which provides [`export_csv()`](Self::export_csv),
117/// [`import_csv()`](Self::import_csv), and related methods on this struct.
118///
119/// # Example
120///
121/// ```no_run
122/// use hyperdb_api::{Connection, CreateMode, Result};
123///
124/// fn main() -> Result<()> {
125/// // TCP connection (full read/write)
126/// let conn = Connection::connect("localhost:7483", "example.hyper", CreateMode::CreateIfNotExists)?;
127///
128/// // Execute SQL commands
129/// conn.execute_command("CREATE TABLE test (id INT, name TEXT)")?;
130/// conn.execute_command("INSERT INTO test VALUES (1, 'Hello')")?;
131///
132/// Ok(())
133/// }
134/// ```
135///
136/// ```no_run
137/// # use hyperdb_api::{Connection, CreateMode, Result};
138/// # fn example() -> Result<()> {
139/// // gRPC connection (read-only, auto-detected from URL)
140/// let conn = Connection::connect(
141/// "https://hyper-server.example.com:443",
142/// "example.hyper",
143/// CreateMode::DoNotCreate, // Must be DoNotCreate for gRPC
144/// )?;
145/// # Ok(())
146/// # }
147/// ```
148pub struct Connection {
149 transport: Transport,
150 database: Option<String>,
151 stats_provider: Option<Arc<dyn QueryStatsProvider>>,
152 /// Pending stats token + SQL from the most recent query, resolved lazily.
153 pending_stats: Mutex<Option<(Box<dyn Any + Send>, String)>>,
154}
155
156impl std::fmt::Debug for Connection {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.debug_struct("Connection")
159 .field("database", &self.database)
160 .finish_non_exhaustive()
161 }
162}
163
164impl Connection {
165 /// Creates a new connection to a Hyper instance with a database.
166 ///
167 /// This is the primary way to connect to a running [`HyperProcess`].
168 ///
169 /// # Arguments
170 ///
171 /// * `instance` - The Hyper server instance to connect to.
172 /// * `database_path` - Path to the database file.
173 /// * `create_mode` - How to handle database creation.
174 ///
175 /// # Errors
176 ///
177 /// Returns an error if the connection could not be established.
178 ///
179 /// # Example
180 ///
181 /// ```no_run
182 /// use hyperdb_api::{HyperProcess, Connection, CreateMode, Result};
183 ///
184 /// fn main() -> Result<()> {
185 /// let hyper = HyperProcess::new(None, None)?;
186 /// let conn = Connection::new(&hyper, "database.hyper", CreateMode::CreateIfNotExists)?;
187 /// Ok(())
188 /// }
189 /// ```
190 pub fn new(
191 instance: &HyperProcess,
192 database_path: impl AsRef<Path>,
193 create_mode: CreateMode,
194 ) -> Result<Self> {
195 // Prefer using the connection_endpoint which properly handles UDS/Named Pipes
196 if let Some(conn_endpoint) = instance.connection_endpoint() {
197 return Self::connect_with_endpoint(
198 conn_endpoint,
199 &database_path.as_ref().to_string_lossy(),
200 create_mode,
201 );
202 }
203
204 // Fall back to string endpoint (TCP)
205 let endpoint = instance.require_endpoint()?;
206 Self::connect(
207 endpoint,
208 &database_path.as_ref().to_string_lossy(),
209 create_mode,
210 )
211 }
212
213 /// Connects using a `ConnectionEndpoint` (supports TCP, UDS, and Named Pipes).
214 fn connect_with_endpoint(
215 endpoint: &hyperdb_api_core::client::ConnectionEndpoint,
216 database_path: &str,
217 create_mode: CreateMode,
218 ) -> Result<Self> {
219 let db_path_str = Some(database_path.to_string());
220
221 let config = hyperdb_api_core::client::Config::new().with_user("tableau_internal_user");
222
223 let client = hyperdb_api_core::client::Client::connect_endpoint(endpoint, &config)?;
224
225 let conn = Connection::from_client(client, db_path_str.clone());
226
227 // Handle database creation
228 if let Some(db_path) = db_path_str {
229 conn.handle_creation_mode(&db_path, create_mode)?;
230 conn.attach_and_set_path(&db_path)?;
231 }
232
233 Ok(conn)
234 }
235
236 /// Connects to a Hyper server and optionally attaches a database.
237 ///
238 /// # Arguments
239 ///
240 /// * `endpoint` - The server endpoint (host:port).
241 /// * `database_path` - Path to the database file.
242 /// * `create_mode` - How to handle database creation.
243 ///
244 /// # Errors
245 ///
246 /// Returns an error if the connection could not be established.
247 pub fn connect(endpoint: &str, database_path: &str, create_mode: CreateMode) -> Result<Self> {
248 crate::ConnectionBuilder::new(endpoint)
249 .database(database_path)
250 .create_mode(create_mode)
251 .build()
252 }
253
254 /// Returns a connection builder for advanced configuration.
255 ///
256 /// This is useful when you need to set authentication, timeouts, or
257 /// other advanced options before connecting.
258 #[must_use]
259 pub fn builder(endpoint: &str) -> crate::ConnectionBuilder {
260 crate::ConnectionBuilder::new(endpoint)
261 }
262
263 /// Creates a Connection from a low-level Client (internal use, TCP only).
264 pub(crate) fn from_client(client: Client, database: Option<String>) -> Self {
265 Connection {
266 transport: Transport::Tcp(Box::new(crate::transport::TcpTransport { client })),
267 database,
268 stats_provider: None,
269 pending_stats: Mutex::new(None),
270 }
271 }
272
273 /// Creates a Connection from a Transport (internal use).
274 #[allow(
275 dead_code,
276 reason = "used by ConnectionBuilder for the gRPC path; not reached under non-gRPC feature builds"
277 )]
278 pub(crate) fn from_transport(transport: Transport, database: Option<String>) -> Self {
279 Connection {
280 transport,
281 database,
282 stats_provider: None,
283 pending_stats: Mutex::new(None),
284 }
285 }
286
287 /// Returns the transport type name (e.g., "TCP", "gRPC", "Unix Socket").
288 pub fn transport_type(&self) -> &'static str {
289 self.transport.transport_type().as_str()
290 }
291
292 /// Returns true if this connection supports write operations.
293 ///
294 /// Currently, only TCP connections support writes. gRPC connections are
295 /// read-only until the server supports write operations over gRPC.
296 pub fn supports_writes(&self) -> bool {
297 self.transport.supports_writes()
298 }
299
300 /// Handles database creation logic (internal use).
301 pub(crate) fn handle_creation_mode(
302 &self,
303 database_path: &str,
304 create_mode: CreateMode,
305 ) -> Result<()> {
306 match create_mode {
307 CreateMode::DoNotCreate => {}
308 CreateMode::Create => {
309 self.execute_command(&format!(
310 "CREATE DATABASE {}",
311 escape_sql_path(database_path)
312 ))?;
313 }
314 CreateMode::CreateIfNotExists => {
315 if let Err(e) = self.execute_command(&format!(
316 "CREATE DATABASE IF NOT EXISTS {}",
317 escape_sql_path(database_path)
318 )) {
319 if !is_already_exists_error(&e) {
320 return Err(Error::internal(format!(
321 "Failed to create database '{database_path}': {e}"
322 )));
323 }
324 }
325 }
326 CreateMode::CreateAndReplace => {
327 let _ = self.execute_command(&format!(
328 "DROP DATABASE IF EXISTS {}",
329 escape_sql_path(database_path)
330 ));
331 self.execute_command(&format!(
332 "CREATE DATABASE {}",
333 escape_sql_path(database_path)
334 ))?;
335 }
336 }
337 Ok(())
338 }
339
340 /// Attaches and sets the database path (internal use).
341 pub(crate) fn attach_and_set_path(&self, database_path: &str) -> Result<()> {
342 let db_alias = std::path::Path::new(database_path)
343 .file_stem()
344 .and_then(|s| s.to_str())
345 .unwrap_or("db");
346
347 self.execute_command(&format!(
348 "ATTACH DATABASE {} AS {}",
349 escape_sql_path(database_path),
350 escape_sql_path(db_alias)
351 ))?;
352
353 self.execute_command(&format!(
354 "SET search_path TO {}, public",
355 escape_sql_path(db_alias)
356 ))?;
357
358 Ok(())
359 }
360
361 /// Connects to a Hyper server with authentication.
362 ///
363 /// # Arguments
364 ///
365 /// * `endpoint` - The server endpoint (host:port).
366 /// * `database_path` - Path to the database file.
367 /// * `create_mode` - How to handle database creation.
368 /// * `user` - Username for authentication.
369 /// * `password` - Password for authentication.
370 ///
371 /// # Errors
372 ///
373 /// Returns an error if the connection or authentication fails.
374 pub fn connect_with_auth(
375 endpoint: &str,
376 database_path: &str,
377 create_mode: CreateMode,
378 user: &str,
379 password: &str,
380 ) -> Result<Self> {
381 crate::ConnectionBuilder::new(endpoint)
382 .database(database_path)
383 .create_mode(create_mode)
384 .user(user.to_string())
385 .password(password)
386 .build()
387 }
388
389 /// Creates a connection to a Hyper server without attaching a database.
390 ///
391 /// # Errors
392 ///
393 /// Returns [`Error::Connection`] if the TCP or gRPC handshake fails, and
394 /// [`Error::Io`] if the endpoint cannot be reached.
395 pub fn without_database(endpoint: &str) -> Result<Self> {
396 crate::ConnectionBuilder::new(endpoint).build()
397 }
398
399 /// Executes a SQL command that doesn't return results.
400 ///
401 /// Use this for DDL statements (CREATE, ALTER, DROP) and DML statements
402 /// (INSERT, UPDATE, DELETE).
403 ///
404 /// # Arguments
405 ///
406 /// * `command` - The SQL command to execute.
407 ///
408 /// # Returns
409 ///
410 /// The number of affected rows, or 0 if not applicable.
411 ///
412 /// # Errors
413 ///
414 /// Returns an error if:
415 /// - The connection is using gRPC transport (write operations not yet supported)
416 /// - The command fails to execute
417 pub fn execute_command(&self, command: &str) -> Result<u64> {
418 let token = self.stats_before_query(command);
419
420 let result = self.transport.execute_command(command);
421
422 // For commands, the query is fully executed synchronously, so we
423 // can store the pending token immediately for lazy resolution.
424 self.stats_store_pending(token, command);
425
426 result
427 }
428
429 /// Executes a SQL query and returns a streaming result set.
430 ///
431 /// Results are streamed in chunks (default 64K rows), keeping memory usage
432 /// constant regardless of result set size. This makes it safe for any
433 /// result size, from a single row to billions of rows.
434 ///
435 /// # Example
436 ///
437 /// ```no_run
438 /// # use hyperdb_api::{Connection, Result};
439 /// # fn example(conn: &Connection) -> Result<()> {
440 /// let mut result = conn.execute_query("SELECT id, value FROM measurements")?;
441 /// while let Some(chunk) = result.next_chunk()? {
442 /// for row in &chunk {
443 /// // Generic typed access (like C++ row.get<T>())
444 /// let id: Option<i32> = row.get(0);
445 /// let value: Option<f64> = row.get(1);
446 ///
447 /// // Or direct accessors
448 /// let id = row.get_i32(0);
449 /// let value = row.get_f64(1);
450 /// }
451 /// }
452 /// # Ok(())
453 /// # }
454 /// ```
455 ///
456 /// # Memory Behavior
457 ///
458 /// - Only one chunk is held in memory at a time (~few MB for 64K rows)
459 /// - Safe for result sets of any size (millions/billions of rows)
460 /// - Memory usage is `O(chunk_size)`, not `O(total_rows)`
461 ///
462 /// # Errors
463 ///
464 /// - Returns [`Error::Server`] wrapping a `hyperdb_api_core::client::Error` if the
465 /// SQL fails to parse, execute, or if the server reports an error
466 /// while streaming.
467 /// - Returns [`Error::Io`] on transport-level I/O failures.
468 pub fn execute_query(&self, query: &str) -> Result<Rowset<'_>> {
469 let token = self.stats_before_query(query);
470
471 let result = match &self.transport {
472 Transport::Tcp(tcp) => {
473 let stream = tcp
474 .client
475 .query_streaming(query, DEFAULT_BINARY_CHUNK_SIZE)?;
476 Ok(Rowset::new(stream))
477 }
478 Transport::Grpc(grpc) => {
479 // gRPC streaming: pull chunks lazily so peak memory is
480 // bounded by one gRPC message (tonic default 64 MB), not
481 // by the full result size. Matches TCP's
482 // constant-memory streaming shape.
483 //
484 // The transport module already creates a fresh gRPC client
485 // per query (gRPC client needs &mut self to execute), so we
486 // do the same here: connect, start the stream, wrap as a
487 // `ChunkSource`. The stream keeps the channel and runtime
488 // alive via refcounted handles inside `GrpcChunkStreamSync`.
489 let mut client =
490 hyperdb_api_core::client::grpc::GrpcClientSync::connect(grpc.config.clone())?;
491 let stream = client.execute_query_stream(query)?;
492 let source = Box::new(crate::grpc_connection::GrpcChunkStreamSource::new(stream));
493 let arrow_rowset = crate::arrow_result::ArrowRowset::from_stream(source)?;
494 Ok(Rowset::from_arrow(arrow_rowset))
495 }
496 };
497
498 // Store the pending token — Hyper logs the execution stats after the
499 // result is consumed (streamed), so we defer resolution until
500 // last_query_stats() is called.
501 self.stats_store_pending(token, query);
502
503 result
504 }
505
506 // =========================================================================
507 // Arrow Format Queries
508 // =========================================================================
509
510 /// Executes a SELECT query and returns results as Arrow IPC stream bytes.
511 ///
512 /// # Example
513 ///
514 /// ```no_run
515 /// use hyperdb_api::{Connection, CreateMode, Result};
516 ///
517 /// fn main() -> Result<()> {
518 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
519 ///
520 /// // Create and populate a table
521 /// conn.execute_command("CREATE TABLE data (id INT, value DOUBLE PRECISION)")?;
522 /// conn.execute_command("INSERT INTO data VALUES (1, 1.5), (2, 2.5)")?;
523 ///
524 /// // Get results as Arrow IPC stream
525 /// let arrow_data = conn.execute_query_to_arrow("SELECT * FROM data")?;
526 /// println!("Got {} bytes of Arrow IPC data", arrow_data.len());
527 ///
528 /// Ok(())
529 /// }
530 /// ```
531 ///
532 /// # Errors
533 ///
534 /// Propagates any [`Error::Server`] from the TCP or gRPC transport when
535 /// the query fails or the server cannot produce Arrow IPC output.
536 pub fn execute_query_to_arrow(&self, select_query: &str) -> Result<bytes::Bytes> {
537 self.transport.execute_query_to_arrow(select_query)
538 }
539
540 /// Exports an entire table to Arrow IPC stream format.
541 ///
542 /// This is a convenience method equivalent to
543 /// `execute_query_to_arrow("SELECT * FROM table_name")`.
544 ///
545 /// # Arguments
546 ///
547 /// * `table_name` - The table name
548 ///
549 /// # Returns
550 ///
551 /// Raw Arrow IPC stream bytes containing all rows from the table.
552 ///
553 /// # Example
554 ///
555 /// ```no_run
556 /// use hyperdb_api::{Connection, CreateMode, Result};
557 ///
558 /// fn main() -> Result<()> {
559 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
560 /// let arrow_data = conn.export_table_to_arrow("my_table")?;
561 /// Ok(())
562 /// }
563 /// ```
564 ///
565 /// # Errors
566 ///
567 /// Returns whatever [`execute_query_to_arrow`](Self::execute_query_to_arrow)
568 /// would return for `SELECT * FROM <table_name>` — typically
569 /// [`Error::Server`] if the table does not exist or the query is rejected.
570 pub fn export_table_to_arrow(&self, table_name: &str) -> Result<bytes::Bytes> {
571 self.execute_query_to_arrow(&format!("SELECT * FROM {table_name}"))
572 }
573
574 /// Executes a SELECT query and returns results as Arrow `RecordBatch`es.
575 ///
576 /// This is the recommended method for Arrow-native workflows (`DataFusion`,
577 /// Polars, etc.) where you want direct `RecordBatch` access without going
578 /// through the `Row` abstraction.
579 ///
580 /// # Example
581 ///
582 /// ```no_run
583 /// use hyperdb_api::{Connection, CreateMode, Result};
584 /// use arrow::record_batch::RecordBatch;
585 ///
586 /// fn main() -> Result<()> {
587 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
588 ///
589 /// let batches: Vec<RecordBatch> = conn.execute_query_to_batches("SELECT * FROM data")?;
590 /// for batch in &batches {
591 /// println!("batch: {} rows x {} cols", batch.num_rows(), batch.num_columns());
592 /// }
593 /// Ok(())
594 /// }
595 /// ```
596 ///
597 /// # Errors
598 ///
599 /// - Returns [`Error::Server`] if the query itself fails.
600 /// - Returns [`Error::Conversion`] if the Arrow IPC payload returned by the
601 /// server is malformed and cannot be decoded into record batches.
602 pub fn execute_query_to_batches(
603 &self,
604 select_query: &str,
605 ) -> Result<Vec<arrow::record_batch::RecordBatch>> {
606 let arrow_data = self.execute_query_to_arrow(select_query)?;
607 crate::arrow_result::parse_arrow_ipc(arrow_data)
608 }
609
610 /// Fetches a single row from a query.
611 ///
612 /// Returns an error if the query returns no rows.
613 ///
614 /// # Example
615 ///
616 /// ```no_run
617 /// use hyperdb_api::{Connection, CreateMode, Result};
618 ///
619 /// fn main() -> Result<()> {
620 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
621 /// let row = conn.fetch_one("SELECT * FROM users WHERE id = 1")?;
622 /// let id: Option<i32> = row.get(0);
623 /// let name: Option<String> = row.get(1);
624 /// Ok(())
625 /// }
626 /// ```
627 ///
628 /// # Errors
629 ///
630 /// - Returns the error from [`execute_query`](Self::execute_query) if
631 /// the query itself fails.
632 /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
633 /// the query produced zero rows.
634 pub fn fetch_one<Q>(&self, query: Q) -> Result<crate::Row>
635 where
636 Q: AsRef<str>,
637 {
638 let query = query.as_ref();
639 let result = self.execute_query(query)?;
640 result.require_first_row()
641 }
642
643 /// Fetches an optional single row from a query.
644 ///
645 /// Returns `None` if the query returns no rows.
646 ///
647 /// # Example
648 ///
649 /// ```no_run
650 /// use hyperdb_api::{Connection, CreateMode, Result};
651 ///
652 /// fn main() -> Result<()> {
653 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
654 /// if let Some(row) = conn.fetch_optional("SELECT * FROM users WHERE id = 999")? {
655 /// let name: Option<String> = row.get(1);
656 /// println!("Found user: {:?}", name);
657 /// }
658 /// Ok(())
659 /// }
660 /// ```
661 ///
662 /// # Errors
663 ///
664 /// Returns the error from [`execute_query`](Self::execute_query) if the
665 /// query itself fails. An empty result set is not an error — it yields
666 /// `Ok(None)`.
667 pub fn fetch_optional<Q>(&self, query: Q) -> Result<Option<crate::Row>>
668 where
669 Q: AsRef<str>,
670 {
671 let query = query.as_ref();
672 let result = self.execute_query(query)?;
673 result.first_row()
674 }
675
676 /// Fetches all rows from a query.
677 ///
678 /// # Example
679 ///
680 /// ```no_run
681 /// use hyperdb_api::{Connection, CreateMode, Result};
682 ///
683 /// fn main() -> Result<()> {
684 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
685 /// let rows = conn.fetch_all("SELECT * FROM users WHERE active = true ORDER BY name")?;
686 /// for row in rows {
687 /// let id: Option<i32> = row.get(0);
688 /// let name: Option<String> = row.get(1);
689 /// println!("User {}: {:?}", id.unwrap_or(-1), name);
690 /// }
691 /// Ok(())
692 /// }
693 /// ```
694 ///
695 /// # Errors
696 ///
697 /// Returns the error from [`execute_query`](Self::execute_query), or a
698 /// transport error produced while draining every chunk of the streamed
699 /// result set.
700 pub fn fetch_all<Q>(&self, query: Q) -> Result<Vec<crate::Row>>
701 where
702 Q: AsRef<str>,
703 {
704 let query = query.as_ref();
705 let result = self.execute_query(query)?;
706 result.collect_rows()
707 }
708
709 /// Fetches a single row and maps it to a struct using [`FromRow`](crate::FromRow).
710 ///
711 /// Returns an error if the query returns no rows or if mapping fails.
712 ///
713 /// # Example
714 ///
715 /// ```no_run
716 /// use hyperdb_api::{Connection, CreateMode, FromRow, RowAccessor, Result};
717 ///
718 /// struct User { id: i32, name: String }
719 ///
720 /// impl FromRow for User {
721 /// fn from_row(row: RowAccessor<'_>) -> Result<Self> {
722 /// Ok(User {
723 /// id: row.get("id")?,
724 /// name: row.get_opt("name")?.unwrap_or_default(),
725 /// })
726 /// }
727 /// }
728 ///
729 /// fn main() -> Result<()> {
730 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
731 /// let user: User = conn.fetch_one_as("SELECT id, name FROM users WHERE id = 1")?;
732 /// Ok(())
733 /// }
734 /// ```
735 ///
736 /// # Errors
737 ///
738 /// - Returns the error from [`fetch_one`](Self::fetch_one) if the query
739 /// fails or returns no rows.
740 /// - Returns whatever error [`FromRow::from_row`](crate::FromRow::from_row)
741 /// produces when the row cannot be mapped into `T`.
742 pub fn fetch_one_as<T: crate::FromRow>(&self, query: &str) -> Result<T> {
743 let row = self.fetch_one(query)?;
744 let indices = row
745 .schema()
746 .map(crate::row_accessor::RowAccessor::build_indices)
747 .unwrap_or_default();
748 T::from_row(crate::RowAccessor::new(&row, &indices))
749 }
750
751 /// Fetches all rows and maps them to structs using [`FromRow`](crate::FromRow).
752 ///
753 /// # Example
754 ///
755 /// ```no_run
756 /// # use hyperdb_api::{Connection, FromRow, RowAccessor, Result};
757 /// # struct User { id: i32, name: String }
758 /// # impl FromRow for User {
759 /// # fn from_row(row: RowAccessor<'_>) -> Result<Self> {
760 /// # Ok(User { id: row.get("id")?, name: row.get_opt("name")?.unwrap_or_default() })
761 /// # }
762 /// # }
763 /// # fn example(conn: &Connection) -> Result<()> {
764 /// let users: Vec<User> = conn.fetch_all_as("SELECT id, name FROM users")?;
765 /// # Ok(())
766 /// # }
767 /// ```
768 ///
769 /// # Errors
770 ///
771 /// - Returns the error from [`fetch_all`](Self::fetch_all) if the query
772 /// fails.
773 /// - Returns the first error produced by
774 /// [`FromRow::from_row`](crate::FromRow::from_row) on any of the rows.
775 pub fn fetch_all_as<T: crate::FromRow>(&self, query: &str) -> Result<Vec<T>> {
776 let rows = self.fetch_all(query)?;
777 // Build the column-name → index lookup once from the first
778 // row's schema; reuse for every row. All rows in a result set
779 // share the same `Arc<ResultSchema>`, so this is safe.
780 let indices = rows
781 .first()
782 .and_then(crate::result::Row::schema)
783 .map(crate::row_accessor::RowAccessor::build_indices)
784 .unwrap_or_default();
785 rows.iter()
786 .map(|r| T::from_row(crate::RowAccessor::new(r, &indices)))
787 .collect()
788 }
789
790 /// Returns a lazy iterator over rows, mapping each to `T` via
791 /// [`FromRow`].
792 ///
793 /// This is the streaming variant of [`fetch_all_as`](Self::fetch_all_as):
794 /// memory usage is bounded by the chunk size (default 64K rows), not by
795 /// the total row count. Use this for large result sets where collecting
796 /// all rows into a `Vec` would exceed memory limits.
797 ///
798 /// The column-name → index lookup table is built exactly once (on the
799 /// first non-empty chunk) and reused for all rows, so per-row mapping is
800 /// O(1) in column count.
801 ///
802 /// # Example
803 ///
804 /// ```no_run
805 /// # use hyperdb_api::{Connection, CreateMode, FromRow, RowAccessor, Result};
806 /// # struct User { id: i32, name: String }
807 /// # impl FromRow for User {
808 /// # fn from_row(row: RowAccessor<'_>) -> Result<Self> {
809 /// # Ok(User { id: row.get("id")?, name: row.get("name")? })
810 /// # }
811 /// # }
812 /// # fn example(conn: &Connection) -> Result<()> {
813 /// for row_result in conn.stream_as::<User>("SELECT id, name FROM users")? {
814 /// let user = row_result?;
815 /// println!("{}: {}", user.id, user.name);
816 /// }
817 /// # Ok(())
818 /// # }
819 /// ```
820 ///
821 /// # Errors
822 ///
823 /// - The returned `Result` wraps errors detected while *opening* the
824 /// result stream — transport/connection failures, and (on the gRPC
825 /// transport, which establishes the query stream eagerly) SQL parse and
826 /// server errors. On the default TCP transport the query is streamed
827 /// lazily, so SQL errors such as a missing table are typically reported
828 /// as the **first yielded item** rather than by this outer `Result`.
829 /// - Each yielded item is itself a `Result<T>`:
830 /// - `Ok(T)` if the row was successfully mapped via `FromRow`.
831 /// - `Err(e)` for a server/transport error encountered while streaming a
832 /// later chunk, or for a per-row mapping failure (missing column, type
833 /// mismatch, NULL in a non-optional field).
834 ///
835 /// In short: always handle errors *both* on the outer `Result` and on each
836 /// item — do not assume a successfully-returned iterator means the query
837 /// succeeded.
838 ///
839 /// [`FromRow`]: crate::FromRow
840 pub fn stream_as<'a, T>(&'a self, query: &str) -> Result<impl Iterator<Item = Result<T>> + 'a>
841 where
842 T: crate::FromRow + 'a,
843 {
844 let rowset = self.execute_query(query)?;
845 Ok(crate::result::TypedRowIterator::<T>::new(rowset))
846 }
847
848 /// Fetches a single scalar value from a query.
849 ///
850 /// Returns an error if the query returns no rows or NULL.
851 ///
852 /// # Example
853 ///
854 /// ```no_run
855 /// use hyperdb_api::{Connection, CreateMode, Result};
856 ///
857 /// fn main() -> Result<()> {
858 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
859 /// let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM users")?;
860 /// println!("User count: {}", count);
861 /// Ok(())
862 /// }
863 /// ```
864 ///
865 /// # Errors
866 ///
867 /// - Returns the error from [`execute_query`](Self::execute_query) if
868 /// the query itself fails.
869 /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
870 /// the query produced zero rows.
871 /// - Returns [`Error::Conversion`] with message `"Scalar query returned NULL"`
872 /// if the single cell is SQL `NULL`.
873 pub fn fetch_scalar<T, Q>(&self, query: Q) -> Result<T>
874 where
875 T: crate::connection::ScalarValue + crate::result::RowValue,
876 Q: AsRef<str>,
877 {
878 let query = query.as_ref();
879 let result = self.execute_query(query)?;
880 result.require_scalar()
881 }
882
883 /// Fetches an optional scalar value from a query.
884 ///
885 /// Returns `None` if the query returns no rows or NULL.
886 ///
887 /// # Example
888 ///
889 /// ```no_run
890 /// use hyperdb_api::{Connection, CreateMode, Result};
891 ///
892 /// fn main() -> Result<()> {
893 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
894 /// let max_id: Option<i32> = conn.fetch_optional_scalar("SELECT MAX(id) FROM users")?;
895 /// println!("Max ID: {:?}", max_id);
896 /// Ok(())
897 /// }
898 /// ```
899 ///
900 /// # Errors
901 ///
902 /// - Returns the error from [`execute_query`](Self::execute_query) if
903 /// the query itself fails.
904 /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
905 /// the query produced zero rows. (An empty result is treated as an
906 /// error here because we need at least one row to inspect; SQL `NULL`
907 /// in the single cell yields `Ok(None)`.)
908 pub fn fetch_optional_scalar<T, Q>(&self, query: Q) -> Result<Option<T>>
909 where
910 T: crate::connection::ScalarValue + crate::result::RowValue,
911 Q: AsRef<str>,
912 {
913 let query = query.as_ref();
914 let result = self.execute_query(query)?;
915 result.scalar()
916 }
917
918 /// Executes a scalar query and returns a single value of type `T`.
919 ///
920 /// Alias for [`fetch_optional_scalar`](Self::fetch_optional_scalar) for C++ API compatibility.
921 ///
922 /// # Errors
923 ///
924 /// See [`fetch_optional_scalar`](Self::fetch_optional_scalar).
925 #[inline]
926 pub fn execute_scalar_query<T>(&self, query: &str) -> Result<Option<T>>
927 where
928 T: ScalarValue + crate::result::RowValue,
929 {
930 self.fetch_optional_scalar(query)
931 }
932
933 /// Queries for a count value, defaulting to 0 if NULL.
934 ///
935 /// This is optimized for COUNT queries which typically return 0
936 /// instead of NULL when there are no matching rows.
937 ///
938 /// # Example
939 ///
940 /// ```no_run
941 /// use hyperdb_api::{Connection, CreateMode, Result};
942 ///
943 /// fn main() -> Result<()> {
944 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
945 /// let count = conn.query_count("SELECT COUNT(*) FROM users WHERE active = true")?;
946 /// println!("Active users: {}", count);
947 /// Ok(())
948 /// }
949 /// ```
950 ///
951 /// # Errors
952 ///
953 /// Returns the error from [`execute_query`](Self::execute_query) if the
954 /// query fails or produces no rows. SQL `NULL` is mapped to `0`, not an
955 /// error.
956 pub fn query_count(&self, query: &str) -> Result<i64> {
957 self.fetch_optional_scalar::<i64, _>(query)
958 .map(|opt| opt.unwrap_or(0))
959 }
960
961 // =========================================================================
962 // Parameterized Queries (SQL Injection Safe)
963 // =========================================================================
964
965 /// Executes a parameterized query, returning streaming results.
966 ///
967 /// This is safe to use with untrusted user input: parameters travel
968 /// through the extended query protocol (Parse/Bind/Execute) as
969 /// binary `HyperBinary` values and are never interpolated into the
970 /// SQL string. For repeated executions of the same SQL with different
971 /// values, prefer the explicit [`prepare`](Self::prepare) API — it
972 /// returns a reusable [`PreparedStatement`](crate::PreparedStatement)
973 /// that skips the Parse round-trip on every call.
974 ///
975 /// Under the hood, `query_params` is a one-shot
976 /// prepare+execute+close: it prepares an unnamed statement, binds
977 /// the parameters, starts streaming, and closes the statement when
978 /// the returned [`Rowset`] is dropped.
979 ///
980 /// # Arguments
981 ///
982 /// * `query` - The SQL query with parameter placeholders (`$1`, `$2`, etc.)
983 /// * `params` - Parameter values matching the placeholders
984 ///
985 /// # SQL Injection Prevention
986 ///
987 /// ```no_run
988 /// use hyperdb_api::{Connection, CreateMode, Result};
989 ///
990 /// fn search_users(conn: &Connection, user_input: &str) -> Result<()> {
991 /// // DANGEROUS - vulnerable to SQL injection:
992 /// // let query = format!("SELECT * FROM users WHERE name = '{}'", user_input);
993 ///
994 /// // SAFE - parameterized query:
995 /// let mut result = conn.query_params(
996 /// "SELECT * FROM users WHERE name = $1",
997 /// &[&user_input],
998 /// )?;
999 ///
1000 /// while let Some(chunk) = result.next_chunk()? {
1001 /// for row in &chunk {
1002 /// let id: Option<i32> = row.get(0);
1003 /// let name: Option<String> = row.get(1);
1004 /// println!("Found: {:?} - {:?}", id, name);
1005 /// }
1006 /// }
1007 /// Ok(())
1008 /// }
1009 /// ```
1010 ///
1011 /// # Multiple Parameters
1012 ///
1013 /// ```no_run
1014 /// use hyperdb_api::{Connection, CreateMode, Result};
1015 ///
1016 /// fn main() -> Result<()> {
1017 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1018 ///
1019 /// // Multiple parameters of different types
1020 /// let result = conn.query_params(
1021 /// "SELECT * FROM orders WHERE customer_id = $1 AND total > $2",
1022 /// &[&42i32, &100.0f64],
1023 /// )?;
1024 /// Ok(())
1025 /// }
1026 /// ```
1027 ///
1028 /// # Errors
1029 ///
1030 /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport
1031 /// (prepared statements are TCP-only).
1032 /// - Returns [`Error::Server`] if the server rejects the statement at
1033 /// `Parse`, `Bind`, or `Execute` time, including on type-mismatch
1034 /// between `params` and the inferred OIDs.
1035 /// - Returns [`Error::Io`] on transport-level I/O failures.
1036 pub fn query_params(
1037 &self,
1038 query: &str,
1039 params: &[&dyn crate::params::ToSqlParam],
1040 ) -> Result<Rowset<'_>> {
1041 // Implementation note: routes through the extended query protocol
1042 // via Parse/Bind/Execute so parameters travel in HyperBinary
1043 // format — no SQL escaping, full SQL-injection safety regardless of
1044 // parameter content. The statement handle is stashed inside the
1045 // returned Rowset so its Drop-time close_statement fires *after*
1046 // the rowset releases its connection lock (otherwise the close
1047 // would deadlock on the still-held mutex).
1048 let client = match &self.transport {
1049 Transport::Tcp(tcp) => &tcp.client,
1050 Transport::Grpc(_) => {
1051 return Err(Error::feature_not_supported(
1052 "prepared statements are not supported over gRPC transport",
1053 ));
1054 }
1055 };
1056 let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
1057 let stmt = client.prepare_typed(query, &oids)?;
1058 let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
1059 let stream =
1060 client.execute_streaming(&stmt, encoded, crate::result::DEFAULT_BINARY_CHUNK_SIZE)?;
1061 Ok(Rowset::from_prepared(stream).with_statement_guard(stmt))
1062 }
1063
1064 /// Executes a parameterized command that doesn't return rows.
1065 ///
1066 /// Use this for INSERT, UPDATE, DELETE, or DDL statements with parameters.
1067 /// Returns the number of affected rows.
1068 ///
1069 /// See [`query_params`](Self::query_params) for details on parameter
1070 /// handling and SQL injection prevention.
1071 ///
1072 /// # Example
1073 ///
1074 /// ```no_run
1075 /// use hyperdb_api::{Connection, CreateMode, Result};
1076 ///
1077 /// fn delete_user(conn: &Connection, user_id: i32) -> Result<u64> {
1078 /// // Safe from SQL injection
1079 /// conn.command_params("DELETE FROM users WHERE id = $1", &[&user_id])
1080 /// }
1081 /// ```
1082 ///
1083 /// # Errors
1084 ///
1085 /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport.
1086 /// - Returns [`Error::Server`] if the server rejects the statement at
1087 /// `Parse`, `Bind`, or `Execute` time.
1088 /// - Returns [`Error::Io`] on transport-level I/O failures.
1089 pub fn command_params(
1090 &self,
1091 query: &str,
1092 params: &[&dyn crate::params::ToSqlParam],
1093 ) -> Result<u64> {
1094 // One-shot prepare+execute with explicit OIDs — see `query_params`
1095 // for why we collect OIDs from each parameter.
1096 let client = match &self.transport {
1097 Transport::Tcp(tcp) => &tcp.client,
1098 Transport::Grpc(_) => {
1099 return Err(Error::feature_not_supported(
1100 "prepared statements are not supported over gRPC transport",
1101 ));
1102 }
1103 };
1104 let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
1105 let stmt = client.prepare_typed(query, &oids)?;
1106 let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
1107 Ok(client.execute_no_result(&stmt, encoded)?)
1108 }
1109
1110 /// Executes multiple SQL statements in a single call.
1111 ///
1112 /// Each statement is executed sequentially. If any statement fails,
1113 /// execution stops and the error is returned. Returns the total number
1114 /// of affected rows across all statements.
1115 ///
1116 /// This is more efficient than calling `execute_command` in a loop
1117 /// because it reduces round-trips for DDL scripts and multi-statement setup.
1118 ///
1119 /// # Example
1120 ///
1121 /// ```no_run
1122 /// use hyperdb_api::{Connection, CreateMode, Result};
1123 ///
1124 /// fn main() -> Result<()> {
1125 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1126 /// let total = conn.execute_batch(&[
1127 /// "CREATE TABLE users (id INT, name TEXT)",
1128 /// "INSERT INTO users VALUES (1, 'Alice')",
1129 /// "INSERT INTO users VALUES (2, 'Bob')",
1130 /// ])?;
1131 /// println!("Total affected: {}", total);
1132 /// Ok(())
1133 /// }
1134 /// ```
1135 ///
1136 /// # Errors
1137 ///
1138 /// Returns a wrapped [`Error::Internal`] on the first statement that fails;
1139 /// its `source` is the original [`Error::Server`] from
1140 /// [`execute_command`](Self::execute_command). The error message
1141 /// includes the failing statement's ordinal and an 80-character preview
1142 /// of its SQL.
1143 pub fn execute_batch(&self, statements: &[&str]) -> Result<u64> {
1144 let mut total = 0u64;
1145 for (i, stmt) in statements.iter().enumerate() {
1146 if !stmt.trim().is_empty() {
1147 total += self.execute_command(stmt).map_err(|e| {
1148 let preview: String = stmt.chars().take(80).collect();
1149 Error::internal(format!(
1150 "execute_batch failed at statement {} of {}: {}: {}",
1151 i + 1,
1152 statements.len(),
1153 preview,
1154 e,
1155 ))
1156 })?;
1157 }
1158 }
1159 Ok(total)
1160 }
1161
1162 /// Returns the attached database path, if any.
1163 pub fn database(&self) -> Option<&str> {
1164 self.database.as_deref()
1165 }
1166
1167 /// Creates a new database file.
1168 ///
1169 /// # Example
1170 ///
1171 /// ```no_run
1172 /// use hyperdb_api::{Connection, Result};
1173 ///
1174 /// fn main() -> Result<()> {
1175 /// let conn = Connection::without_database("localhost:7483")?;
1176 /// conn.create_database("new_database.hyper")?;
1177 /// Ok(())
1178 /// }
1179 /// ```
1180 ///
1181 /// # Errors
1182 ///
1183 /// Returns [`Error::Server`] if the server rejects the
1184 /// `CREATE DATABASE IF NOT EXISTS` statement (e.g. the path is not
1185 /// writable on the server).
1186 pub fn create_database(&self, path: &str) -> Result<()> {
1187 let sql = format!("CREATE DATABASE IF NOT EXISTS {}", escape_sql_path(path));
1188 self.execute_command(&sql)?;
1189 Ok(())
1190 }
1191
1192 /// Drops (deletes) a database file.
1193 ///
1194 /// # Example
1195 ///
1196 /// ```no_run
1197 /// use hyperdb_api::{Connection, Result};
1198 ///
1199 /// fn main() -> Result<()> {
1200 /// let conn = Connection::without_database("localhost:7483")?;
1201 /// conn.drop_database("old_database.hyper")?;
1202 /// Ok(())
1203 /// }
1204 /// ```
1205 ///
1206 /// # Errors
1207 ///
1208 /// Returns [`Error::Server`] if the server rejects the
1209 /// `DROP DATABASE IF EXISTS` statement (e.g. the database is still
1210 /// attached or permissions deny deletion).
1211 pub fn drop_database(&self, path: &str) -> Result<()> {
1212 let sql = format!("DROP DATABASE IF EXISTS {}", escape_sql_path(path));
1213 self.execute_command(&sql)?;
1214 Ok(())
1215 }
1216
1217 /// Attaches a database file to the connection.
1218 ///
1219 /// Once attached, the database can be queried and modified.
1220 /// The database is identified by its alias (or by its path if no alias is provided).
1221 ///
1222 /// # Arguments
1223 ///
1224 /// * `path` - The path to the database file to attach.
1225 /// * `alias` - Optional alias for the database. If `None`, the database is
1226 /// attached without an explicit alias (typically using its filename).
1227 ///
1228 /// # Errors
1229 ///
1230 /// Returns an error if the database file doesn't exist or if attachment fails.
1231 ///
1232 /// # Example
1233 ///
1234 /// ```no_run
1235 /// use hyperdb_api::{Connection, Result};
1236 ///
1237 /// fn main() -> Result<()> {
1238 /// let conn = Connection::without_database("localhost:7483")?;
1239 ///
1240 /// // Attach with an alias
1241 /// conn.attach_database("data.hyper", Some("mydata"))?;
1242 ///
1243 /// // Attach without an alias
1244 /// conn.attach_database("other.hyper", None)?;
1245 /// Ok(())
1246 /// }
1247 /// ```
1248 pub fn attach_database(&self, path: &str, alias: Option<&str>) -> Result<()> {
1249 let sql = if let Some(alias) = alias {
1250 format!(
1251 "ATTACH DATABASE {} AS {}",
1252 escape_sql_path(path),
1253 escape_sql_path(alias)
1254 )
1255 } else {
1256 format!("ATTACH DATABASE {}", escape_sql_path(path))
1257 };
1258 self.execute_command(&sql)?;
1259 Ok(())
1260 }
1261
1262 /// Detaches a database from this connection.
1263 ///
1264 /// After detaching, the database file is released and can be accessed
1265 /// externally (e.g., copied, moved, etc.). All pending updates are
1266 /// written to disk before detaching.
1267 ///
1268 /// # Arguments
1269 ///
1270 /// * `alias` - The alias of the database to detach.
1271 ///
1272 /// # Errors
1273 ///
1274 /// Returns an error if the database is not attached or if detachment fails.
1275 ///
1276 /// # Example
1277 ///
1278 /// ```no_run
1279 /// use hyperdb_api::{Connection, Result};
1280 ///
1281 /// fn main() -> Result<()> {
1282 /// let conn = Connection::without_database("localhost:7483")?;
1283 /// conn.attach_database("data.hyper", Some("mydata"))?;
1284 /// // ... work with the database ...
1285 /// conn.detach_database("mydata")?;
1286 /// Ok(())
1287 /// }
1288 /// ```
1289 pub fn detach_database(&self, alias: &str) -> Result<()> {
1290 let sql = format!("DETACH DATABASE {}", escape_sql_path(alias));
1291 self.execute_command(&sql)?;
1292 Ok(())
1293 }
1294
1295 /// Detaches all databases from this connection.
1296 ///
1297 /// This is useful for cleanup before closing a connection or when
1298 /// you need to release all database files.
1299 ///
1300 /// # Errors
1301 ///
1302 /// Returns [`Error::Server`] if the server rejects the
1303 /// `DETACH ALL DATABASES` statement (e.g. a database is still in use by
1304 /// another session).
1305 pub fn detach_all_databases(&self) -> Result<()> {
1306 self.execute_command("DETACH ALL DATABASES")?;
1307 Ok(())
1308 }
1309
1310 /// Creates a schema in the database.
1311 ///
1312 /// # Errors
1313 ///
1314 /// - Returns an error if `schema_name` cannot be converted into a
1315 /// [`SchemaName`](crate::SchemaName) (invalid identifier).
1316 /// - Returns [`Error::Server`] if the server rejects the
1317 /// `CREATE SCHEMA` statement (e.g. the schema already exists).
1318 pub fn create_schema<T>(&self, schema_name: T) -> Result<()>
1319 where
1320 T: TryInto<crate::SchemaName>,
1321 crate::Error: From<T::Error>,
1322 {
1323 crate::catalog::Catalog::new(self).create_schema(schema_name)
1324 }
1325
1326 /// Checks whether a schema exists.
1327 ///
1328 /// # Arguments
1329 ///
1330 /// * `schema` - The schema name (can include database qualifier).
1331 ///
1332 /// # Example
1333 ///
1334 /// ```no_run
1335 /// use hyperdb_api::{Connection, Result};
1336 ///
1337 /// fn main() -> Result<()> {
1338 /// let conn = Connection::without_database("localhost:7483")?;
1339 /// if conn.has_schema("public")? {
1340 /// println!("Schema 'public' exists");
1341 /// }
1342 /// Ok(())
1343 /// }
1344 /// ```
1345 ///
1346 /// # Errors
1347 ///
1348 /// - Returns an error if `schema` cannot be converted into a
1349 /// [`SchemaName`](crate::SchemaName).
1350 /// - Returns [`Error::Server`] if the catalog lookup query fails.
1351 pub fn has_schema<T>(&self, schema: T) -> Result<bool>
1352 where
1353 T: TryInto<crate::SchemaName>,
1354 crate::Error: From<T::Error>,
1355 {
1356 use crate::catalog::Catalog;
1357 Catalog::new(self).has_schema(schema)
1358 }
1359
1360 /// Checks whether a table exists.
1361 ///
1362 /// # Arguments
1363 ///
1364 /// * `table_name` - The table name (can include database and schema qualifiers).
1365 ///
1366 /// # Example
1367 ///
1368 /// ```no_run
1369 /// use hyperdb_api::{Connection, Result};
1370 ///
1371 /// fn main() -> Result<()> {
1372 /// let conn = Connection::without_database("localhost:7483")?;
1373 /// if conn.has_table("public.users")? {
1374 /// println!("Table 'users' exists");
1375 /// }
1376 /// Ok(())
1377 /// }
1378 /// ```
1379 ///
1380 /// # Errors
1381 ///
1382 /// - Returns an error if `table_name` cannot be converted into a
1383 /// [`TableName`](crate::TableName).
1384 /// - Returns [`Error::Server`] if the catalog lookup query fails.
1385 pub fn has_table<T>(&self, table_name: T) -> Result<bool>
1386 where
1387 T: TryInto<crate::TableName>,
1388 crate::Error: From<T::Error>,
1389 {
1390 use crate::catalog::Catalog;
1391 Catalog::new(self).has_table(table_name)
1392 }
1393
1394 /// Returns the server version as a parsed struct.
1395 ///
1396 /// Returns `None` if the version cannot be determined (e.g., gRPC connection).
1397 ///
1398 /// # Example
1399 ///
1400 /// ```no_run
1401 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, ServerVersion, Result};
1402 ///
1403 /// fn main() -> Result<()> {
1404 /// let hyper = HyperProcess::new(None, None)?;
1405 /// let conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1406 /// if let Some(version) = conn.server_version() {
1407 /// println!("Hyper {}", version);
1408 /// if version >= ServerVersion::new(0, 1, 0) {
1409 /// println!("Has feature X");
1410 /// }
1411 /// }
1412 /// Ok(())
1413 /// }
1414 /// ```
1415 pub fn server_version(&self) -> Option<crate::ServerVersion> {
1416 let version_str = self.parameter_status("server_version")?;
1417 crate::ServerVersion::parse(&version_str)
1418 }
1419
1420 /// Copies a database file to a new path.
1421 ///
1422 /// The source database must be attached to this connection.
1423 ///
1424 /// # Example
1425 ///
1426 /// ```no_run
1427 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1428 ///
1429 /// fn main() -> Result<()> {
1430 /// let hyper = HyperProcess::new(None, None)?;
1431 /// let conn = Connection::new(&hyper, "source.hyper", CreateMode::DoNotCreate)?;
1432 /// conn.copy_database("source.hyper", "backup.hyper")?;
1433 /// Ok(())
1434 /// }
1435 /// ```
1436 ///
1437 /// # Errors
1438 ///
1439 /// Returns [`Error::Server`] if the server rejects the
1440 /// `COPY DATABASE` statement — e.g. the source is not attached, the
1441 /// destination path is not writable, or it already exists.
1442 pub fn copy_database(&self, source: &str, destination: &str) -> Result<()> {
1443 let sql = format!(
1444 "COPY DATABASE {} TO {}",
1445 escape_sql_path(source),
1446 escape_sql_path(destination)
1447 );
1448 self.execute_command(&sql)?;
1449 Ok(())
1450 }
1451
1452 /// Executes EXPLAIN on a query and returns the plan as a string.
1453 ///
1454 /// # Example
1455 ///
1456 /// ```no_run
1457 /// use hyperdb_api::{Connection, CreateMode, Result};
1458 ///
1459 /// fn main() -> Result<()> {
1460 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1461 /// let plan = conn.explain("SELECT * FROM users WHERE id = 1")?;
1462 /// println!("{}", plan);
1463 /// Ok(())
1464 /// }
1465 /// ```
1466 ///
1467 /// # Errors
1468 ///
1469 /// Returns [`Error::Server`] if `EXPLAIN <query>` fails to parse or
1470 /// plan, or if the streamed result cannot be consumed.
1471 pub fn explain(&self, query: &str) -> Result<String> {
1472 let explain_sql = format!("EXPLAIN {query}");
1473 let result = self.execute_query(&explain_sql)?;
1474 let mut lines = Vec::new();
1475 for row in result.rows() {
1476 let row = row?;
1477 if let Some(line) = row.get::<String>(0) {
1478 lines.push(line);
1479 }
1480 }
1481 Ok(lines.join("\n"))
1482 }
1483
1484 /// Executes EXPLAIN ANALYZE on a query and returns the plan with timing info.
1485 ///
1486 /// **Note:** This actually executes the query to collect timing information.
1487 ///
1488 /// # Errors
1489 ///
1490 /// Returns [`Error::Server`] if `EXPLAIN ANALYZE <query>` fails — this
1491 /// includes any runtime error raised by actually executing `query`.
1492 pub fn explain_analyze(&self, query: &str) -> Result<String> {
1493 let explain_sql = format!("EXPLAIN ANALYZE {query}");
1494 let result = self.execute_query(&explain_sql)?;
1495 let mut lines = Vec::new();
1496 for row in result.rows() {
1497 let row = row?;
1498 if let Some(line) = row.get::<String>(0) {
1499 lines.push(line);
1500 }
1501 }
1502 Ok(lines.join("\n"))
1503 }
1504
1505 /// Returns a reference to the underlying TCP client.
1506 ///
1507 /// # Panics
1508 ///
1509 /// This method returns `None` if the connection is using gRPC transport.
1510 pub fn tcp_client(&self) -> Option<&Client> {
1511 match &self.transport {
1512 Transport::Tcp(tcp) => Some(&tcp.client),
1513 Transport::Grpc(_) => None,
1514 }
1515 }
1516
1517 /// Crate-internal accessor for the transport. Used by
1518 /// [`PreparedStatement`](crate::PreparedStatement) to reach the
1519 /// underlying `hyperdb_api_core::client::Client`.
1520 pub(crate) fn transport(&self) -> &Transport {
1521 &self.transport
1522 }
1523
1524 /// Prepares a SQL statement with automatic parameter type inference.
1525 ///
1526 /// The returned [`PreparedStatement`](crate::PreparedStatement) can
1527 /// be executed many times with different parameter values; the
1528 /// server caches the parsed plan. This is the preferred way to
1529 /// execute a statement repeatedly inside a loop.
1530 ///
1531 /// For explicit parameter types (necessary when `$N` placeholders
1532 /// would otherwise be ambiguous), use
1533 /// [`prepare_typed`](Self::prepare_typed).
1534 ///
1535 /// # Example
1536 ///
1537 /// ```no_run
1538 /// # use hyperdb_api::{Connection, CreateMode, Result};
1539 /// # fn example(conn: &Connection) -> Result<()> {
1540 /// let stmt = conn.prepare("SELECT name FROM users WHERE id = $1")?;
1541 /// for id in [1_i32, 2, 3] {
1542 /// let name: String = stmt.fetch_scalar(&[&id])?;
1543 /// println!("{id}: {name}");
1544 /// }
1545 /// # Ok(())
1546 /// # }
1547 /// ```
1548 ///
1549 /// # Errors
1550 ///
1551 /// See [`prepare_typed`](Self::prepare_typed) — this method delegates
1552 /// to it with an empty OID list.
1553 pub fn prepare(&self, query: &str) -> Result<crate::PreparedStatement<'_>> {
1554 self.prepare_typed(query, &[])
1555 }
1556
1557 /// Prepares a SQL statement with explicit parameter type OIDs.
1558 ///
1559 /// Use this when the server cannot infer parameter types from the
1560 /// SQL alone (e.g. a bare `$1` in a `WHERE v > $1` clause with no
1561 /// other context). Constants for common types live in
1562 /// [`hyperdb_api_core::types::oids`].
1563 ///
1564 /// # Errors
1565 ///
1566 /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport
1567 /// (prepared statements are TCP-only).
1568 /// - Returns [`Error::Server`] if the server rejects the `Parse`
1569 /// message, e.g. SQL syntax error or unknown OID.
1570 /// - Returns [`Error::Io`] on transport-level I/O failures.
1571 pub fn prepare_typed(
1572 &self,
1573 query: &str,
1574 param_types: &[crate::Oid],
1575 ) -> Result<crate::PreparedStatement<'_>> {
1576 let client = match &self.transport {
1577 Transport::Tcp(tcp) => &tcp.client,
1578 Transport::Grpc(_) => {
1579 return Err(Error::feature_not_supported(
1580 "prepared statements are not supported over gRPC transport",
1581 ));
1582 }
1583 };
1584 let inner = client.prepare_typed(query, param_types)?;
1585 crate::PreparedStatement::new(self, inner)
1586 }
1587
1588 /// Returns true if the connection is alive (passive check).
1589 ///
1590 /// This is a lightweight check that does not send any data to the server.
1591 /// For an active health check, use [`ping`](Self::ping).
1592 pub fn is_alive(&self) -> bool {
1593 match &self.transport {
1594 Transport::Tcp(tcp) => tcp.client.is_alive(),
1595 Transport::Grpc(_) => true, // gRPC connections are stateless
1596 }
1597 }
1598
1599 /// Actively checks that the connection is healthy by executing a trivial query.
1600 ///
1601 /// Unlike [`is_alive`](Self::is_alive) which only checks local state,
1602 /// this method sends `SELECT 1` to the server and verifies a response.
1603 ///
1604 /// # Example
1605 ///
1606 /// ```no_run
1607 /// # use hyperdb_api::{Connection, CreateMode, Result};
1608 /// # fn example(conn: &Connection) -> Result<()> {
1609 /// if conn.ping().is_ok() {
1610 /// println!("Connection is healthy");
1611 /// }
1612 /// # Ok(())
1613 /// # }
1614 /// ```
1615 ///
1616 /// # Errors
1617 ///
1618 /// Returns [`Error::Server`] or [`Error::Io`] if the `SELECT 1`
1619 /// round-trip fails — i.e. the connection is no longer usable.
1620 pub fn ping(&self) -> Result<()> {
1621 self.execute_command("SELECT 1")?;
1622 Ok(())
1623 }
1624
1625 /// Returns the process ID of the backend server connection.
1626 ///
1627 /// Returns 0 for gRPC connections (not applicable).
1628 pub fn process_id(&self) -> i32 {
1629 match &self.transport {
1630 Transport::Tcp(tcp) => tcp.client.process_id(),
1631 Transport::Grpc(_) => 0,
1632 }
1633 }
1634
1635 /// Returns the secret key for the backend server connection.
1636 ///
1637 /// This is used for cancellation requests.
1638 /// Returns 0 for gRPC connections (not applicable).
1639 pub fn secret_key(&self) -> i32 {
1640 match &self.transport {
1641 Transport::Tcp(tcp) => tcp.client.secret_key(),
1642 Transport::Grpc(_) => 0,
1643 }
1644 }
1645
1646 /// Returns a server parameter value by name.
1647 ///
1648 /// Server parameters are sent by the server during connection startup.
1649 /// Common parameters include:
1650 /// - `server_version` - The server version string
1651 /// - `server_encoding` - The server's character encoding
1652 /// - `client_encoding` - The client's character encoding
1653 /// - `DateStyle` - Date display format
1654 /// - `TimeZone` - Server timezone
1655 /// - `session_identifier` - Session ID for connection migration (if routing enabled)
1656 ///
1657 /// Returns `None` if the parameter is not known.
1658 ///
1659 /// # Example
1660 ///
1661 /// ```no_run
1662 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1663 ///
1664 /// fn main() -> Result<()> {
1665 /// let hyper = HyperProcess::new(None, None)?;
1666 /// let conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1667 ///
1668 /// if let Some(version) = conn.parameter_status("server_version") {
1669 /// println!("Connected to Hyper version: {}", version);
1670 /// }
1671 /// Ok(())
1672 /// }
1673 /// ```
1674 pub fn parameter_status(&self, name: &str) -> Option<String> {
1675 match &self.transport {
1676 Transport::Tcp(tcp) => tcp.client.parameter_status(name),
1677 Transport::Grpc(_) => None, // gRPC doesn't have server parameters
1678 }
1679 }
1680
1681 /// Sets the notice receiver for this connection.
1682 ///
1683 /// Server notices and warnings are passed to this callback instead of being
1684 /// logged. Pass `None` to restore default logging behavior.
1685 pub fn set_notice_receiver(
1686 &mut self,
1687 receiver: Option<hyperdb_api_core::client::NoticeReceiver>,
1688 ) {
1689 match &mut self.transport {
1690 Transport::Tcp(tcp) => tcp.client.set_notice_receiver(receiver),
1691 Transport::Grpc(_) => {} // gRPC doesn't support notice receivers
1692 }
1693 }
1694
1695 /// Cancels the currently executing query (thread-safe).
1696 ///
1697 /// # Errors
1698 ///
1699 /// - Returns [`Error::FeatureNotSupported`] on gRPC connections — cancellation is not
1700 /// yet implemented for gRPC transport.
1701 /// - Returns [`Error::Connection`] or [`Error::Io`] if the separate
1702 /// cancel-request connection to the server fails.
1703 pub fn cancel(&self) -> Result<()> {
1704 match &self.transport {
1705 Transport::Tcp(tcp) => tcp.client.cancel().map_err(Error::from),
1706 Transport::Grpc(_) => Err(Error::feature_not_supported(
1707 "Query cancellation is not yet supported for gRPC connections.",
1708 )),
1709 }
1710 }
1711
1712 /// Closes the connection, detaching all databases first.
1713 ///
1714 /// # Errors
1715 ///
1716 /// - Returns [`Error::Internal`] wrapping the underlying close failure
1717 /// (its `source` is the transport error) if the client cannot be
1718 /// shut down cleanly.
1719 /// - Returns [`Error::Internal`] wrapping the detach failure if the
1720 /// attached database could not be detached but close itself
1721 /// succeeded.
1722 pub fn close(self) -> Result<()> {
1723 // Detach the attached database to ensure files are flushed and released.
1724 // Always attempt close, even if detach fails.
1725 let detach_err = if let Some(ref db_path) = self.database {
1726 let db_alias = std::path::Path::new(db_path)
1727 .file_stem()
1728 .and_then(|s| s.to_str())
1729 .unwrap_or("db");
1730 self.execute_command(&format!("DETACH DATABASE {}", escape_sql_path(db_alias)))
1731 .err()
1732 } else {
1733 None
1734 };
1735
1736 // Always attempt to close the client to release the connection.
1737 let close_result = match self.transport {
1738 Transport::Tcp(tcp) => tcp.client.close(),
1739 Transport::Grpc(_) => Ok(()), // gRPC connections are stateless
1740 };
1741
1742 if let Err(e) = close_result {
1743 return Err(Error::internal(format!("Failed to close connection: {e}")));
1744 }
1745
1746 if let Some(e) = detach_err {
1747 // Detach failed but close succeeded; surface the detach error.
1748 return Err(Error::internal(format!(
1749 "Failed to detach database during close: {e}"
1750 )));
1751 }
1752
1753 Ok(())
1754 }
1755
1756 /// Unloads the database from memory while keeping the connection active.
1757 ///
1758 /// This executes the `UNLOAD DATABASE` command, which releases the database
1759 /// from memory but keeps the session and connection open. The database can
1760 /// be accessed again by subsequent queries that will automatically reload it.
1761 ///
1762 /// This is useful for releasing memory locks when switching between databases
1763 /// or when working with multiple database files.
1764 ///
1765 /// # Example
1766 ///
1767 /// ```no_run
1768 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1769 ///
1770 /// fn main() -> Result<()> {
1771 /// let hyper = HyperProcess::new(None, None)?;
1772 /// let conn = Connection::new(&hyper, "test.hyper", CreateMode::Create)?;
1773 ///
1774 /// // Do some work with the database
1775 /// conn.execute_command("CREATE TABLE test (id INT)")?;
1776 ///
1777 /// // Unload from memory (but keep connection)
1778 /// conn.unload_database()?;
1779 ///
1780 /// // Database can still be accessed (will be reloaded automatically)
1781 /// let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM test")?;
1782 /// println!("Count: {}", count);
1783 ///
1784 /// Ok(())
1785 /// }
1786 /// ```
1787 ///
1788 /// # Errors
1789 ///
1790 /// Returns [`Error::Server`] if the server rejects the `UNLOAD DATABASE`
1791 /// command (e.g. the database is still in use by another session).
1792 pub fn unload_database(&self) -> Result<()> {
1793 self.execute_command("UNLOAD DATABASE")?;
1794 Ok(())
1795 }
1796
1797 /// Releases the database completely from the session.
1798 ///
1799 /// This executes the `UNLOAD RELEASE` command, which completely releases
1800 /// the database from the session. After this call, the database cannot
1801 /// be accessed until a new connection is established.
1802 ///
1803 /// This is useful for completely freeing database resources when you're
1804 /// done with a database and want to ensure no locks are held.
1805 ///
1806 /// **Note:** This should only be used when the session has exactly one
1807 /// database attached. Hyper does not support `UNLOAD RELEASE` with
1808 /// multiple databases attached to the same session.
1809 ///
1810 /// # Example
1811 ///
1812 /// ```no_run
1813 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1814 ///
1815 /// fn main() -> Result<()> {
1816 /// let hyper = HyperProcess::new(None, None)?;
1817 /// let conn = Connection::new(&hyper, "test.hyper", CreateMode::Create)?;
1818 ///
1819 /// // Do some work with the database
1820 /// conn.execute_command("CREATE TABLE test (id INT)")?;
1821 ///
1822 /// // Release database completely from session
1823 /// conn.unload_release()?;
1824 ///
1825 /// // Database cannot be accessed after this point without new connection
1826 /// // conn.execute_command("SELECT * FROM test")?; // This would fail
1827 ///
1828 /// Ok(())
1829 /// }
1830 /// ```
1831 ///
1832 /// # Errors
1833 ///
1834 /// Returns [`Error::Server`] if the server rejects `UNLOAD RELEASE`, most
1835 /// commonly because multiple databases are attached to the same session
1836 /// (Hyper only supports `UNLOAD RELEASE` with exactly one attached DB).
1837 pub fn unload_release(&self) -> Result<()> {
1838 self.execute_command("UNLOAD RELEASE")?;
1839 Ok(())
1840 }
1841
1842 // =========================================================================
1843 // Query Statistics
1844 // =========================================================================
1845
1846 /// Enables query statistics collection for this connection.
1847 ///
1848 /// After enabling, each `execute_command()` or `execute_query()` call will
1849 /// capture detailed performance metrics from Hyper. Retrieve them via
1850 /// [`last_query_stats()`](Self::last_query_stats).
1851 ///
1852 /// The provider determines how stats are collected. Use
1853 /// [`LogFileStatsProvider`](crate::LogFileStatsProvider) to parse Hyper's log file (requires local
1854 /// `hyperd.log`), or implement a custom [`QueryStatsProvider`](crate::QueryStatsProvider).
1855 ///
1856 /// # Example
1857 ///
1858 /// ```no_run
1859 /// # use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1860 /// # fn main() -> Result<()> {
1861 /// # let hyper = HyperProcess::new(None, None)?;
1862 /// # let mut conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1863 /// use hyperdb_api::LogFileStatsProvider;
1864 ///
1865 /// // Auto-detect log path from HyperProcess
1866 /// conn.enable_query_stats(LogFileStatsProvider::from_process(&hyper));
1867 ///
1868 /// // Or specify an explicit log path
1869 /// // conn.enable_query_stats(LogFileStatsProvider::new("/path/to/hyperd.log"));
1870 /// # Ok(())
1871 /// # }
1872 /// ```
1873 pub fn enable_query_stats(&mut self, provider: impl QueryStatsProvider + 'static) {
1874 self.stats_provider = Some(Arc::new(provider));
1875 }
1876
1877 /// Disables query statistics collection.
1878 ///
1879 /// After calling this, `last_query_stats()` will return `None`.
1880 pub fn disable_query_stats(&mut self) {
1881 self.stats_provider = None;
1882 if let Ok(mut guard) = self.pending_stats.lock() {
1883 *guard = None;
1884 }
1885 }
1886
1887 /// Returns the query statistics from the most recent query execution.
1888 ///
1889 /// Stats are resolved **lazily** — the log file is read when this method
1890 /// is called, not when the query executes. This is important for streaming
1891 /// queries (`execute_query`), where Hyper writes the execution stats only
1892 /// after the result set is fully consumed.
1893 ///
1894 /// **Call this after consuming the result set** (e.g., after `collect_rows()`,
1895 /// iterating all chunks, or dropping the `Rowset`).
1896 ///
1897 /// Returns `None` if:
1898 /// - Query stats collection is not enabled
1899 /// - No query has been executed yet
1900 /// - Stats could not be found for the last query (e.g., log entry not matched)
1901 ///
1902 /// # Example
1903 ///
1904 /// ```no_run
1905 /// # use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1906 /// # fn main() -> Result<()> {
1907 /// # let hyper = HyperProcess::new(None, None)?;
1908 /// # let mut conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1909 /// # use hyperdb_api::LogFileStatsProvider;
1910 /// # conn.enable_query_stats(LogFileStatsProvider::from_process(&hyper));
1911 /// conn.execute_command("CREATE TABLE t (id INT)")?;
1912 ///
1913 /// if let Some(stats) = conn.last_query_stats() {
1914 /// println!("Total: {}s", stats.elapsed_s);
1915 /// if let Some(ref pre) = stats.pre_execution {
1916 /// println!(" Parse: {:?}s", pre.parsing_time_s);
1917 /// println!(" Compile: {:?}s", pre.compilation_time_s);
1918 /// }
1919 /// if let Some(ref exec) = stats.execution {
1920 /// println!(" Execute: {:?}s", exec.elapsed_s);
1921 /// println!(" Peak mem: {:?} MB", exec.peak_memory_mb);
1922 /// }
1923 /// }
1924 /// # Ok(())
1925 /// # }
1926 /// ```
1927 pub fn last_query_stats(&self) -> Option<QueryStats> {
1928 let provider = self.stats_provider.as_ref()?;
1929 let mut guard = self.pending_stats.lock().ok()?;
1930 let (token, sql) = guard.take()?;
1931 provider.after_query(token, &sql)
1932 }
1933
1934 /// Internal: call provider's `before_query` if stats are enabled.
1935 fn stats_before_query(&self, sql: &str) -> Option<Box<dyn Any + Send>> {
1936 self.stats_provider.as_ref().map(|p| p.before_query(sql))
1937 }
1938
1939 /// Internal: store the pending token+sql for lazy resolution.
1940 fn stats_store_pending(&self, token: Option<Box<dyn Any + Send>>, sql: &str) {
1941 if let Some(token) = token {
1942 if let Ok(mut guard) = self.pending_stats.lock() {
1943 *guard = Some((token, sql.to_string()));
1944 }
1945 }
1946 }
1947}
1948
1949impl Connection {
1950 // =========================================================================
1951 // Transaction Control
1952 // =========================================================================
1953
1954 // -------------------------------------------------------------------
1955 // Raw transaction control (internal)
1956 // -------------------------------------------------------------------
1957 //
1958 // The `*_raw` methods below are `pub(crate)` and form the canonical
1959 // implementation of session-level transaction control. The RAII
1960 // guard at `crate::Transaction` and any internal helper that
1961 // genuinely needs `&self` (rather than the guard's `&mut self`)
1962 // delegate to these.
1963 //
1964 // The matching `pub` methods (`begin_transaction`, `commit`,
1965 // `rollback`) are thin `#[doc(hidden)] #[deprecated]` wrappers
1966 // retained only so any pre-existing downstream caller sees a
1967 // compiler warning rather than a hard break. They will be deleted
1968 // in a future release; the `_raw` methods stay.
1969
1970 /// Issues `BEGIN TRANSACTION`. Crate-internal use only.
1971 pub(crate) fn begin_transaction_raw(&self) -> Result<()> {
1972 self.execute_command("BEGIN TRANSACTION")?;
1973 Ok(())
1974 }
1975
1976 /// Issues `COMMIT`. Crate-internal use only.
1977 pub(crate) fn commit_raw(&self) -> Result<()> {
1978 self.execute_command("COMMIT")?;
1979 Ok(())
1980 }
1981
1982 /// Issues `ROLLBACK`. Crate-internal use only.
1983 pub(crate) fn rollback_raw(&self) -> Result<()> {
1984 self.execute_command("ROLLBACK")?;
1985 Ok(())
1986 }
1987
1988 /// Begins an explicit transaction.
1989 ///
1990 /// **Prefer [`transaction()`](Self::transaction)** — the RAII guard
1991 /// auto-rolls back on drop and cannot leak a half-open transaction
1992 /// across error paths. This method is hidden from generated
1993 /// rustdoc and marked deprecated; it will be removed in a future
1994 /// release.
1995 ///
1996 /// # Errors
1997 ///
1998 /// Returns [`Error::Server`] if the server rejects `BEGIN TRANSACTION`
1999 /// (e.g. a transaction is already open on this session).
2000 #[doc(hidden)]
2001 #[deprecated(
2002 note = "Use `Connection::transaction()` for an RAII guard. This method will be removed \
2003 in a future release."
2004 )]
2005 pub fn begin_transaction(&self) -> Result<()> {
2006 self.begin_transaction_raw()
2007 }
2008
2009 /// Commits the current transaction.
2010 ///
2011 /// **Prefer [`Transaction::commit`](crate::Transaction::commit)** on
2012 /// the RAII guard returned by [`transaction()`](Self::transaction).
2013 /// Hidden from generated rustdoc and deprecated; slated for removal.
2014 ///
2015 /// # Errors
2016 ///
2017 /// Returns [`Error::Server`] if the server rejects `COMMIT`.
2018 #[doc(hidden)]
2019 #[deprecated(
2020 note = "Use `Transaction::commit()` on the RAII guard from `Connection::transaction()`. \
2021 This method will be removed in a future release."
2022 )]
2023 pub fn commit(&self) -> Result<()> {
2024 self.commit_raw()
2025 }
2026
2027 /// Rolls back the current transaction.
2028 ///
2029 /// **Prefer [`Transaction::rollback`](crate::Transaction::rollback)**
2030 /// on the RAII guard returned by [`transaction()`](Self::transaction).
2031 /// Hidden from generated rustdoc and deprecated; slated for removal.
2032 ///
2033 /// # Errors
2034 ///
2035 /// Returns [`Error::Server`] if the server rejects `ROLLBACK`.
2036 #[doc(hidden)]
2037 #[deprecated(
2038 note = "Use `Transaction::rollback()` on the RAII guard from `Connection::transaction()`. \
2039 This method will be removed in a future release."
2040 )]
2041 pub fn rollback(&self) -> Result<()> {
2042 self.rollback_raw()
2043 }
2044
2045 /// Starts a transaction and returns an RAII guard that auto-rolls back on drop.
2046 ///
2047 /// The returned [`Transaction`](crate::Transaction) exclusively borrows this connection,
2048 /// preventing any other use of the connection while the transaction is active.
2049 /// This is enforced at compile time by Rust's borrow checker. The guard provides
2050 /// `commit()` and `rollback()` methods. If dropped without calling either, the
2051 /// transaction is automatically rolled back.
2052 ///
2053 /// # Example
2054 ///
2055 /// ```no_run
2056 /// # use hyperdb_api::{Connection, CreateMode, Result};
2057 /// # fn main() -> Result<()> {
2058 /// # let mut conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
2059 /// let txn = conn.transaction()?;
2060 /// txn.execute_command("INSERT INTO users VALUES (1, 'Alice')")?;
2061 /// txn.commit()?; // or drop `txn` to auto-rollback
2062 /// # Ok(())
2063 /// # }
2064 /// ```
2065 ///
2066 /// # Errors
2067 ///
2068 /// Returns [`Error::Server`] if the server rejects the `BEGIN`
2069 /// statement issued internally by
2070 /// [`Transaction::new`](crate::Transaction).
2071 pub fn transaction(&mut self) -> Result<crate::Transaction<'_>> {
2072 crate::Transaction::new(self)
2073 }
2074}
2075
2076/// Checks if an error indicates an "already exists" condition based on SQLSTATE codes.
2077///
2078/// This function uses `PostgreSQL` SQLSTATE codes to reliably detect duplicate object errors
2079/// regardless of server locale or message formatting. The codes checked are:
2080/// - `42P04`: Database already exists
2081/// - `42710`: Duplicate object
2082/// - `42P06`: Duplicate schema
2083/// - `42P07`: Duplicate table
2084///
2085/// See: <https://www.postgresql.org/docs/current/errcodes-appendix.html>
2086fn is_already_exists_error(err: &Error) -> bool {
2087 err.sqlstate()
2088 .is_some_and(|code| matches!(code, "42P04" | "42710" | "42P06" | "42P07"))
2089}