Skip to main content

hyperdb_api/
grpc_connection.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! gRPC-based connection to Hyper databases.
5//!
6//! This module provides [`GrpcConnection`] for query-only access to Hyper databases
7//! via gRPC, with results returned in Apache Arrow IPC format.
8//!
9//! # Differences from TCP Connection
10//!
11//! gRPC connections have these limitations compared to TCP:
12//!
13//! - **Read-only**: Only SELECT queries are supported
14//! - **No DDL/DML**: CREATE, DROP, INSERT, UPDATE, DELETE not supported
15//! - **No COPY**: Bulk insertion via COPY protocol not available
16//! - **No prepared statements**: Parameters not supported
17//!
18//! Use [`Connection`](crate::Connection) for full read-write access.
19//!
20//! # Transfer Modes
21//!
22//! gRPC queries support three transfer modes via
23//! [`TransferMode`](crate::grpc::TransferMode), which control how
24//! result data flows between server and client:
25//!
26//! - **`Sync`** — All results are returned in a single response message.
27//!   Simple but limited to a 100-second server timeout, making it unsuitable
28//!   for long-running queries or large result sets.
29//!
30//! - **`Async`** — The initial response contains only a header (query ID);
31//!   results must be fetched separately via `GetQueryResult`. Useful when
32//!   the client needs to decouple query submission from result consumption.
33//!
34//! - **`Adaptive`** (default, recommended) — The first chunk of results is
35//!   returned inline with the response; remaining chunks are streamed via
36//!   `GetQueryResult`. This avoids the sync timeout limit while keeping
37//!   latency low for small results that fit in a single chunk.
38//!
39//! # Example
40//!
41//! ```no_run
42//! use hyperdb_api::grpc::{GrpcConnection, GrpcConnectionAsync};
43//!
44//! // Async usage
45//! #[tokio::main]
46//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
47//!     let mut conn = GrpcConnectionAsync::connect(
48//!         "http://localhost:7484",
49//!         "my_database.hyper"
50//!     ).await?;
51//!
52//!     // Execute a query
53//!     let arrow_data = conn.execute_query_to_arrow("SELECT * FROM users").await?;
54//!
55//!     // Or get a structured result
56//!     let result = conn.execute_query("SELECT id, name FROM users").await?;
57//!     println!("Got {} bytes of Arrow data", result.arrow_data().len());
58//!
59//!     Ok(())
60//! }
61//!
62//! // Sync usage
63//! fn main_sync() -> Result<(), Box<dyn std::error::Error>> {
64//!     let mut conn = GrpcConnection::connect(
65//!         "http://localhost:7484",
66//!         "my_database.hyper"
67//!     )?;
68//!
69//!     let arrow_data = conn.execute_query_to_arrow("SELECT * FROM users")?;
70//!     Ok(())
71//! }
72//! ```
73
74use crate::error::Result;
75
76// Re-export types from hyperdb_api_core::client::grpc for convenience
77pub(crate) use hyperdb_api_core::client::grpc::{
78    GrpcClient, GrpcClientSync, GrpcConfig, GrpcQueryResult,
79};
80
81/// A gRPC connection to a Hyper database (query-only).
82///
83/// Unlike [`Connection`](crate::Connection), this provides read-only access via gRPC
84/// with results in Apache Arrow IPC format. This is useful for:
85///
86/// - Load-balanced deployments where gRPC provides better routing
87/// - Integration with Arrow-based data pipelines
88/// - Scenarios where HTTP/2 benefits are needed
89///
90/// # Limitations
91///
92/// gRPC connections only support SELECT queries. Attempting to execute
93/// DDL (CREATE, DROP), DML (INSERT, UPDATE, DELETE), or use features like
94/// prepared statements will result in an error.
95///
96/// # Async vs Sync
97///
98/// This struct provides both async and sync APIs:
99///
100/// - `connect()` / `execute_query()` - blocking (uses internal tokio runtime)
101/// - `connect_async()` / `execute_query_async()` - async (requires tokio runtime)
102///
103/// For applications already using tokio, the async methods are preferred.
104#[derive(Debug)]
105pub struct GrpcConnection {
106    /// The underlying gRPC client (sync wrapper)
107    client: GrpcClientSync,
108    /// Database path
109    database: Option<String>,
110}
111
112impl GrpcConnection {
113    /// Connects to a Hyper server via gRPC (blocking).
114    ///
115    /// # Arguments
116    ///
117    /// * `endpoint` - The gRPC endpoint URL (e.g., "<http://localhost:7484>")
118    /// * `database_path` - Path to the database to query
119    ///
120    /// # Example
121    ///
122    /// ```no_run
123    /// # use hyperdb_api::grpc::GrpcConnection;
124    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
125    /// let conn = GrpcConnection::connect(
126    ///     "http://localhost:7484",
127    ///     "my_database.hyper"
128    /// )?;
129    /// # Ok(())
130    /// # }
131    /// ```
132    ///
133    /// # Errors
134    ///
135    /// Returns [`crate::Error::Client`] wrapping a
136    /// `hyperdb_api_core::client::Error` if the HTTP/2 channel cannot be established
137    /// (endpoint unreachable, TLS handshake failure, auth rejection).
138    pub fn connect(endpoint: &str, database_path: &str) -> Result<Self> {
139        let config = GrpcConfig::new(endpoint).database(database_path);
140        let client = GrpcClientSync::connect(config)?;
141
142        Ok(GrpcConnection {
143            client,
144            database: Some(database_path.to_string()),
145        })
146    }
147
148    /// Connects to a Hyper server via gRPC with custom configuration (blocking).
149    ///
150    /// # Example
151    ///
152    /// ```no_run
153    /// # use hyperdb_api::grpc::{GrpcConnection, GrpcConfig};
154    /// # use std::time::Duration;
155    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
156    /// let config = GrpcConfig::new("http://localhost:7484")
157    ///     .database("my_database.hyper")
158    ///     .connect_timeout(Duration::from_secs(10))
159    ///     .header("x-custom-header", "value");
160    ///
161    /// let conn = GrpcConnection::connect_with_config(config)?;
162    /// # Ok(())
163    /// # }
164    /// ```
165    ///
166    /// # Errors
167    ///
168    /// Returns [`crate::Error::Client`] if the HTTP/2 channel cannot be
169    /// established with the supplied configuration.
170    pub fn connect_with_config(config: GrpcConfig) -> Result<Self> {
171        let database = config.database_path().map(std::string::ToString::to_string);
172        let client = GrpcClientSync::connect(config)?;
173
174        Ok(GrpcConnection { client, database })
175    }
176
177    /// Executes a SQL query and returns raw Arrow IPC bytes (blocking).
178    ///
179    /// This is the most efficient method when you need the raw Arrow data
180    /// for processing with an Arrow library.
181    ///
182    /// # Example
183    ///
184    /// ```no_run
185    /// # use hyperdb_api::grpc::GrpcConnection;
186    /// # fn example(conn: &mut GrpcConnection) -> Result<(), Box<dyn std::error::Error>> {
187    /// let arrow_bytes = conn.execute_query_to_arrow("SELECT * FROM users")?;
188    ///
189    /// // Process with arrow crate using the zero-copy helper
190    /// let rowset = hyperdb_api::ArrowRowset::from_bytes(arrow_bytes)?;
191    /// # Ok(())
192    /// # }
193    /// ```
194    ///
195    /// # Errors
196    ///
197    /// Returns [`crate::Error::Client`] if the gRPC server rejects the query
198    /// or if the HTTP/2 channel fails mid-stream.
199    pub fn execute_query_to_arrow(&mut self, sql: &str) -> Result<bytes::Bytes> {
200        Ok(self.client.execute_query_to_arrow(sql)?)
201    }
202
203    /// Executes a SQL query and returns a structured result (blocking).
204    ///
205    /// The result contains the Arrow IPC data along with metadata about
206    /// the query execution.
207    ///
208    /// # Example
209    ///
210    /// ```no_run
211    /// # use hyperdb_api::grpc::GrpcConnection;
212    /// # fn example(conn: &mut GrpcConnection) -> Result<(), Box<dyn std::error::Error>> {
213    /// let result = conn.execute_query("SELECT id, name FROM users")?;
214    /// println!("Query ID: {:?}", result.query_id());
215    /// println!("Columns: {}", result.column_count());
216    /// let arrow_data = result.into_arrow_data();
217    /// # Ok(())
218    /// # }
219    /// ```
220    ///
221    /// # Errors
222    ///
223    /// Returns [`crate::Error::Client`] if the gRPC server rejects the query
224    /// or the HTTP/2 channel fails.
225    pub fn execute_query(&mut self, sql: &str) -> Result<GrpcQueryResult> {
226        Ok(self.client.execute_query(sql)?)
227    }
228
229    /// Cancels an in-flight gRPC query by its `query_id` (blocking).
230    ///
231    /// This is the gRPC analogue of PG wire's `CancelRequest`. Unlike PG
232    /// wire — where the cancel opens a separate TCP connection — the
233    /// gRPC cancel rides the existing HTTP/2 channel as a regular RPC,
234    /// so it reuses this connection's channel, database routing, and
235    /// custom headers.
236    ///
237    /// # When you have a `query_id`
238    ///
239    /// The server assigns a `query_id` for ASYNC-mode queries
240    /// (long-running queries that the client polls).  Read it from
241    /// [`GrpcQueryResult::query_id`] after an async-mode execution.
242    /// SYNC-mode queries typically complete before a cancel would be
243    /// useful — drop the in-flight future instead.
244    ///
245    /// Cancellation is best-effort: a successful `Ok(())` return means
246    /// the server acknowledged the cancel, not that the query had not
247    /// already finished. Errors indicate a transport-level failure
248    /// (channel closed, network error, auth expired) — useful for
249    /// metrics, retry logic, or "cancel failed" UX.
250    ///
251    /// # Fallible by design
252    ///
253    /// The `Result<()>` return is the **explicit user-facing cancel
254    /// API** and is distinct from the
255    /// [`Cancellable`](hyperdb_api_core::client::Cancellable) trait, which requires
256    /// an infallible `cancel(&self)` method with no arguments. A
257    /// `GrpcConnection` cannot implement `Cancellable` directly: the
258    /// trait's signature has nowhere to pass a `query_id`, and gRPC
259    /// connections can carry many concurrent queries (so there is no
260    /// unambiguous "the" query to cancel the way there is on a PG wire
261    /// connection). If you need `Cancellable`-style fire-and-forget
262    /// cancel for a future gRPC streaming result type, it will live
263    /// on a per-query handle that wraps this method and swallows
264    /// errors — mirroring
265    /// [`impl Cancellable for hyperdb_api_core::client::Client`](hyperdb_api_core::client::Cancellable).
266    ///
267    /// # Example
268    ///
269    /// ```no_run
270    /// # use hyperdb_api::grpc::GrpcConnection;
271    /// # fn example(conn: &mut GrpcConnection, query_id: &str) -> hyperdb_api::Result<()> {
272    /// conn.cancel_query(query_id)?;
273    /// # Ok(())
274    /// # }
275    /// ```
276    ///
277    /// # Errors
278    ///
279    /// Returns [`crate::Error::Client`] on transport-level failures (channel
280    /// closed, network error, auth expired). An unknown or already-finished
281    /// `query_id` is not an error — the server returns `Ok`.
282    pub fn cancel_query(&mut self, query_id: &str) -> Result<()> {
283        Ok(self.client.cancel_query(query_id)?)
284    }
285
286    /// Returns the database path, if one is attached.
287    #[must_use]
288    pub fn database(&self) -> Option<&str> {
289        self.database.as_deref()
290    }
291
292    /// Returns the gRPC configuration.
293    pub fn config(&self) -> &GrpcConfig {
294        self.client.config()
295    }
296
297    /// Closes the connection (blocking).
298    ///
299    /// Note: The connection is automatically closed when dropped.
300    ///
301    /// # Errors
302    ///
303    /// Returns [`crate::Error::Client`] if the underlying HTTP/2 channel cannot
304    /// be shut down cleanly.
305    pub fn close(self) -> Result<()> {
306        Ok(self.client.close()?)
307    }
308}
309
310/// Async gRPC connection to a Hyper database (query-only).
311///
312/// This is the async version of [`GrpcConnection`], designed for use with
313/// tokio-based async applications.
314///
315/// # Example
316///
317/// ```no_run
318/// # use hyperdb_api::grpc::GrpcConnectionAsync;
319/// #[tokio::main]
320/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
321///     let mut conn = GrpcConnectionAsync::connect(
322///         "http://localhost:7484",
323///         "my_database.hyper"
324///     ).await?;
325///
326///     let arrow_data = conn.execute_query_to_arrow("SELECT * FROM users").await?;
327///     Ok(())
328/// }
329/// ```
330#[derive(Debug)]
331pub struct GrpcConnectionAsync {
332    /// The underlying async gRPC client
333    client: GrpcClient,
334    /// Database path
335    database: Option<String>,
336}
337
338impl GrpcConnectionAsync {
339    /// Connects to a Hyper server via gRPC (async).
340    ///
341    /// # Arguments
342    ///
343    /// * `endpoint` - The gRPC endpoint URL (e.g., "<http://localhost:7484>")
344    /// * `database_path` - Path to the database to query
345    ///
346    /// # Errors
347    ///
348    /// Returns [`crate::Error::Client`] if the HTTP/2 channel cannot be
349    /// established (endpoint unreachable, TLS handshake failure).
350    pub async fn connect(endpoint: &str, database_path: &str) -> Result<Self> {
351        let config = GrpcConfig::new(endpoint).database(database_path);
352        let client = GrpcClient::connect(config).await?;
353
354        Ok(GrpcConnectionAsync {
355            client,
356            database: Some(database_path.to_string()),
357        })
358    }
359
360    /// Connects to a Hyper server via gRPC with custom configuration (async).
361    ///
362    /// # Errors
363    ///
364    /// Returns [`crate::Error::Client`] if the HTTP/2 channel cannot be
365    /// established with the supplied configuration.
366    pub async fn connect_with_config(config: GrpcConfig) -> Result<Self> {
367        let database = config.database_path().map(std::string::ToString::to_string);
368        let client = GrpcClient::connect(config).await?;
369
370        Ok(GrpcConnectionAsync { client, database })
371    }
372
373    /// Executes a SQL query and returns raw Arrow IPC bytes (async).
374    ///
375    /// # Errors
376    ///
377    /// Returns [`crate::Error::Client`] if the server rejects the query or the
378    /// HTTP/2 channel fails mid-stream.
379    pub async fn execute_query_to_arrow(&mut self, sql: &str) -> Result<bytes::Bytes> {
380        Ok(self.client.execute_query_to_arrow(sql).await?)
381    }
382
383    /// Executes a SQL query and returns a structured result (async).
384    ///
385    /// # Errors
386    ///
387    /// Returns [`crate::Error::Client`] if the server rejects the query or the
388    /// HTTP/2 channel fails.
389    pub async fn execute_query(&mut self, sql: &str) -> Result<GrpcQueryResult> {
390        Ok(self.client.execute_query(sql).await?)
391    }
392
393    /// Cancels an in-flight gRPC query by its `query_id` (async).
394    ///
395    /// See [`GrpcConnection::cancel_query`] for full semantics, including
396    /// the "Fallible by design" discussion of why this returns
397    /// `Result<()>` and why it is *not* an implementation of the
398    /// [`Cancellable`](hyperdb_api_core::client::Cancellable) trait. The async
399    /// variant avoids blocking the current thread; both variants route
400    /// the cancel over the same channel used for queries, carrying this
401    /// connection's database routing and custom headers.
402    ///
403    /// # Example
404    ///
405    /// ```no_run
406    /// # use hyperdb_api::grpc::GrpcConnectionAsync;
407    /// # async fn example(conn: &mut GrpcConnectionAsync, query_id: &str)
408    /// #     -> hyperdb_api::Result<()> {
409    /// conn.cancel_query(query_id).await?;
410    /// # Ok(())
411    /// # }
412    /// ```
413    ///
414    /// # Errors
415    ///
416    /// See [`GrpcConnection::cancel_query`].
417    pub async fn cancel_query(&mut self, query_id: &str) -> Result<()> {
418        Ok(self.client.cancel_query(query_id).await?)
419    }
420
421    /// Returns the database path, if one is attached.
422    #[must_use]
423    pub fn database(&self) -> Option<&str> {
424        self.database.as_deref()
425    }
426
427    /// Returns the gRPC configuration.
428    pub fn config(&self) -> &GrpcConfig {
429        self.client.config()
430    }
431
432    /// Closes the connection (async).
433    ///
434    /// # Errors
435    ///
436    /// Returns [`crate::Error::Client`] if the underlying HTTP/2 channel cannot
437    /// be shut down cleanly.
438    pub async fn close(self) -> Result<()> {
439        Ok(self.client.close().await?)
440    }
441}
442
443// ============================================================================
444// Streaming chunk source adapter
445// ============================================================================
446
447/// Adapter from a [`hyperdb_api_core::client::grpc::GrpcChunkStreamSync`] to
448/// [`crate::arrow_result::ChunkSource`].
449///
450/// Used internally by [`Connection::execute_query`](crate::Connection::execute_query)
451/// on a gRPC transport so the high-level `Rowset` decodes record batches
452/// lazily instead of buffering the whole Arrow IPC payload in memory.
453pub(crate) struct GrpcChunkStreamSource {
454    inner: hyperdb_api_core::client::grpc::GrpcChunkStreamSync,
455}
456
457impl GrpcChunkStreamSource {
458    pub(crate) fn new(inner: hyperdb_api_core::client::grpc::GrpcChunkStreamSync) -> Self {
459        GrpcChunkStreamSource { inner }
460    }
461}
462
463impl crate::arrow_result::ChunkSource for GrpcChunkStreamSource {
464    fn next_chunk(&mut self) -> crate::Result<Option<bytes::Bytes>> {
465        Ok(self.inner.next_chunk()?)
466    }
467}