hyperdb_api/connection.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Database connection management.
5//!
6//! The [`Connection`] type provides a unified interface for connecting to Hyper
7//! databases via either TCP (`PostgreSQL` wire protocol) or gRPC transport.
8//! The transport is automatically detected from the endpoint URL:
9//!
10//! - `https://` or `http://` → gRPC transport
11//! - Otherwise → TCP transport (e.g., `localhost:7483`)
12
13use std::path::Path;
14
15use hyperdb_api_core::client::Client;
16
17use crate::error::{Error, Result};
18use crate::names::escape_sql_path;
19use crate::process::HyperProcess;
20use crate::result::{Row, Rowset, DEFAULT_BINARY_CHUNK_SIZE};
21use crate::transport::Transport;
22
23use std::any::Any;
24use std::sync::{Arc, Mutex};
25
26use crate::query_stats::{QueryStats, QueryStatsProvider};
27
28/// Trait for types that can be extracted from a scalar query result.
29///
30/// This trait enables the generic [`Connection::execute_scalar_query`] method,
31/// similar to C++'s `executeScalarQuery<T>()` template.
32///
33/// # Implementing Custom Types
34///
35/// You can implement this trait for custom types to use them with `execute_scalar_query`:
36///
37/// ```no_run
38/// # use hyperdb_api::{Row, ScalarValue};
39/// # struct MyType;
40/// # impl MyType { fn parse(s: &str) -> Self { MyType } }
41/// impl ScalarValue for MyType {
42/// fn from_row(row: &Row, col: usize) -> Option<Self> {
43/// row.get_string(col).map(|s| MyType::parse(&s))
44/// }
45/// }
46/// ```
47pub trait ScalarValue: Sized {
48 /// Extracts a value of this type from a row at the given column.
49 fn from_row(row: &Row, col: usize) -> Option<Self>;
50}
51
52impl ScalarValue for i64 {
53 fn from_row(row: &Row, col: usize) -> Option<Self> {
54 row.get_i64(col)
55 }
56}
57
58impl ScalarValue for i32 {
59 fn from_row(row: &Row, col: usize) -> Option<Self> {
60 row.get_i32(col)
61 }
62}
63
64impl ScalarValue for i16 {
65 fn from_row(row: &Row, col: usize) -> Option<Self> {
66 row.get_i16(col)
67 }
68}
69
70impl ScalarValue for f64 {
71 fn from_row(row: &Row, col: usize) -> Option<Self> {
72 row.get_f64(col)
73 }
74}
75
76impl ScalarValue for bool {
77 fn from_row(row: &Row, col: usize) -> Option<Self> {
78 row.get_bool(col)
79 }
80}
81
82impl ScalarValue for String {
83 fn from_row(row: &Row, col: usize) -> Option<Self> {
84 row.get_string(col)
85 }
86}
87
88/// Database creation mode when connecting.
89#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
90pub enum CreateMode {
91 /// Do not create the database. Method will fail if database doesn't exist.
92 #[default]
93 DoNotCreate,
94 /// Create the database. Method will fail if the database already exists.
95 Create,
96 /// Create the database if it doesn't exist.
97 CreateIfNotExists,
98 /// Create the database. If it already exists, drop the old one first.
99 CreateAndReplace,
100}
101
102/// A connection to a Hyper database.
103///
104/// This struct represents an active connection to a Hyper server and optionally
105/// an attached database. The connection is automatically closed when dropped.
106///
107/// # Transport Auto-Detection
108///
109/// The transport is automatically detected from the endpoint URL:
110/// - `https://` or `http://` → gRPC transport (read-only until server supports writes)
111/// - Otherwise → TCP transport (full read/write support)
112///
113/// # CSV / Text Import & Export
114///
115/// For CSV, TSV, and other delimited-text formats, see the [`copy`](crate::copy)
116/// module which provides [`export_csv()`](Self::export_csv),
117/// [`import_csv()`](Self::import_csv), and related methods on this struct.
118///
119/// # Example
120///
121/// ```no_run
122/// use hyperdb_api::{Connection, CreateMode, Result};
123///
124/// fn main() -> Result<()> {
125/// // TCP connection (full read/write)
126/// let conn = Connection::connect("localhost:7483", "example.hyper", CreateMode::CreateIfNotExists)?;
127///
128/// // Execute SQL commands
129/// conn.execute_command("CREATE TABLE test (id INT, name TEXT)")?;
130/// conn.execute_command("INSERT INTO test VALUES (1, 'Hello')")?;
131///
132/// Ok(())
133/// }
134/// ```
135///
136/// ```no_run
137/// # use hyperdb_api::{Connection, CreateMode, Result};
138/// # fn example() -> Result<()> {
139/// // gRPC connection (read-only, auto-detected from URL)
140/// let conn = Connection::connect(
141/// "https://hyper-server.example.com:443",
142/// "example.hyper",
143/// CreateMode::DoNotCreate, // Must be DoNotCreate for gRPC
144/// )?;
145/// # Ok(())
146/// # }
147/// ```
148pub struct Connection {
149 transport: Transport,
150 database: Option<String>,
151 stats_provider: Option<Arc<dyn QueryStatsProvider>>,
152 /// Pending stats token + SQL from the most recent query, resolved lazily.
153 pending_stats: Mutex<Option<(Box<dyn Any + Send>, String)>>,
154}
155
156impl std::fmt::Debug for Connection {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.debug_struct("Connection")
159 .field("database", &self.database)
160 .finish_non_exhaustive()
161 }
162}
163
164impl Connection {
165 /// Creates a new connection to a Hyper instance with a database.
166 ///
167 /// This is the primary way to connect to a running [`HyperProcess`].
168 ///
169 /// # Arguments
170 ///
171 /// * `instance` - The Hyper server instance to connect to.
172 /// * `database_path` - Path to the database file.
173 /// * `create_mode` - How to handle database creation.
174 ///
175 /// # Errors
176 ///
177 /// Returns an error if the connection could not be established.
178 ///
179 /// # Example
180 ///
181 /// ```no_run
182 /// use hyperdb_api::{HyperProcess, Connection, CreateMode, Result};
183 ///
184 /// fn main() -> Result<()> {
185 /// let hyper = HyperProcess::new(None, None)?;
186 /// let conn = Connection::new(&hyper, "database.hyper", CreateMode::CreateIfNotExists)?;
187 /// Ok(())
188 /// }
189 /// ```
190 pub fn new(
191 instance: &HyperProcess,
192 database_path: impl AsRef<Path>,
193 create_mode: CreateMode,
194 ) -> Result<Self> {
195 // Prefer using the connection_endpoint which properly handles UDS/Named Pipes
196 if let Some(conn_endpoint) = instance.connection_endpoint() {
197 return Self::connect_with_endpoint(
198 conn_endpoint,
199 &database_path.as_ref().to_string_lossy(),
200 create_mode,
201 );
202 }
203
204 // Fall back to string endpoint (TCP)
205 let endpoint = instance.require_endpoint()?;
206 Self::connect(
207 endpoint,
208 &database_path.as_ref().to_string_lossy(),
209 create_mode,
210 )
211 }
212
213 /// Connects using a `ConnectionEndpoint` (supports TCP, UDS, and Named Pipes).
214 fn connect_with_endpoint(
215 endpoint: &hyperdb_api_core::client::ConnectionEndpoint,
216 database_path: &str,
217 create_mode: CreateMode,
218 ) -> Result<Self> {
219 let db_path_str = Some(database_path.to_string());
220
221 let config = hyperdb_api_core::client::Config::new().with_user("tableau_internal_user");
222
223 let client = hyperdb_api_core::client::Client::connect_endpoint(endpoint, &config)?;
224
225 let conn = Connection::from_client(client, db_path_str.clone());
226
227 // Handle database creation
228 if let Some(db_path) = db_path_str {
229 conn.handle_creation_mode(&db_path, create_mode)?;
230 conn.attach_and_set_path(&db_path)?;
231 }
232
233 Ok(conn)
234 }
235
236 /// Connects to a Hyper server and optionally attaches a database.
237 ///
238 /// # Arguments
239 ///
240 /// * `endpoint` - The server endpoint (host:port).
241 /// * `database_path` - Path to the database file.
242 /// * `create_mode` - How to handle database creation.
243 ///
244 /// # Errors
245 ///
246 /// Returns an error if the connection could not be established.
247 pub fn connect(endpoint: &str, database_path: &str, create_mode: CreateMode) -> Result<Self> {
248 crate::ConnectionBuilder::new(endpoint)
249 .database(database_path)
250 .create_mode(create_mode)
251 .build()
252 }
253
254 /// Returns a connection builder for advanced configuration.
255 ///
256 /// This is useful when you need to set authentication, timeouts, or
257 /// other advanced options before connecting.
258 #[must_use]
259 pub fn builder(endpoint: &str) -> crate::ConnectionBuilder {
260 crate::ConnectionBuilder::new(endpoint)
261 }
262
263 /// Creates a Connection from a low-level Client (internal use, TCP only).
264 pub(crate) fn from_client(client: Client, database: Option<String>) -> Self {
265 Connection {
266 transport: Transport::Tcp(Box::new(crate::transport::TcpTransport { client })),
267 database,
268 stats_provider: None,
269 pending_stats: Mutex::new(None),
270 }
271 }
272
273 /// Creates a Connection from a Transport (internal use).
274 #[allow(
275 dead_code,
276 reason = "used by ConnectionBuilder for the gRPC path; not reached under non-gRPC feature builds"
277 )]
278 pub(crate) fn from_transport(transport: Transport, database: Option<String>) -> Self {
279 Connection {
280 transport,
281 database,
282 stats_provider: None,
283 pending_stats: Mutex::new(None),
284 }
285 }
286
287 /// Returns the transport type name (e.g., "TCP", "gRPC", "Unix Socket").
288 pub fn transport_type(&self) -> &'static str {
289 self.transport.transport_type().as_str()
290 }
291
292 /// Returns true if this connection supports write operations.
293 ///
294 /// Currently, only TCP connections support writes. gRPC connections are
295 /// read-only until the server supports write operations over gRPC.
296 pub fn supports_writes(&self) -> bool {
297 self.transport.supports_writes()
298 }
299
300 /// Handles database creation logic (internal use).
301 pub(crate) fn handle_creation_mode(
302 &self,
303 database_path: &str,
304 create_mode: CreateMode,
305 ) -> Result<()> {
306 match create_mode {
307 CreateMode::DoNotCreate => {}
308 CreateMode::Create => {
309 self.execute_command(&format!(
310 "CREATE DATABASE {}",
311 escape_sql_path(database_path)
312 ))?;
313 }
314 CreateMode::CreateIfNotExists => {
315 if let Err(e) = self.execute_command(&format!(
316 "CREATE DATABASE IF NOT EXISTS {}",
317 escape_sql_path(database_path)
318 )) {
319 if !is_already_exists_error(&e) {
320 return Err(Error::internal(format!(
321 "Failed to create database '{database_path}': {e}"
322 )));
323 }
324 }
325 }
326 CreateMode::CreateAndReplace => {
327 let _ = self.execute_command(&format!(
328 "DROP DATABASE IF EXISTS {}",
329 escape_sql_path(database_path)
330 ));
331 self.execute_command(&format!(
332 "CREATE DATABASE {}",
333 escape_sql_path(database_path)
334 ))?;
335 }
336 }
337 Ok(())
338 }
339
340 /// Attaches and sets the database path (internal use).
341 pub(crate) fn attach_and_set_path(&self, database_path: &str) -> Result<()> {
342 let db_alias = std::path::Path::new(database_path)
343 .file_stem()
344 .and_then(|s| s.to_str())
345 .unwrap_or("db");
346
347 self.execute_command(&format!(
348 "ATTACH DATABASE {} AS {}",
349 escape_sql_path(database_path),
350 escape_sql_path(db_alias)
351 ))?;
352
353 self.execute_command(&format!(
354 "SET search_path TO {}, public",
355 escape_sql_path(db_alias)
356 ))?;
357
358 Ok(())
359 }
360
361 /// Connects to a Hyper server with authentication.
362 ///
363 /// # Arguments
364 ///
365 /// * `endpoint` - The server endpoint (host:port).
366 /// * `database_path` - Path to the database file.
367 /// * `create_mode` - How to handle database creation.
368 /// * `user` - Username for authentication.
369 /// * `password` - Password for authentication.
370 ///
371 /// # Errors
372 ///
373 /// Returns an error if the connection or authentication fails.
374 pub fn connect_with_auth(
375 endpoint: &str,
376 database_path: &str,
377 create_mode: CreateMode,
378 user: &str,
379 password: &str,
380 ) -> Result<Self> {
381 crate::ConnectionBuilder::new(endpoint)
382 .database(database_path)
383 .create_mode(create_mode)
384 .user(user.to_string())
385 .password(password)
386 .build()
387 }
388
389 /// Creates a connection to a Hyper server without attaching a database.
390 ///
391 /// # Errors
392 ///
393 /// Returns [`Error::Connection`] if the TCP or gRPC handshake fails, and
394 /// [`Error::Io`] if the endpoint cannot be reached.
395 pub fn without_database(endpoint: &str) -> Result<Self> {
396 crate::ConnectionBuilder::new(endpoint).build()
397 }
398
399 /// Executes a SQL command that doesn't return results.
400 ///
401 /// Use this for DDL statements (CREATE, ALTER, DROP) and DML statements
402 /// (INSERT, UPDATE, DELETE).
403 ///
404 /// # Arguments
405 ///
406 /// * `command` - The SQL command to execute.
407 ///
408 /// # Returns
409 ///
410 /// The number of affected rows, or 0 if not applicable.
411 ///
412 /// # Errors
413 ///
414 /// Returns an error if:
415 /// - The connection is using gRPC transport (write operations not yet supported)
416 /// - The command fails to execute
417 pub fn execute_command(&self, command: &str) -> Result<u64> {
418 let token = self.stats_before_query(command);
419
420 let result = self.transport.execute_command(command);
421
422 // For commands, the query is fully executed synchronously, so we
423 // can store the pending token immediately for lazy resolution.
424 self.stats_store_pending(token, command);
425
426 result
427 }
428
429 /// Executes a SQL query and returns a streaming result set.
430 ///
431 /// Results are streamed in chunks (default 64K rows), keeping memory usage
432 /// constant regardless of result set size. This makes it safe for any
433 /// result size, from a single row to billions of rows.
434 ///
435 /// # Example
436 ///
437 /// ```no_run
438 /// # use hyperdb_api::{Connection, Result};
439 /// # fn example(conn: &Connection) -> Result<()> {
440 /// let mut result = conn.execute_query("SELECT id, value FROM measurements")?;
441 /// while let Some(chunk) = result.next_chunk()? {
442 /// for row in &chunk {
443 /// // Generic typed access (like C++ row.get<T>())
444 /// let id: Option<i32> = row.get(0);
445 /// let value: Option<f64> = row.get(1);
446 ///
447 /// // Or direct accessors
448 /// let id = row.get_i32(0);
449 /// let value = row.get_f64(1);
450 /// }
451 /// }
452 /// # Ok(())
453 /// # }
454 /// ```
455 ///
456 /// # Memory Behavior
457 ///
458 /// - Only one chunk is held in memory at a time (~few MB for 64K rows)
459 /// - Safe for result sets of any size (millions/billions of rows)
460 /// - Memory usage is `O(chunk_size)`, not `O(total_rows)`
461 ///
462 /// # Errors
463 ///
464 /// - Returns [`Error::Server`] wrapping a `hyperdb_api_core::client::Error` if the
465 /// SQL fails to parse, execute, or if the server reports an error
466 /// while streaming.
467 /// - Returns [`Error::Io`] on transport-level I/O failures.
468 pub fn execute_query(&self, query: &str) -> Result<Rowset<'_>> {
469 let token = self.stats_before_query(query);
470
471 let result = match &self.transport {
472 Transport::Tcp(tcp) => {
473 let stream = tcp
474 .client
475 .query_streaming(query, DEFAULT_BINARY_CHUNK_SIZE)?;
476 Ok(Rowset::new(stream))
477 }
478 Transport::Grpc(grpc) => {
479 // gRPC streaming: pull chunks lazily so peak memory is
480 // bounded by one gRPC message (tonic default 64 MB), not
481 // by the full result size. Matches TCP's
482 // constant-memory streaming shape.
483 //
484 // The transport module already creates a fresh gRPC client
485 // per query (gRPC client needs &mut self to execute), so we
486 // do the same here: connect, start the stream, wrap as a
487 // `ChunkSource`. The stream keeps the channel and runtime
488 // alive via refcounted handles inside `GrpcChunkStreamSync`.
489 let mut client =
490 hyperdb_api_core::client::grpc::GrpcClientSync::connect(grpc.config.clone())?;
491 let stream = client.execute_query_stream(query)?;
492 let source = Box::new(crate::grpc_connection::GrpcChunkStreamSource::new(stream));
493 let arrow_rowset = crate::arrow_result::ArrowRowset::from_stream(source)?;
494 Ok(Rowset::from_arrow(arrow_rowset))
495 }
496 };
497
498 // Store the pending token — Hyper logs the execution stats after the
499 // result is consumed (streamed), so we defer resolution until
500 // last_query_stats() is called.
501 self.stats_store_pending(token, query);
502
503 result
504 }
505
506 // =========================================================================
507 // Arrow Format Queries
508 // =========================================================================
509
510 /// Executes a SELECT query and returns results as Arrow IPC stream bytes.
511 ///
512 /// # Example
513 ///
514 /// ```no_run
515 /// use hyperdb_api::{Connection, CreateMode, Result};
516 ///
517 /// fn main() -> Result<()> {
518 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
519 ///
520 /// // Create and populate a table
521 /// conn.execute_command("CREATE TABLE data (id INT, value DOUBLE PRECISION)")?;
522 /// conn.execute_command("INSERT INTO data VALUES (1, 1.5), (2, 2.5)")?;
523 ///
524 /// // Get results as Arrow IPC stream
525 /// let arrow_data = conn.execute_query_to_arrow("SELECT * FROM data")?;
526 /// println!("Got {} bytes of Arrow IPC data", arrow_data.len());
527 ///
528 /// Ok(())
529 /// }
530 /// ```
531 ///
532 /// # Errors
533 ///
534 /// Propagates any [`Error::Server`] from the TCP or gRPC transport when
535 /// the query fails or the server cannot produce Arrow IPC output.
536 pub fn execute_query_to_arrow(&self, select_query: &str) -> Result<bytes::Bytes> {
537 self.transport.execute_query_to_arrow(select_query)
538 }
539
540 /// Exports an entire table to Arrow IPC stream format.
541 ///
542 /// This is a convenience method equivalent to
543 /// `execute_query_to_arrow("SELECT * FROM table_name")`.
544 ///
545 /// # Arguments
546 ///
547 /// * `table_name` - The table name
548 ///
549 /// # Returns
550 ///
551 /// Raw Arrow IPC stream bytes containing all rows from the table.
552 ///
553 /// # Example
554 ///
555 /// ```no_run
556 /// use hyperdb_api::{Connection, CreateMode, Result};
557 ///
558 /// fn main() -> Result<()> {
559 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
560 /// let arrow_data = conn.export_table_to_arrow("my_table")?;
561 /// Ok(())
562 /// }
563 /// ```
564 ///
565 /// # Errors
566 ///
567 /// Returns whatever [`execute_query_to_arrow`](Self::execute_query_to_arrow)
568 /// would return for `SELECT * FROM <table_name>` — typically
569 /// [`Error::Server`] if the table does not exist or the query is rejected.
570 pub fn export_table_to_arrow(&self, table_name: &str) -> Result<bytes::Bytes> {
571 self.execute_query_to_arrow(&format!("SELECT * FROM {table_name}"))
572 }
573
574 /// Executes a SELECT query and returns results as Arrow `RecordBatch`es.
575 ///
576 /// This is the recommended method for Arrow-native workflows (`DataFusion`,
577 /// Polars, etc.) where you want direct `RecordBatch` access without going
578 /// through the `Row` abstraction.
579 ///
580 /// # Example
581 ///
582 /// ```no_run
583 /// use hyperdb_api::{Connection, CreateMode, Result};
584 /// use arrow::record_batch::RecordBatch;
585 ///
586 /// fn main() -> Result<()> {
587 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
588 ///
589 /// let batches: Vec<RecordBatch> = conn.execute_query_to_batches("SELECT * FROM data")?;
590 /// for batch in &batches {
591 /// println!("batch: {} rows x {} cols", batch.num_rows(), batch.num_columns());
592 /// }
593 /// Ok(())
594 /// }
595 /// ```
596 ///
597 /// # Errors
598 ///
599 /// - Returns [`Error::Server`] if the query itself fails.
600 /// - Returns [`Error::Conversion`] if the Arrow IPC payload returned by the
601 /// server is malformed and cannot be decoded into record batches.
602 pub fn execute_query_to_batches(
603 &self,
604 select_query: &str,
605 ) -> Result<Vec<arrow::record_batch::RecordBatch>> {
606 let arrow_data = self.execute_query_to_arrow(select_query)?;
607 crate::arrow_result::parse_arrow_ipc(arrow_data)
608 }
609
610 /// Fetches a single row from a query.
611 ///
612 /// Returns an error if the query returns no rows.
613 ///
614 /// # Example
615 ///
616 /// ```no_run
617 /// use hyperdb_api::{Connection, CreateMode, Result};
618 ///
619 /// fn main() -> Result<()> {
620 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
621 /// let row = conn.fetch_one("SELECT * FROM users WHERE id = 1")?;
622 /// let id: Option<i32> = row.get(0);
623 /// let name: Option<String> = row.get(1);
624 /// Ok(())
625 /// }
626 /// ```
627 ///
628 /// # Errors
629 ///
630 /// - Returns the error from [`execute_query`](Self::execute_query) if
631 /// the query itself fails.
632 /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
633 /// the query produced zero rows.
634 pub fn fetch_one<Q>(&self, query: Q) -> Result<crate::Row>
635 where
636 Q: AsRef<str>,
637 {
638 let query = query.as_ref();
639 let result = self.execute_query(query)?;
640 result.require_first_row()
641 }
642
643 /// Fetches an optional single row from a query.
644 ///
645 /// Returns `None` if the query returns no rows.
646 ///
647 /// # Example
648 ///
649 /// ```no_run
650 /// use hyperdb_api::{Connection, CreateMode, Result};
651 ///
652 /// fn main() -> Result<()> {
653 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
654 /// if let Some(row) = conn.fetch_optional("SELECT * FROM users WHERE id = 999")? {
655 /// let name: Option<String> = row.get(1);
656 /// println!("Found user: {:?}", name);
657 /// }
658 /// Ok(())
659 /// }
660 /// ```
661 ///
662 /// # Errors
663 ///
664 /// Returns the error from [`execute_query`](Self::execute_query) if the
665 /// query itself fails. An empty result set is not an error — it yields
666 /// `Ok(None)`.
667 pub fn fetch_optional<Q>(&self, query: Q) -> Result<Option<crate::Row>>
668 where
669 Q: AsRef<str>,
670 {
671 let query = query.as_ref();
672 let result = self.execute_query(query)?;
673 result.first_row()
674 }
675
676 /// Fetches all rows from a query.
677 ///
678 /// # Example
679 ///
680 /// ```no_run
681 /// use hyperdb_api::{Connection, CreateMode, Result};
682 ///
683 /// fn main() -> Result<()> {
684 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
685 /// let rows = conn.fetch_all("SELECT * FROM users WHERE active = true ORDER BY name")?;
686 /// for row in rows {
687 /// let id: Option<i32> = row.get(0);
688 /// let name: Option<String> = row.get(1);
689 /// println!("User {}: {:?}", id.unwrap_or(-1), name);
690 /// }
691 /// Ok(())
692 /// }
693 /// ```
694 ///
695 /// # Errors
696 ///
697 /// Returns the error from [`execute_query`](Self::execute_query), or a
698 /// transport error produced while draining every chunk of the streamed
699 /// result set.
700 pub fn fetch_all<Q>(&self, query: Q) -> Result<Vec<crate::Row>>
701 where
702 Q: AsRef<str>,
703 {
704 let query = query.as_ref();
705 let result = self.execute_query(query)?;
706 result.collect_rows()
707 }
708
709 /// Fetches a single row and maps it to a struct using [`FromRow`](crate::FromRow).
710 ///
711 /// Returns an error if the query returns no rows or if mapping fails.
712 ///
713 /// # Example
714 ///
715 /// ```no_run
716 /// use hyperdb_api::{Connection, CreateMode, FromRow, RowAccessor, Result};
717 ///
718 /// struct User { id: i32, name: String }
719 ///
720 /// impl FromRow for User {
721 /// fn from_row(row: RowAccessor<'_>) -> Result<Self> {
722 /// Ok(User {
723 /// id: row.get("id")?,
724 /// name: row.get_opt("name")?.unwrap_or_default(),
725 /// })
726 /// }
727 /// }
728 ///
729 /// fn main() -> Result<()> {
730 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
731 /// let user: User = conn.fetch_one_as("SELECT id, name FROM users WHERE id = 1")?;
732 /// Ok(())
733 /// }
734 /// ```
735 ///
736 /// # Errors
737 ///
738 /// - Returns the error from [`fetch_one`](Self::fetch_one) if the query
739 /// fails or returns no rows.
740 /// - Returns whatever error [`FromRow::from_row`](crate::FromRow::from_row)
741 /// produces when the row cannot be mapped into `T`.
742 pub fn fetch_one_as<T: crate::FromRow>(&self, query: &str) -> Result<T> {
743 let row = self.fetch_one(query)?;
744 let indices = row
745 .schema()
746 .map(crate::row_accessor::RowAccessor::build_indices)
747 .unwrap_or_default();
748 T::from_row(crate::RowAccessor::new(&row, &indices))
749 }
750
751 /// Fetches all rows and maps them to structs using [`FromRow`](crate::FromRow).
752 ///
753 /// # Example
754 ///
755 /// ```no_run
756 /// # use hyperdb_api::{Connection, FromRow, RowAccessor, Result};
757 /// # struct User { id: i32, name: String }
758 /// # impl FromRow for User {
759 /// # fn from_row(row: RowAccessor<'_>) -> Result<Self> {
760 /// # Ok(User { id: row.get("id")?, name: row.get_opt("name")?.unwrap_or_default() })
761 /// # }
762 /// # }
763 /// # fn example(conn: &Connection) -> Result<()> {
764 /// let users: Vec<User> = conn.fetch_all_as("SELECT id, name FROM users")?;
765 /// # Ok(())
766 /// # }
767 /// ```
768 ///
769 /// # Errors
770 ///
771 /// - Returns the error from [`fetch_all`](Self::fetch_all) if the query
772 /// fails.
773 /// - Returns the first error produced by
774 /// [`FromRow::from_row`](crate::FromRow::from_row) on any of the rows.
775 pub fn fetch_all_as<T: crate::FromRow>(&self, query: &str) -> Result<Vec<T>> {
776 let rows = self.fetch_all(query)?;
777 // Build the column-name → index lookup once from the first
778 // row's schema; reuse for every row. All rows in a result set
779 // share the same `Arc<ResultSchema>`, so this is safe.
780 let indices = rows
781 .first()
782 .and_then(crate::result::Row::schema)
783 .map(crate::row_accessor::RowAccessor::build_indices)
784 .unwrap_or_default();
785 rows.iter()
786 .map(|r| T::from_row(crate::RowAccessor::new(r, &indices)))
787 .collect()
788 }
789
790 /// Returns a lazy iterator over rows, mapping each to `T` via
791 /// [`FromRow`].
792 ///
793 /// This is the streaming variant of [`fetch_all_as`](Self::fetch_all_as):
794 /// memory usage is bounded by the chunk size (default 64K rows), not by
795 /// the total row count. Use this for large result sets where collecting
796 /// all rows into a `Vec` would exceed memory limits.
797 ///
798 /// The column-name → index lookup table is built exactly once (on the
799 /// first non-empty chunk) and reused for all rows, so per-row mapping is
800 /// O(1) in column count.
801 ///
802 /// # Example
803 ///
804 /// ```no_run
805 /// # use hyperdb_api::{Connection, CreateMode, FromRow, RowAccessor, Result};
806 /// # struct User { id: i32, name: String }
807 /// # impl FromRow for User {
808 /// # fn from_row(row: RowAccessor<'_>) -> Result<Self> {
809 /// # Ok(User { id: row.get("id")?, name: row.get("name")? })
810 /// # }
811 /// # }
812 /// # fn example(conn: &Connection) -> Result<()> {
813 /// for row_result in conn.stream_as::<User>("SELECT id, name FROM users")? {
814 /// let user = row_result?;
815 /// println!("{}: {}", user.id, user.name);
816 /// }
817 /// # Ok(())
818 /// # }
819 /// ```
820 ///
821 /// # Errors
822 ///
823 /// - The returned `Result` wraps errors detected while *opening* the
824 /// result stream — transport/connection failures, and (on the gRPC
825 /// transport, which establishes the query stream eagerly) SQL parse and
826 /// server errors. On the default TCP transport the query is streamed
827 /// lazily, so SQL errors such as a missing table are typically reported
828 /// as the **first yielded item** rather than by this outer `Result`.
829 /// - Each yielded item is itself a `Result<T>`:
830 /// - `Ok(T)` if the row was successfully mapped via `FromRow`.
831 /// - `Err(e)` for a server/transport error encountered while streaming a
832 /// later chunk, or for a per-row mapping failure (missing column, type
833 /// mismatch, NULL in a non-optional field).
834 ///
835 /// In short: always handle errors *both* on the outer `Result` and on each
836 /// item — do not assume a successfully-returned iterator means the query
837 /// succeeded.
838 ///
839 /// [`FromRow`]: crate::FromRow
840 pub fn stream_as<'a, T>(&'a self, query: &str) -> Result<impl Iterator<Item = Result<T>> + 'a>
841 where
842 T: crate::FromRow + 'a,
843 {
844 let rowset = self.execute_query(query)?;
845 Ok(crate::result::TypedRowIterator::<T>::new(rowset))
846 }
847
848 /// Fetches a single row from a **parameterized** query and maps it to a
849 /// struct using [`FromRow`](crate::FromRow).
850 ///
851 /// This is the parameterized counterpart to
852 /// [`fetch_one_as`](Self::fetch_one_as): it binds `$1`, `$2`, … placeholders
853 /// from `params` (via [`ToSqlParam`](crate::params::ToSqlParam), exactly as
854 /// [`query_params`](Self::query_params) does) and maps the first result row
855 /// into `T`. Use it when a parameterized `SELECT` should yield a typed
856 /// struct rather than a raw [`Row`](crate::Row).
857 ///
858 /// # Example
859 ///
860 /// ```no_run
861 /// # use hyperdb_api::{Connection, FromRow, RowAccessor, Result};
862 /// # struct User { id: i32, name: String }
863 /// # impl FromRow for User {
864 /// # fn from_row(row: RowAccessor<'_>) -> Result<Self> {
865 /// # Ok(User { id: row.get("id")?, name: row.get("name")? })
866 /// # }
867 /// # }
868 /// # fn example(conn: &Connection) -> Result<()> {
869 /// let user: User = conn.fetch_one_as_params(
870 /// "SELECT id, name FROM users WHERE id = $1",
871 /// &[&1i32],
872 /// )?;
873 /// # Ok(())
874 /// # }
875 /// ```
876 ///
877 /// # Errors
878 ///
879 /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC
880 /// transport (prepared statements are TCP-only).
881 /// - Returns the error from [`query_params`](Self::query_params) if the
882 /// server rejects the statement at `Parse`, `Bind`, or `Execute` time, or
883 /// on transport-level I/O failures.
884 /// - Returns [`Error::Conversion`] with message `"Query returned no rows"`
885 /// if the query produced zero rows.
886 /// - Returns whatever [`FromRow::from_row`](crate::FromRow::from_row)
887 /// produces when the row cannot be mapped into `T`.
888 pub fn fetch_one_as_params<T: crate::FromRow>(
889 &self,
890 query: &str,
891 params: &[&dyn crate::params::ToSqlParam],
892 ) -> Result<T> {
893 let row = self.query_params(query, params)?.require_first_row()?;
894 let indices = row
895 .schema()
896 .map(crate::row_accessor::RowAccessor::build_indices)
897 .unwrap_or_default();
898 T::from_row(crate::RowAccessor::new(&row, &indices))
899 }
900
901 /// Fetches all rows from a **parameterized** query and maps them to structs
902 /// using [`FromRow`](crate::FromRow).
903 ///
904 /// This is the parameterized counterpart to
905 /// [`fetch_all_as`](Self::fetch_all_as): it binds `$1`, `$2`, … placeholders
906 /// from `params` (see [`query_params`](Self::query_params)) and maps every
907 /// result row into `T`.
908 ///
909 /// # Example
910 ///
911 /// ```no_run
912 /// # use hyperdb_api::{Connection, FromRow, RowAccessor, Result};
913 /// # struct User { id: i32, name: String }
914 /// # impl FromRow for User {
915 /// # fn from_row(row: RowAccessor<'_>) -> Result<Self> {
916 /// # Ok(User { id: row.get("id")?, name: row.get("name")? })
917 /// # }
918 /// # }
919 /// # fn example(conn: &Connection) -> Result<()> {
920 /// let users: Vec<User> = conn.fetch_all_as_params(
921 /// "SELECT id, name FROM users WHERE org_id = $1",
922 /// &[&42i32],
923 /// )?;
924 /// # Ok(())
925 /// # }
926 /// ```
927 ///
928 /// # Errors
929 ///
930 /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC
931 /// transport.
932 /// - Returns the error from [`query_params`](Self::query_params) if the
933 /// server rejects the statement, or on transport-level I/O failures.
934 /// - Returns the first error produced by
935 /// [`FromRow::from_row`](crate::FromRow::from_row) on any of the rows.
936 pub fn fetch_all_as_params<T: crate::FromRow>(
937 &self,
938 query: &str,
939 params: &[&dyn crate::params::ToSqlParam],
940 ) -> Result<Vec<T>> {
941 let rows = self.query_params(query, params)?.collect_rows()?;
942 // Build the column-name → index lookup once from the first row's
943 // schema; reuse for every row. See `fetch_all_as`.
944 let indices = rows
945 .first()
946 .and_then(crate::result::Row::schema)
947 .map(crate::row_accessor::RowAccessor::build_indices)
948 .unwrap_or_default();
949 rows.iter()
950 .map(|r| T::from_row(crate::RowAccessor::new(r, &indices)))
951 .collect()
952 }
953
954 /// Returns a lazy iterator over the rows of a **parameterized** query,
955 /// mapping each to `T` via [`FromRow`].
956 ///
957 /// This is the parameterized counterpart to
958 /// [`stream_as`](Self::stream_as): it binds `$1`, `$2`, … placeholders from
959 /// `params` (see [`query_params`](Self::query_params)) and streams the
960 /// result, mapping each row into `T` while holding only one transport chunk
961 /// in memory at a time. The column-index map is built once on the first
962 /// chunk and reused, so per-row mapping is O(1) in the column count.
963 ///
964 /// # Example
965 ///
966 /// ```no_run
967 /// # use hyperdb_api::{Connection, FromRow, RowAccessor, Result};
968 /// # struct User { id: i32, name: String }
969 /// # impl FromRow for User {
970 /// # fn from_row(row: RowAccessor<'_>) -> Result<Self> {
971 /// # Ok(User { id: row.get("id")?, name: row.get("name")? })
972 /// # }
973 /// # }
974 /// # fn example(conn: &Connection) -> Result<()> {
975 /// for row_result in conn.stream_as_params::<User>(
976 /// "SELECT id, name FROM users WHERE org_id = $1",
977 /// &[&42i32],
978 /// )? {
979 /// let user = row_result?;
980 /// println!("{}: {}", user.id, user.name);
981 /// }
982 /// # Ok(())
983 /// # }
984 /// ```
985 ///
986 /// # Errors
987 ///
988 /// - The returned outer `Result` wraps errors detected while *opening* the
989 /// stream: [`Error::FeatureNotSupported`] on gRPC transport, and any
990 /// `Parse`/`Bind` rejection or transport failure surfaced by
991 /// [`query_params`](Self::query_params).
992 /// - Each yielded item is itself a `Result<T>`: `Ok(T)` when the row mapped
993 /// cleanly, or `Err(e)` for a per-row mapping failure (missing column,
994 /// type mismatch, NULL in a non-optional field) or a server/transport
995 /// error hit while streaming a later chunk.
996 ///
997 /// As with [`stream_as`](Self::stream_as), always handle errors *both* on
998 /// the outer `Result` and on each item.
999 ///
1000 /// [`FromRow`]: crate::FromRow
1001 pub fn stream_as_params<'a, T>(
1002 &'a self,
1003 query: &str,
1004 params: &[&dyn crate::params::ToSqlParam],
1005 ) -> Result<impl Iterator<Item = Result<T>> + 'a>
1006 where
1007 T: crate::FromRow + 'a,
1008 {
1009 // `query_params` returns a Rowset that already carries the prepared
1010 // statement guard, so Drop ordering (close_statement after the rowset
1011 // releases its connection lock) is preserved with no extra work here.
1012 let rowset = self.query_params(query, params)?;
1013 Ok(crate::result::TypedRowIterator::<T>::new(rowset))
1014 }
1015
1016 /// Fetches a single scalar value from a query.
1017 ///
1018 /// Returns an error if the query returns no rows or NULL.
1019 ///
1020 /// # Example
1021 ///
1022 /// ```no_run
1023 /// use hyperdb_api::{Connection, CreateMode, Result};
1024 ///
1025 /// fn main() -> Result<()> {
1026 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1027 /// let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM users")?;
1028 /// println!("User count: {}", count);
1029 /// Ok(())
1030 /// }
1031 /// ```
1032 ///
1033 /// # Errors
1034 ///
1035 /// - Returns the error from [`execute_query`](Self::execute_query) if
1036 /// the query itself fails.
1037 /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
1038 /// the query produced zero rows.
1039 /// - Returns [`Error::Conversion`] with message `"Scalar query returned NULL"`
1040 /// if the single cell is SQL `NULL`.
1041 pub fn fetch_scalar<T, Q>(&self, query: Q) -> Result<T>
1042 where
1043 T: crate::connection::ScalarValue + crate::result::RowValue,
1044 Q: AsRef<str>,
1045 {
1046 let query = query.as_ref();
1047 let result = self.execute_query(query)?;
1048 result.require_scalar()
1049 }
1050
1051 /// Fetches an optional scalar value from a query.
1052 ///
1053 /// Returns `None` if the query returns no rows or NULL.
1054 ///
1055 /// # Example
1056 ///
1057 /// ```no_run
1058 /// use hyperdb_api::{Connection, CreateMode, Result};
1059 ///
1060 /// fn main() -> Result<()> {
1061 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1062 /// let max_id: Option<i32> = conn.fetch_optional_scalar("SELECT MAX(id) FROM users")?;
1063 /// println!("Max ID: {:?}", max_id);
1064 /// Ok(())
1065 /// }
1066 /// ```
1067 ///
1068 /// # Errors
1069 ///
1070 /// - Returns the error from [`execute_query`](Self::execute_query) if
1071 /// the query itself fails.
1072 /// - Returns [`Error::Conversion`] with message `"Query returned no rows"` if
1073 /// the query produced zero rows. (An empty result is treated as an
1074 /// error here because we need at least one row to inspect; SQL `NULL`
1075 /// in the single cell yields `Ok(None)`.)
1076 pub fn fetch_optional_scalar<T, Q>(&self, query: Q) -> Result<Option<T>>
1077 where
1078 T: crate::connection::ScalarValue + crate::result::RowValue,
1079 Q: AsRef<str>,
1080 {
1081 let query = query.as_ref();
1082 let result = self.execute_query(query)?;
1083 result.scalar()
1084 }
1085
1086 /// Executes a scalar query and returns a single value of type `T`.
1087 ///
1088 /// Alias for [`fetch_optional_scalar`](Self::fetch_optional_scalar) for C++ API compatibility.
1089 ///
1090 /// # Errors
1091 ///
1092 /// See [`fetch_optional_scalar`](Self::fetch_optional_scalar).
1093 #[inline]
1094 pub fn execute_scalar_query<T>(&self, query: &str) -> Result<Option<T>>
1095 where
1096 T: ScalarValue + crate::result::RowValue,
1097 {
1098 self.fetch_optional_scalar(query)
1099 }
1100
1101 /// Queries for a count value, defaulting to 0 if NULL.
1102 ///
1103 /// This is optimized for COUNT queries which typically return 0
1104 /// instead of NULL when there are no matching rows.
1105 ///
1106 /// # Example
1107 ///
1108 /// ```no_run
1109 /// use hyperdb_api::{Connection, CreateMode, Result};
1110 ///
1111 /// fn main() -> Result<()> {
1112 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1113 /// let count = conn.query_count("SELECT COUNT(*) FROM users WHERE active = true")?;
1114 /// println!("Active users: {}", count);
1115 /// Ok(())
1116 /// }
1117 /// ```
1118 ///
1119 /// # Errors
1120 ///
1121 /// Returns the error from [`execute_query`](Self::execute_query) if the
1122 /// query fails or produces no rows. SQL `NULL` is mapped to `0`, not an
1123 /// error.
1124 pub fn query_count(&self, query: &str) -> Result<i64> {
1125 self.fetch_optional_scalar::<i64, _>(query)
1126 .map(|opt| opt.unwrap_or(0))
1127 }
1128
1129 // =========================================================================
1130 // Parameterized Queries (SQL Injection Safe)
1131 // =========================================================================
1132
1133 /// Executes a parameterized query, returning streaming results.
1134 ///
1135 /// This is safe to use with untrusted user input: parameters travel
1136 /// through the extended query protocol (Parse/Bind/Execute) as
1137 /// binary `HyperBinary` values and are never interpolated into the
1138 /// SQL string. For repeated executions of the same SQL with different
1139 /// values, prefer the explicit [`prepare`](Self::prepare) API — it
1140 /// returns a reusable [`PreparedStatement`](crate::PreparedStatement)
1141 /// that skips the Parse round-trip on every call.
1142 ///
1143 /// Under the hood, `query_params` is a one-shot
1144 /// prepare+execute+close: it prepares an unnamed statement, binds
1145 /// the parameters, starts streaming, and closes the statement when
1146 /// the returned [`Rowset`] is dropped.
1147 ///
1148 /// # Arguments
1149 ///
1150 /// * `query` - The SQL query with parameter placeholders (`$1`, `$2`, etc.)
1151 /// * `params` - Parameter values matching the placeholders
1152 ///
1153 /// # SQL Injection Prevention
1154 ///
1155 /// ```no_run
1156 /// use hyperdb_api::{Connection, CreateMode, Result};
1157 ///
1158 /// fn search_users(conn: &Connection, user_input: &str) -> Result<()> {
1159 /// // DANGEROUS - vulnerable to SQL injection:
1160 /// // let query = format!("SELECT * FROM users WHERE name = '{}'", user_input);
1161 ///
1162 /// // SAFE - parameterized query:
1163 /// let mut result = conn.query_params(
1164 /// "SELECT * FROM users WHERE name = $1",
1165 /// &[&user_input],
1166 /// )?;
1167 ///
1168 /// while let Some(chunk) = result.next_chunk()? {
1169 /// for row in &chunk {
1170 /// let id: Option<i32> = row.get(0);
1171 /// let name: Option<String> = row.get(1);
1172 /// println!("Found: {:?} - {:?}", id, name);
1173 /// }
1174 /// }
1175 /// Ok(())
1176 /// }
1177 /// ```
1178 ///
1179 /// # Multiple Parameters
1180 ///
1181 /// ```no_run
1182 /// use hyperdb_api::{Connection, CreateMode, Result};
1183 ///
1184 /// fn main() -> Result<()> {
1185 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1186 ///
1187 /// // Multiple parameters of different types
1188 /// let result = conn.query_params(
1189 /// "SELECT * FROM orders WHERE customer_id = $1 AND total > $2",
1190 /// &[&42i32, &100.0f64],
1191 /// )?;
1192 /// Ok(())
1193 /// }
1194 /// ```
1195 ///
1196 /// # Errors
1197 ///
1198 /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport
1199 /// (prepared statements are TCP-only).
1200 /// - Returns [`Error::Server`] if the server rejects the statement at
1201 /// `Parse`, `Bind`, or `Execute` time, including on type-mismatch
1202 /// between `params` and the inferred OIDs.
1203 /// - Returns [`Error::Io`] on transport-level I/O failures.
1204 pub fn query_params(
1205 &self,
1206 query: &str,
1207 params: &[&dyn crate::params::ToSqlParam],
1208 ) -> Result<Rowset<'_>> {
1209 // Implementation note: routes through the extended query protocol
1210 // via Parse/Bind/Execute so parameters travel in HyperBinary
1211 // format — no SQL escaping, full SQL-injection safety regardless of
1212 // parameter content. The statement handle is stashed inside the
1213 // returned Rowset so its Drop-time close_statement fires *after*
1214 // the rowset releases its connection lock (otherwise the close
1215 // would deadlock on the still-held mutex).
1216 let client = match &self.transport {
1217 Transport::Tcp(tcp) => &tcp.client,
1218 Transport::Grpc(_) => {
1219 return Err(Error::feature_not_supported(
1220 "prepared statements are not supported over gRPC transport",
1221 ));
1222 }
1223 };
1224 let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
1225 let stmt = client.prepare_typed(query, &oids)?;
1226 let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
1227 let stream =
1228 client.execute_streaming(&stmt, encoded, crate::result::DEFAULT_BINARY_CHUNK_SIZE)?;
1229 Ok(Rowset::from_prepared(stream).with_statement_guard(stmt))
1230 }
1231
1232 /// Executes a parameterized command that doesn't return rows.
1233 ///
1234 /// Use this for INSERT, UPDATE, DELETE, or DDL statements with parameters.
1235 /// Returns the number of affected rows.
1236 ///
1237 /// See [`query_params`](Self::query_params) for details on parameter
1238 /// handling and SQL injection prevention.
1239 ///
1240 /// # Example
1241 ///
1242 /// ```no_run
1243 /// use hyperdb_api::{Connection, CreateMode, Result};
1244 ///
1245 /// fn delete_user(conn: &Connection, user_id: i32) -> Result<u64> {
1246 /// // Safe from SQL injection
1247 /// conn.command_params("DELETE FROM users WHERE id = $1", &[&user_id])
1248 /// }
1249 /// ```
1250 ///
1251 /// # Errors
1252 ///
1253 /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport.
1254 /// - Returns [`Error::Server`] if the server rejects the statement at
1255 /// `Parse`, `Bind`, or `Execute` time.
1256 /// - Returns [`Error::Io`] on transport-level I/O failures.
1257 pub fn command_params(
1258 &self,
1259 query: &str,
1260 params: &[&dyn crate::params::ToSqlParam],
1261 ) -> Result<u64> {
1262 // One-shot prepare+execute with explicit OIDs — see `query_params`
1263 // for why we collect OIDs from each parameter.
1264 let client = match &self.transport {
1265 Transport::Tcp(tcp) => &tcp.client,
1266 Transport::Grpc(_) => {
1267 return Err(Error::feature_not_supported(
1268 "prepared statements are not supported over gRPC transport",
1269 ));
1270 }
1271 };
1272 let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
1273 let stmt = client.prepare_typed(query, &oids)?;
1274 let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
1275 Ok(client.execute_no_result(&stmt, encoded)?)
1276 }
1277
1278 /// Executes multiple SQL statements in a single call.
1279 ///
1280 /// Each statement is executed sequentially. If any statement fails,
1281 /// execution stops and the error is returned. Returns the total number
1282 /// of affected rows across all statements.
1283 ///
1284 /// This is more efficient than calling `execute_command` in a loop
1285 /// because it reduces round-trips for DDL scripts and multi-statement setup.
1286 ///
1287 /// # Example
1288 ///
1289 /// ```no_run
1290 /// use hyperdb_api::{Connection, CreateMode, Result};
1291 ///
1292 /// fn main() -> Result<()> {
1293 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1294 /// let total = conn.execute_batch(&[
1295 /// "CREATE TABLE users (id INT, name TEXT)",
1296 /// "INSERT INTO users VALUES (1, 'Alice')",
1297 /// "INSERT INTO users VALUES (2, 'Bob')",
1298 /// ])?;
1299 /// println!("Total affected: {}", total);
1300 /// Ok(())
1301 /// }
1302 /// ```
1303 ///
1304 /// # Errors
1305 ///
1306 /// Returns a wrapped [`Error::Internal`] on the first statement that fails;
1307 /// its `source` is the original [`Error::Server`] from
1308 /// [`execute_command`](Self::execute_command). The error message
1309 /// includes the failing statement's ordinal and an 80-character preview
1310 /// of its SQL.
1311 pub fn execute_batch(&self, statements: &[&str]) -> Result<u64> {
1312 let mut total = 0u64;
1313 for (i, stmt) in statements.iter().enumerate() {
1314 if !stmt.trim().is_empty() {
1315 total += self.execute_command(stmt).map_err(|e| {
1316 let preview: String = stmt.chars().take(80).collect();
1317 Error::internal(format!(
1318 "execute_batch failed at statement {} of {}: {}: {}",
1319 i + 1,
1320 statements.len(),
1321 preview,
1322 e,
1323 ))
1324 })?;
1325 }
1326 }
1327 Ok(total)
1328 }
1329
1330 /// Returns the attached database path, if any.
1331 pub fn database(&self) -> Option<&str> {
1332 self.database.as_deref()
1333 }
1334
1335 /// Creates a new database file.
1336 ///
1337 /// # Example
1338 ///
1339 /// ```no_run
1340 /// use hyperdb_api::{Connection, Result};
1341 ///
1342 /// fn main() -> Result<()> {
1343 /// let conn = Connection::without_database("localhost:7483")?;
1344 /// conn.create_database("new_database.hyper")?;
1345 /// Ok(())
1346 /// }
1347 /// ```
1348 ///
1349 /// # Errors
1350 ///
1351 /// Returns [`Error::Server`] if the server rejects the
1352 /// `CREATE DATABASE IF NOT EXISTS` statement (e.g. the path is not
1353 /// writable on the server).
1354 pub fn create_database(&self, path: &str) -> Result<()> {
1355 let sql = format!("CREATE DATABASE IF NOT EXISTS {}", escape_sql_path(path));
1356 self.execute_command(&sql)?;
1357 Ok(())
1358 }
1359
1360 /// Drops (deletes) a database file.
1361 ///
1362 /// # Example
1363 ///
1364 /// ```no_run
1365 /// use hyperdb_api::{Connection, Result};
1366 ///
1367 /// fn main() -> Result<()> {
1368 /// let conn = Connection::without_database("localhost:7483")?;
1369 /// conn.drop_database("old_database.hyper")?;
1370 /// Ok(())
1371 /// }
1372 /// ```
1373 ///
1374 /// # Errors
1375 ///
1376 /// Returns [`Error::Server`] if the server rejects the
1377 /// `DROP DATABASE IF EXISTS` statement (e.g. the database is still
1378 /// attached or permissions deny deletion).
1379 pub fn drop_database(&self, path: &str) -> Result<()> {
1380 let sql = format!("DROP DATABASE IF EXISTS {}", escape_sql_path(path));
1381 self.execute_command(&sql)?;
1382 Ok(())
1383 }
1384
1385 /// Attaches a database file to the connection.
1386 ///
1387 /// Once attached, the database can be queried and modified.
1388 /// The database is identified by its alias (or by its path if no alias is provided).
1389 ///
1390 /// # Arguments
1391 ///
1392 /// * `path` - The path to the database file to attach.
1393 /// * `alias` - Optional alias for the database. If `None`, the database is
1394 /// attached without an explicit alias (typically using its filename).
1395 ///
1396 /// # Errors
1397 ///
1398 /// Returns an error if the database file doesn't exist or if attachment fails.
1399 ///
1400 /// # Example
1401 ///
1402 /// ```no_run
1403 /// use hyperdb_api::{Connection, Result};
1404 ///
1405 /// fn main() -> Result<()> {
1406 /// let conn = Connection::without_database("localhost:7483")?;
1407 ///
1408 /// // Attach with an alias
1409 /// conn.attach_database("data.hyper", Some("mydata"))?;
1410 ///
1411 /// // Attach without an alias
1412 /// conn.attach_database("other.hyper", None)?;
1413 /// Ok(())
1414 /// }
1415 /// ```
1416 pub fn attach_database(&self, path: &str, alias: Option<&str>) -> Result<()> {
1417 let sql = if let Some(alias) = alias {
1418 format!(
1419 "ATTACH DATABASE {} AS {}",
1420 escape_sql_path(path),
1421 escape_sql_path(alias)
1422 )
1423 } else {
1424 format!("ATTACH DATABASE {}", escape_sql_path(path))
1425 };
1426 self.execute_command(&sql)?;
1427 Ok(())
1428 }
1429
1430 /// Detaches a database from this connection.
1431 ///
1432 /// After detaching, the database file is released and can be accessed
1433 /// externally (e.g., copied, moved, etc.). All pending updates are
1434 /// written to disk before detaching.
1435 ///
1436 /// # Arguments
1437 ///
1438 /// * `alias` - The alias of the database to detach.
1439 ///
1440 /// # Errors
1441 ///
1442 /// Returns an error if the database is not attached or if detachment fails.
1443 ///
1444 /// # Example
1445 ///
1446 /// ```no_run
1447 /// use hyperdb_api::{Connection, Result};
1448 ///
1449 /// fn main() -> Result<()> {
1450 /// let conn = Connection::without_database("localhost:7483")?;
1451 /// conn.attach_database("data.hyper", Some("mydata"))?;
1452 /// // ... work with the database ...
1453 /// conn.detach_database("mydata")?;
1454 /// Ok(())
1455 /// }
1456 /// ```
1457 pub fn detach_database(&self, alias: &str) -> Result<()> {
1458 let sql = format!("DETACH DATABASE {}", escape_sql_path(alias));
1459 self.execute_command(&sql)?;
1460 Ok(())
1461 }
1462
1463 /// Detaches all databases from this connection.
1464 ///
1465 /// This is useful for cleanup before closing a connection or when
1466 /// you need to release all database files.
1467 ///
1468 /// # Errors
1469 ///
1470 /// Returns [`Error::Server`] if the server rejects the
1471 /// `DETACH ALL DATABASES` statement (e.g. a database is still in use by
1472 /// another session).
1473 pub fn detach_all_databases(&self) -> Result<()> {
1474 self.execute_command("DETACH ALL DATABASES")?;
1475 Ok(())
1476 }
1477
1478 /// Creates a schema in the database.
1479 ///
1480 /// # Errors
1481 ///
1482 /// - Returns an error if `schema_name` cannot be converted into a
1483 /// [`SchemaName`](crate::SchemaName) (invalid identifier).
1484 /// - Returns [`Error::Server`] if the server rejects the
1485 /// `CREATE SCHEMA` statement (e.g. the schema already exists).
1486 pub fn create_schema<T>(&self, schema_name: T) -> Result<()>
1487 where
1488 T: TryInto<crate::SchemaName>,
1489 crate::Error: From<T::Error>,
1490 {
1491 crate::catalog::Catalog::new(self).create_schema(schema_name)
1492 }
1493
1494 /// Checks whether a schema exists.
1495 ///
1496 /// # Arguments
1497 ///
1498 /// * `schema` - The schema name (can include database qualifier).
1499 ///
1500 /// # Example
1501 ///
1502 /// ```no_run
1503 /// use hyperdb_api::{Connection, Result};
1504 ///
1505 /// fn main() -> Result<()> {
1506 /// let conn = Connection::without_database("localhost:7483")?;
1507 /// if conn.has_schema("public")? {
1508 /// println!("Schema 'public' exists");
1509 /// }
1510 /// Ok(())
1511 /// }
1512 /// ```
1513 ///
1514 /// # Errors
1515 ///
1516 /// - Returns an error if `schema` cannot be converted into a
1517 /// [`SchemaName`](crate::SchemaName).
1518 /// - Returns [`Error::Server`] if the catalog lookup query fails.
1519 pub fn has_schema<T>(&self, schema: T) -> Result<bool>
1520 where
1521 T: TryInto<crate::SchemaName>,
1522 crate::Error: From<T::Error>,
1523 {
1524 use crate::catalog::Catalog;
1525 Catalog::new(self).has_schema(schema)
1526 }
1527
1528 /// Checks whether a table exists.
1529 ///
1530 /// # Arguments
1531 ///
1532 /// * `table_name` - The table name (can include database and schema qualifiers).
1533 ///
1534 /// # Example
1535 ///
1536 /// ```no_run
1537 /// use hyperdb_api::{Connection, Result};
1538 ///
1539 /// fn main() -> Result<()> {
1540 /// let conn = Connection::without_database("localhost:7483")?;
1541 /// if conn.has_table("public.users")? {
1542 /// println!("Table 'users' exists");
1543 /// }
1544 /// Ok(())
1545 /// }
1546 /// ```
1547 ///
1548 /// # Errors
1549 ///
1550 /// - Returns an error if `table_name` cannot be converted into a
1551 /// [`TableName`](crate::TableName).
1552 /// - Returns [`Error::Server`] if the catalog lookup query fails.
1553 pub fn has_table<T>(&self, table_name: T) -> Result<bool>
1554 where
1555 T: TryInto<crate::TableName>,
1556 crate::Error: From<T::Error>,
1557 {
1558 use crate::catalog::Catalog;
1559 Catalog::new(self).has_table(table_name)
1560 }
1561
1562 /// Returns the server version as a parsed struct.
1563 ///
1564 /// Returns `None` if the version cannot be determined (e.g., gRPC connection).
1565 ///
1566 /// # Example
1567 ///
1568 /// ```no_run
1569 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, ServerVersion, Result};
1570 ///
1571 /// fn main() -> Result<()> {
1572 /// let hyper = HyperProcess::new(None, None)?;
1573 /// let conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1574 /// if let Some(version) = conn.server_version() {
1575 /// println!("Hyper {}", version);
1576 /// if version >= ServerVersion::new(0, 1, 0) {
1577 /// println!("Has feature X");
1578 /// }
1579 /// }
1580 /// Ok(())
1581 /// }
1582 /// ```
1583 pub fn server_version(&self) -> Option<crate::ServerVersion> {
1584 let version_str = self.parameter_status("server_version")?;
1585 crate::ServerVersion::parse(&version_str)
1586 }
1587
1588 /// Copies a database file to a new path.
1589 ///
1590 /// The source database must be attached to this connection.
1591 ///
1592 /// # Example
1593 ///
1594 /// ```no_run
1595 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1596 ///
1597 /// fn main() -> Result<()> {
1598 /// let hyper = HyperProcess::new(None, None)?;
1599 /// let conn = Connection::new(&hyper, "source.hyper", CreateMode::DoNotCreate)?;
1600 /// conn.copy_database("source.hyper", "backup.hyper")?;
1601 /// Ok(())
1602 /// }
1603 /// ```
1604 ///
1605 /// # Errors
1606 ///
1607 /// Returns [`Error::Server`] if the server rejects the
1608 /// `COPY DATABASE` statement — e.g. the source is not attached, the
1609 /// destination path is not writable, or it already exists.
1610 pub fn copy_database(&self, source: &str, destination: &str) -> Result<()> {
1611 let sql = format!(
1612 "COPY DATABASE {} TO {}",
1613 escape_sql_path(source),
1614 escape_sql_path(destination)
1615 );
1616 self.execute_command(&sql)?;
1617 Ok(())
1618 }
1619
1620 /// Executes EXPLAIN on a query and returns the plan as a string.
1621 ///
1622 /// # Example
1623 ///
1624 /// ```no_run
1625 /// use hyperdb_api::{Connection, CreateMode, Result};
1626 ///
1627 /// fn main() -> Result<()> {
1628 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
1629 /// let plan = conn.explain("SELECT * FROM users WHERE id = 1")?;
1630 /// println!("{}", plan);
1631 /// Ok(())
1632 /// }
1633 /// ```
1634 ///
1635 /// # Errors
1636 ///
1637 /// Returns [`Error::Server`] if `EXPLAIN <query>` fails to parse or
1638 /// plan, or if the streamed result cannot be consumed.
1639 pub fn explain(&self, query: &str) -> Result<String> {
1640 let explain_sql = format!("EXPLAIN {query}");
1641 let result = self.execute_query(&explain_sql)?;
1642 let mut lines = Vec::new();
1643 for row in result.rows() {
1644 let row = row?;
1645 if let Some(line) = row.get::<String>(0) {
1646 lines.push(line);
1647 }
1648 }
1649 Ok(lines.join("\n"))
1650 }
1651
1652 /// Executes EXPLAIN ANALYZE on a query and returns the plan with timing info.
1653 ///
1654 /// **Note:** This actually executes the query to collect timing information.
1655 ///
1656 /// # Errors
1657 ///
1658 /// Returns [`Error::Server`] if `EXPLAIN ANALYZE <query>` fails — this
1659 /// includes any runtime error raised by actually executing `query`.
1660 pub fn explain_analyze(&self, query: &str) -> Result<String> {
1661 let explain_sql = format!("EXPLAIN ANALYZE {query}");
1662 let result = self.execute_query(&explain_sql)?;
1663 let mut lines = Vec::new();
1664 for row in result.rows() {
1665 let row = row?;
1666 if let Some(line) = row.get::<String>(0) {
1667 lines.push(line);
1668 }
1669 }
1670 Ok(lines.join("\n"))
1671 }
1672
1673 /// Returns a reference to the underlying TCP client.
1674 ///
1675 /// # Panics
1676 ///
1677 /// This method returns `None` if the connection is using gRPC transport.
1678 pub fn tcp_client(&self) -> Option<&Client> {
1679 match &self.transport {
1680 Transport::Tcp(tcp) => Some(&tcp.client),
1681 Transport::Grpc(_) => None,
1682 }
1683 }
1684
1685 /// Crate-internal accessor for the transport. Used by
1686 /// [`PreparedStatement`](crate::PreparedStatement) to reach the
1687 /// underlying `hyperdb_api_core::client::Client`.
1688 pub(crate) fn transport(&self) -> &Transport {
1689 &self.transport
1690 }
1691
1692 /// Prepares a SQL statement with automatic parameter type inference.
1693 ///
1694 /// The returned [`PreparedStatement`](crate::PreparedStatement) can
1695 /// be executed many times with different parameter values; the
1696 /// server caches the parsed plan. This is the preferred way to
1697 /// execute a statement repeatedly inside a loop.
1698 ///
1699 /// For explicit parameter types (necessary when `$N` placeholders
1700 /// would otherwise be ambiguous), use
1701 /// [`prepare_typed`](Self::prepare_typed).
1702 ///
1703 /// # Example
1704 ///
1705 /// ```no_run
1706 /// # use hyperdb_api::{Connection, CreateMode, Result};
1707 /// # fn example(conn: &Connection) -> Result<()> {
1708 /// let stmt = conn.prepare("SELECT name FROM users WHERE id = $1")?;
1709 /// for id in [1_i32, 2, 3] {
1710 /// let name: String = stmt.fetch_scalar(&[&id])?;
1711 /// println!("{id}: {name}");
1712 /// }
1713 /// # Ok(())
1714 /// # }
1715 /// ```
1716 ///
1717 /// # Errors
1718 ///
1719 /// See [`prepare_typed`](Self::prepare_typed) — this method delegates
1720 /// to it with an empty OID list.
1721 pub fn prepare(&self, query: &str) -> Result<crate::PreparedStatement<'_>> {
1722 self.prepare_typed(query, &[])
1723 }
1724
1725 /// Prepares a SQL statement with explicit parameter type OIDs.
1726 ///
1727 /// Use this when the server cannot infer parameter types from the
1728 /// SQL alone (e.g. a bare `$1` in a `WHERE v > $1` clause with no
1729 /// other context). Constants for common types live in
1730 /// [`hyperdb_api_core::types::oids`].
1731 ///
1732 /// # Errors
1733 ///
1734 /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport
1735 /// (prepared statements are TCP-only).
1736 /// - Returns [`Error::Server`] if the server rejects the `Parse`
1737 /// message, e.g. SQL syntax error or unknown OID.
1738 /// - Returns [`Error::Io`] on transport-level I/O failures.
1739 pub fn prepare_typed(
1740 &self,
1741 query: &str,
1742 param_types: &[crate::Oid],
1743 ) -> Result<crate::PreparedStatement<'_>> {
1744 let client = match &self.transport {
1745 Transport::Tcp(tcp) => &tcp.client,
1746 Transport::Grpc(_) => {
1747 return Err(Error::feature_not_supported(
1748 "prepared statements are not supported over gRPC transport",
1749 ));
1750 }
1751 };
1752 let inner = client.prepare_typed(query, param_types)?;
1753 crate::PreparedStatement::new(self, inner)
1754 }
1755
1756 /// Returns true if the connection is alive (passive check).
1757 ///
1758 /// This is a lightweight check that does not send any data to the server.
1759 /// For an active health check, use [`ping`](Self::ping).
1760 pub fn is_alive(&self) -> bool {
1761 match &self.transport {
1762 Transport::Tcp(tcp) => tcp.client.is_alive(),
1763 Transport::Grpc(_) => true, // gRPC connections are stateless
1764 }
1765 }
1766
1767 /// Actively checks that the connection is healthy by executing a trivial query.
1768 ///
1769 /// Unlike [`is_alive`](Self::is_alive) which only checks local state,
1770 /// this method sends `SELECT 1` to the server and verifies a response.
1771 ///
1772 /// # Example
1773 ///
1774 /// ```no_run
1775 /// # use hyperdb_api::{Connection, CreateMode, Result};
1776 /// # fn example(conn: &Connection) -> Result<()> {
1777 /// if conn.ping().is_ok() {
1778 /// println!("Connection is healthy");
1779 /// }
1780 /// # Ok(())
1781 /// # }
1782 /// ```
1783 ///
1784 /// # Errors
1785 ///
1786 /// Returns [`Error::Server`] or [`Error::Io`] if the `SELECT 1`
1787 /// round-trip fails — i.e. the connection is no longer usable.
1788 pub fn ping(&self) -> Result<()> {
1789 self.execute_command("SELECT 1")?;
1790 Ok(())
1791 }
1792
1793 /// Returns the process ID of the backend server connection.
1794 ///
1795 /// Returns 0 for gRPC connections (not applicable).
1796 pub fn process_id(&self) -> i32 {
1797 match &self.transport {
1798 Transport::Tcp(tcp) => tcp.client.process_id(),
1799 Transport::Grpc(_) => 0,
1800 }
1801 }
1802
1803 /// Returns the secret key for the backend server connection.
1804 ///
1805 /// This is used for cancellation requests.
1806 /// Returns 0 for gRPC connections (not applicable).
1807 pub fn secret_key(&self) -> i32 {
1808 match &self.transport {
1809 Transport::Tcp(tcp) => tcp.client.secret_key(),
1810 Transport::Grpc(_) => 0,
1811 }
1812 }
1813
1814 /// Returns a server parameter value by name.
1815 ///
1816 /// Server parameters are sent by the server during connection startup.
1817 /// Common parameters include:
1818 /// - `server_version` - The server version string
1819 /// - `server_encoding` - The server's character encoding
1820 /// - `client_encoding` - The client's character encoding
1821 /// - `DateStyle` - Date display format
1822 /// - `TimeZone` - Server timezone
1823 /// - `session_identifier` - Session ID for connection migration (if routing enabled)
1824 ///
1825 /// Returns `None` if the parameter is not known.
1826 ///
1827 /// # Example
1828 ///
1829 /// ```no_run
1830 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1831 ///
1832 /// fn main() -> Result<()> {
1833 /// let hyper = HyperProcess::new(None, None)?;
1834 /// let conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
1835 ///
1836 /// if let Some(version) = conn.parameter_status("server_version") {
1837 /// println!("Connected to Hyper version: {}", version);
1838 /// }
1839 /// Ok(())
1840 /// }
1841 /// ```
1842 pub fn parameter_status(&self, name: &str) -> Option<String> {
1843 match &self.transport {
1844 Transport::Tcp(tcp) => tcp.client.parameter_status(name),
1845 Transport::Grpc(_) => None, // gRPC doesn't have server parameters
1846 }
1847 }
1848
1849 /// Sets the notice receiver for this connection.
1850 ///
1851 /// Server notices and warnings are passed to this callback instead of being
1852 /// logged. Pass `None` to restore default logging behavior.
1853 pub fn set_notice_receiver(
1854 &mut self,
1855 receiver: Option<hyperdb_api_core::client::NoticeReceiver>,
1856 ) {
1857 match &mut self.transport {
1858 Transport::Tcp(tcp) => tcp.client.set_notice_receiver(receiver),
1859 Transport::Grpc(_) => {} // gRPC doesn't support notice receivers
1860 }
1861 }
1862
1863 /// Cancels the currently executing query (thread-safe).
1864 ///
1865 /// # Errors
1866 ///
1867 /// - Returns [`Error::FeatureNotSupported`] on gRPC connections — cancellation is not
1868 /// yet implemented for gRPC transport.
1869 /// - Returns [`Error::Connection`] or [`Error::Io`] if the separate
1870 /// cancel-request connection to the server fails.
1871 pub fn cancel(&self) -> Result<()> {
1872 match &self.transport {
1873 Transport::Tcp(tcp) => tcp.client.cancel().map_err(Error::from),
1874 Transport::Grpc(_) => Err(Error::feature_not_supported(
1875 "Query cancellation is not yet supported for gRPC connections.",
1876 )),
1877 }
1878 }
1879
1880 /// Closes the connection, detaching all databases first.
1881 ///
1882 /// # Errors
1883 ///
1884 /// - Returns [`Error::Internal`] wrapping the underlying close failure
1885 /// (its `source` is the transport error) if the client cannot be
1886 /// shut down cleanly.
1887 /// - Returns [`Error::Internal`] wrapping the detach failure if the
1888 /// attached database could not be detached but close itself
1889 /// succeeded.
1890 pub fn close(self) -> Result<()> {
1891 // Detach the attached database to ensure files are flushed and released.
1892 // Always attempt close, even if detach fails.
1893 let detach_err = if let Some(ref db_path) = self.database {
1894 let db_alias = std::path::Path::new(db_path)
1895 .file_stem()
1896 .and_then(|s| s.to_str())
1897 .unwrap_or("db");
1898 self.execute_command(&format!("DETACH DATABASE {}", escape_sql_path(db_alias)))
1899 .err()
1900 } else {
1901 None
1902 };
1903
1904 // Always attempt to close the client to release the connection.
1905 let close_result = match self.transport {
1906 Transport::Tcp(tcp) => tcp.client.close(),
1907 Transport::Grpc(_) => Ok(()), // gRPC connections are stateless
1908 };
1909
1910 if let Err(e) = close_result {
1911 return Err(Error::internal(format!("Failed to close connection: {e}")));
1912 }
1913
1914 if let Some(e) = detach_err {
1915 // Detach failed but close succeeded; surface the detach error.
1916 return Err(Error::internal(format!(
1917 "Failed to detach database during close: {e}"
1918 )));
1919 }
1920
1921 Ok(())
1922 }
1923
1924 /// Unloads the database from memory while keeping the connection active.
1925 ///
1926 /// This executes the `UNLOAD DATABASE` command, which releases the database
1927 /// from memory but keeps the session and connection open. The database can
1928 /// be accessed again by subsequent queries that will automatically reload it.
1929 ///
1930 /// This is useful for releasing memory locks when switching between databases
1931 /// or when working with multiple database files.
1932 ///
1933 /// # Example
1934 ///
1935 /// ```no_run
1936 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1937 ///
1938 /// fn main() -> Result<()> {
1939 /// let hyper = HyperProcess::new(None, None)?;
1940 /// let conn = Connection::new(&hyper, "test.hyper", CreateMode::Create)?;
1941 ///
1942 /// // Do some work with the database
1943 /// conn.execute_command("CREATE TABLE test (id INT)")?;
1944 ///
1945 /// // Unload from memory (but keep connection)
1946 /// conn.unload_database()?;
1947 ///
1948 /// // Database can still be accessed (will be reloaded automatically)
1949 /// let count: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM test")?;
1950 /// println!("Count: {}", count);
1951 ///
1952 /// Ok(())
1953 /// }
1954 /// ```
1955 ///
1956 /// # Errors
1957 ///
1958 /// Returns [`Error::Server`] if the server rejects the `UNLOAD DATABASE`
1959 /// command (e.g. the database is still in use by another session).
1960 pub fn unload_database(&self) -> Result<()> {
1961 self.execute_command("UNLOAD DATABASE")?;
1962 Ok(())
1963 }
1964
1965 /// Releases the database completely from the session.
1966 ///
1967 /// This executes the `UNLOAD RELEASE` command, which completely releases
1968 /// the database from the session. After this call, the database cannot
1969 /// be accessed until a new connection is established.
1970 ///
1971 /// This is useful for completely freeing database resources when you're
1972 /// done with a database and want to ensure no locks are held.
1973 ///
1974 /// **Note:** This should only be used when the session has exactly one
1975 /// database attached. Hyper does not support `UNLOAD RELEASE` with
1976 /// multiple databases attached to the same session.
1977 ///
1978 /// # Example
1979 ///
1980 /// ```no_run
1981 /// use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
1982 ///
1983 /// fn main() -> Result<()> {
1984 /// let hyper = HyperProcess::new(None, None)?;
1985 /// let conn = Connection::new(&hyper, "test.hyper", CreateMode::Create)?;
1986 ///
1987 /// // Do some work with the database
1988 /// conn.execute_command("CREATE TABLE test (id INT)")?;
1989 ///
1990 /// // Release database completely from session
1991 /// conn.unload_release()?;
1992 ///
1993 /// // Database cannot be accessed after this point without new connection
1994 /// // conn.execute_command("SELECT * FROM test")?; // This would fail
1995 ///
1996 /// Ok(())
1997 /// }
1998 /// ```
1999 ///
2000 /// # Errors
2001 ///
2002 /// Returns [`Error::Server`] if the server rejects `UNLOAD RELEASE`, most
2003 /// commonly because multiple databases are attached to the same session
2004 /// (Hyper only supports `UNLOAD RELEASE` with exactly one attached DB).
2005 pub fn unload_release(&self) -> Result<()> {
2006 self.execute_command("UNLOAD RELEASE")?;
2007 Ok(())
2008 }
2009
2010 // =========================================================================
2011 // Query Statistics
2012 // =========================================================================
2013
2014 /// Enables query statistics collection for this connection.
2015 ///
2016 /// After enabling, each `execute_command()` or `execute_query()` call will
2017 /// capture detailed performance metrics from Hyper. Retrieve them via
2018 /// [`last_query_stats()`](Self::last_query_stats).
2019 ///
2020 /// The provider determines how stats are collected. Use
2021 /// [`LogFileStatsProvider`](crate::LogFileStatsProvider) to parse Hyper's log file (requires local
2022 /// `hyperd.log`), or implement a custom [`QueryStatsProvider`](crate::QueryStatsProvider).
2023 ///
2024 /// # Example
2025 ///
2026 /// ```no_run
2027 /// # use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
2028 /// # fn main() -> Result<()> {
2029 /// # let hyper = HyperProcess::new(None, None)?;
2030 /// # let mut conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
2031 /// use hyperdb_api::LogFileStatsProvider;
2032 ///
2033 /// // Auto-detect log path from HyperProcess
2034 /// conn.enable_query_stats(LogFileStatsProvider::from_process(&hyper));
2035 ///
2036 /// // Or specify an explicit log path
2037 /// // conn.enable_query_stats(LogFileStatsProvider::new("/path/to/hyperd.log"));
2038 /// # Ok(())
2039 /// # }
2040 /// ```
2041 pub fn enable_query_stats(&mut self, provider: impl QueryStatsProvider + 'static) {
2042 self.stats_provider = Some(Arc::new(provider));
2043 }
2044
2045 /// Disables query statistics collection.
2046 ///
2047 /// After calling this, `last_query_stats()` will return `None`.
2048 pub fn disable_query_stats(&mut self) {
2049 self.stats_provider = None;
2050 if let Ok(mut guard) = self.pending_stats.lock() {
2051 *guard = None;
2052 }
2053 }
2054
2055 /// Returns the query statistics from the most recent query execution.
2056 ///
2057 /// Stats are resolved **lazily** — the log file is read when this method
2058 /// is called, not when the query executes. This is important for streaming
2059 /// queries (`execute_query`), where Hyper writes the execution stats only
2060 /// after the result set is fully consumed.
2061 ///
2062 /// **Call this after consuming the result set** (e.g., after `collect_rows()`,
2063 /// iterating all chunks, or dropping the `Rowset`).
2064 ///
2065 /// Returns `None` if:
2066 /// - Query stats collection is not enabled
2067 /// - No query has been executed yet
2068 /// - Stats could not be found for the last query (e.g., log entry not matched)
2069 ///
2070 /// # Example
2071 ///
2072 /// ```no_run
2073 /// # use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
2074 /// # fn main() -> Result<()> {
2075 /// # let hyper = HyperProcess::new(None, None)?;
2076 /// # let mut conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
2077 /// # use hyperdb_api::LogFileStatsProvider;
2078 /// # conn.enable_query_stats(LogFileStatsProvider::from_process(&hyper));
2079 /// conn.execute_command("CREATE TABLE t (id INT)")?;
2080 ///
2081 /// if let Some(stats) = conn.last_query_stats() {
2082 /// println!("Total: {}s", stats.elapsed_s);
2083 /// if let Some(ref pre) = stats.pre_execution {
2084 /// println!(" Parse: {:?}s", pre.parsing_time_s);
2085 /// println!(" Compile: {:?}s", pre.compilation_time_s);
2086 /// }
2087 /// if let Some(ref exec) = stats.execution {
2088 /// println!(" Execute: {:?}s", exec.elapsed_s);
2089 /// println!(" Peak mem: {:?} MB", exec.peak_memory_mb);
2090 /// }
2091 /// }
2092 /// # Ok(())
2093 /// # }
2094 /// ```
2095 pub fn last_query_stats(&self) -> Option<QueryStats> {
2096 let provider = self.stats_provider.as_ref()?;
2097 let mut guard = self.pending_stats.lock().ok()?;
2098 let (token, sql) = guard.take()?;
2099 provider.after_query(token, &sql)
2100 }
2101
2102 /// Internal: call provider's `before_query` if stats are enabled.
2103 fn stats_before_query(&self, sql: &str) -> Option<Box<dyn Any + Send>> {
2104 self.stats_provider.as_ref().map(|p| p.before_query(sql))
2105 }
2106
2107 /// Internal: store the pending token+sql for lazy resolution.
2108 fn stats_store_pending(&self, token: Option<Box<dyn Any + Send>>, sql: &str) {
2109 if let Some(token) = token {
2110 if let Ok(mut guard) = self.pending_stats.lock() {
2111 *guard = Some((token, sql.to_string()));
2112 }
2113 }
2114 }
2115}
2116
2117impl Connection {
2118 // =========================================================================
2119 // Transaction Control
2120 // =========================================================================
2121
2122 // -------------------------------------------------------------------
2123 // Raw transaction control (internal)
2124 // -------------------------------------------------------------------
2125 //
2126 // The `*_raw` methods below are `pub(crate)` and form the canonical
2127 // implementation of session-level transaction control. The RAII
2128 // guard at `crate::Transaction` and any internal helper that
2129 // genuinely needs `&self` (rather than the guard's `&mut self`)
2130 // delegate to these.
2131 //
2132 // The matching `pub` methods (`begin_transaction`, `commit`,
2133 // `rollback`) are thin `#[doc(hidden)] #[deprecated]` wrappers
2134 // retained only so any pre-existing downstream caller sees a
2135 // compiler warning rather than a hard break. They will be deleted
2136 // in a future release; the `_raw` methods stay.
2137
2138 /// Issues `BEGIN TRANSACTION`. Crate-internal use only.
2139 pub(crate) fn begin_transaction_raw(&self) -> Result<()> {
2140 self.execute_command("BEGIN TRANSACTION")?;
2141 Ok(())
2142 }
2143
2144 /// Issues `COMMIT`. Crate-internal use only.
2145 pub(crate) fn commit_raw(&self) -> Result<()> {
2146 self.execute_command("COMMIT")?;
2147 Ok(())
2148 }
2149
2150 /// Issues `ROLLBACK`. Crate-internal use only.
2151 pub(crate) fn rollback_raw(&self) -> Result<()> {
2152 self.execute_command("ROLLBACK")?;
2153 Ok(())
2154 }
2155
2156 /// Begins an explicit transaction.
2157 ///
2158 /// **Prefer [`transaction()`](Self::transaction)** — the RAII guard
2159 /// auto-rolls back on drop and cannot leak a half-open transaction
2160 /// across error paths. This method is hidden from generated
2161 /// rustdoc and marked deprecated; it will be removed in a future
2162 /// release.
2163 ///
2164 /// # Errors
2165 ///
2166 /// Returns [`Error::Server`] if the server rejects `BEGIN TRANSACTION`
2167 /// (e.g. a transaction is already open on this session).
2168 #[doc(hidden)]
2169 #[deprecated(
2170 note = "Use `Connection::transaction()` for an RAII guard. This method will be removed \
2171 in a future release."
2172 )]
2173 pub fn begin_transaction(&self) -> Result<()> {
2174 self.begin_transaction_raw()
2175 }
2176
2177 /// Commits the current transaction.
2178 ///
2179 /// **Prefer [`Transaction::commit`](crate::Transaction::commit)** on
2180 /// the RAII guard returned by [`transaction()`](Self::transaction).
2181 /// Hidden from generated rustdoc and deprecated; slated for removal.
2182 ///
2183 /// # Errors
2184 ///
2185 /// Returns [`Error::Server`] if the server rejects `COMMIT`.
2186 #[doc(hidden)]
2187 #[deprecated(
2188 note = "Use `Transaction::commit()` on the RAII guard from `Connection::transaction()`. \
2189 This method will be removed in a future release."
2190 )]
2191 pub fn commit(&self) -> Result<()> {
2192 self.commit_raw()
2193 }
2194
2195 /// Rolls back the current transaction.
2196 ///
2197 /// **Prefer [`Transaction::rollback`](crate::Transaction::rollback)**
2198 /// on the RAII guard returned by [`transaction()`](Self::transaction).
2199 /// Hidden from generated rustdoc and deprecated; slated for removal.
2200 ///
2201 /// # Errors
2202 ///
2203 /// Returns [`Error::Server`] if the server rejects `ROLLBACK`.
2204 #[doc(hidden)]
2205 #[deprecated(
2206 note = "Use `Transaction::rollback()` on the RAII guard from `Connection::transaction()`. \
2207 This method will be removed in a future release."
2208 )]
2209 pub fn rollback(&self) -> Result<()> {
2210 self.rollback_raw()
2211 }
2212
2213 /// Starts a transaction and returns an RAII guard that auto-rolls back on drop.
2214 ///
2215 /// The returned [`Transaction`](crate::Transaction) exclusively borrows this connection,
2216 /// preventing any other use of the connection while the transaction is active.
2217 /// This is enforced at compile time by Rust's borrow checker. The guard provides
2218 /// `commit()` and `rollback()` methods. If dropped without calling either, the
2219 /// transaction is automatically rolled back.
2220 ///
2221 /// # Example
2222 ///
2223 /// ```no_run
2224 /// # use hyperdb_api::{Connection, CreateMode, Result};
2225 /// # fn main() -> Result<()> {
2226 /// # let mut conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
2227 /// let txn = conn.transaction()?;
2228 /// txn.execute_command("INSERT INTO users VALUES (1, 'Alice')")?;
2229 /// txn.commit()?; // or drop `txn` to auto-rollback
2230 /// # Ok(())
2231 /// # }
2232 /// ```
2233 ///
2234 /// # Errors
2235 ///
2236 /// Returns [`Error::Server`] if the server rejects the `BEGIN`
2237 /// statement issued internally by
2238 /// [`Transaction::new`](crate::Transaction).
2239 pub fn transaction(&mut self) -> Result<crate::Transaction<'_>> {
2240 crate::Transaction::new(self)
2241 }
2242}
2243
2244/// Checks if an error indicates an "already exists" condition based on SQLSTATE codes.
2245///
2246/// This function uses `PostgreSQL` SQLSTATE codes to reliably detect duplicate object errors
2247/// regardless of server locale or message formatting. The codes checked are:
2248/// - `42P04`: Database already exists
2249/// - `42710`: Duplicate object
2250/// - `42P06`: Duplicate schema
2251/// - `42P07`: Duplicate table
2252///
2253/// See: <https://www.postgresql.org/docs/current/errcodes-appendix.html>
2254fn is_already_exists_error(err: &Error) -> bool {
2255 err.sqlstate()
2256 .is_some_and(|code| matches!(code, "42P04" | "42710" | "42P06" | "42P07"))
2257}