hyperdb_api/grpc_connection.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! gRPC-based connection to Hyper databases.
5//!
6//! This module provides [`GrpcConnection`] for query-only access to Hyper databases
7//! via gRPC, with results returned in Apache Arrow IPC format.
8//!
9//! # Differences from TCP Connection
10//!
11//! gRPC connections have these limitations compared to TCP:
12//!
13//! - **Read-only**: Only SELECT queries are supported
14//! - **No DDL/DML**: CREATE, DROP, INSERT, UPDATE, DELETE not supported
15//! - **No COPY**: Bulk insertion via COPY protocol not available
16//! - **No prepared statements**: Parameters not supported
17//!
18//! Use [`Connection`](crate::Connection) for full read-write access.
19//!
20//! # Transfer Modes
21//!
22//! gRPC queries support three transfer modes via
23//! [`TransferMode`](crate::grpc::TransferMode), which control how
24//! result data flows between server and client:
25//!
26//! - **`Sync`** — All results are returned in a single response message.
27//! Simple but limited to a 100-second server timeout, making it unsuitable
28//! for long-running queries or large result sets.
29//!
30//! - **`Async`** — The initial response contains only a header (query ID);
31//! results must be fetched separately via `GetQueryResult`. Useful when
32//! the client needs to decouple query submission from result consumption.
33//!
34//! - **`Adaptive`** (default, recommended) — The first chunk of results is
35//! returned inline with the response; remaining chunks are streamed via
36//! `GetQueryResult`. This avoids the sync timeout limit while keeping
37//! latency low for small results that fit in a single chunk.
38//!
39//! # Example
40//!
41//! ```no_run
42//! use hyperdb_api::grpc::{GrpcConnection, GrpcConnectionAsync};
43//!
44//! // Async usage
45//! #[tokio::main]
46//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
47//! let mut conn = GrpcConnectionAsync::connect(
48//! "http://localhost:7484",
49//! "my_database.hyper"
50//! ).await?;
51//!
52//! // Execute a query
53//! let arrow_data = conn.execute_query_to_arrow("SELECT * FROM users").await?;
54//!
55//! // Or get a structured result
56//! let result = conn.execute_query("SELECT id, name FROM users").await?;
57//! println!("Got {} bytes of Arrow data", result.arrow_data().len());
58//!
59//! Ok(())
60//! }
61//!
62//! // Sync usage
63//! fn main_sync() -> Result<(), Box<dyn std::error::Error>> {
64//! let mut conn = GrpcConnection::connect(
65//! "http://localhost:7484",
66//! "my_database.hyper"
67//! )?;
68//!
69//! let arrow_data = conn.execute_query_to_arrow("SELECT * FROM users")?;
70//! Ok(())
71//! }
72//! ```
73
74use crate::error::Result;
75
76// Re-export types from hyperdb_api_core::client::grpc for convenience
77pub(crate) use hyperdb_api_core::client::grpc::{
78 GrpcClient, GrpcClientSync, GrpcConfig, GrpcQueryResult,
79};
80
81/// A gRPC connection to a Hyper database (query-only).
82///
83/// Unlike [`Connection`](crate::Connection), this provides read-only access via gRPC
84/// with results in Apache Arrow IPC format. This is useful for:
85///
86/// - Load-balanced deployments where gRPC provides better routing
87/// - Integration with Arrow-based data pipelines
88/// - Scenarios where HTTP/2 benefits are needed
89///
90/// # Limitations
91///
92/// gRPC connections only support SELECT queries. Attempting to execute
93/// DDL (CREATE, DROP), DML (INSERT, UPDATE, DELETE), or use features like
94/// prepared statements will result in an error.
95///
96/// # Async vs Sync
97///
98/// This struct provides both async and sync APIs:
99///
100/// - `connect()` / `execute_query()` - blocking (uses internal tokio runtime)
101/// - `connect_async()` / `execute_query_async()` - async (requires tokio runtime)
102///
103/// For applications already using tokio, the async methods are preferred.
104#[derive(Debug)]
105pub struct GrpcConnection {
106 /// The underlying gRPC client (sync wrapper)
107 client: GrpcClientSync,
108 /// Database path
109 database: Option<String>,
110}
111
112impl GrpcConnection {
113 /// Connects to a Hyper server via gRPC (blocking).
114 ///
115 /// # Arguments
116 ///
117 /// * `endpoint` - The gRPC endpoint URL (e.g., "<http://localhost:7484>")
118 /// * `database_path` - Path to the database to query
119 ///
120 /// # Example
121 ///
122 /// ```no_run
123 /// # use hyperdb_api::grpc::GrpcConnection;
124 /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
125 /// let conn = GrpcConnection::connect(
126 /// "http://localhost:7484",
127 /// "my_database.hyper"
128 /// )?;
129 /// # Ok(())
130 /// # }
131 /// ```
132 ///
133 /// # Errors
134 ///
135 /// Returns [`crate::Error::Client`] wrapping a
136 /// `hyperdb_api_core::client::Error` if the HTTP/2 channel cannot be established
137 /// (endpoint unreachable, TLS handshake failure, auth rejection).
138 pub fn connect(endpoint: &str, database_path: &str) -> Result<Self> {
139 let config = GrpcConfig::new(endpoint).database(database_path);
140 let client = GrpcClientSync::connect(config)?;
141
142 Ok(GrpcConnection {
143 client,
144 database: Some(database_path.to_string()),
145 })
146 }
147
148 /// Connects to a Hyper server via gRPC with custom configuration (blocking).
149 ///
150 /// # Example
151 ///
152 /// ```no_run
153 /// # use hyperdb_api::grpc::{GrpcConnection, GrpcConfig};
154 /// # use std::time::Duration;
155 /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
156 /// let config = GrpcConfig::new("http://localhost:7484")
157 /// .database("my_database.hyper")
158 /// .connect_timeout(Duration::from_secs(10))
159 /// .header("x-custom-header", "value");
160 ///
161 /// let conn = GrpcConnection::connect_with_config(config)?;
162 /// # Ok(())
163 /// # }
164 /// ```
165 ///
166 /// # Errors
167 ///
168 /// Returns [`crate::Error::Client`] if the HTTP/2 channel cannot be
169 /// established with the supplied configuration.
170 pub fn connect_with_config(config: GrpcConfig) -> Result<Self> {
171 let database = config.database_path().map(std::string::ToString::to_string);
172 let client = GrpcClientSync::connect(config)?;
173
174 Ok(GrpcConnection { client, database })
175 }
176
177 /// Executes a SQL query and returns raw Arrow IPC bytes (blocking).
178 ///
179 /// This is the most efficient method when you need the raw Arrow data
180 /// for processing with an Arrow library.
181 ///
182 /// # Example
183 ///
184 /// ```no_run
185 /// # use hyperdb_api::grpc::GrpcConnection;
186 /// # fn example(conn: &mut GrpcConnection) -> Result<(), Box<dyn std::error::Error>> {
187 /// let arrow_bytes = conn.execute_query_to_arrow("SELECT * FROM users")?;
188 ///
189 /// // Process with arrow crate using the zero-copy helper
190 /// let rowset = hyperdb_api::ArrowRowset::from_bytes(arrow_bytes)?;
191 /// # Ok(())
192 /// # }
193 /// ```
194 ///
195 /// # Errors
196 ///
197 /// Returns [`crate::Error::Client`] if the gRPC server rejects the query
198 /// or if the HTTP/2 channel fails mid-stream.
199 pub fn execute_query_to_arrow(&mut self, sql: &str) -> Result<bytes::Bytes> {
200 Ok(self.client.execute_query_to_arrow(sql)?)
201 }
202
203 /// Executes a SQL query and returns a structured result (blocking).
204 ///
205 /// The result contains the Arrow IPC data along with metadata about
206 /// the query execution.
207 ///
208 /// # Example
209 ///
210 /// ```no_run
211 /// # use hyperdb_api::grpc::GrpcConnection;
212 /// # fn example(conn: &mut GrpcConnection) -> Result<(), Box<dyn std::error::Error>> {
213 /// let result = conn.execute_query("SELECT id, name FROM users")?;
214 /// println!("Query ID: {:?}", result.query_id());
215 /// println!("Columns: {}", result.column_count());
216 /// let arrow_data = result.into_arrow_data();
217 /// # Ok(())
218 /// # }
219 /// ```
220 ///
221 /// # Errors
222 ///
223 /// Returns [`crate::Error::Client`] if the gRPC server rejects the query
224 /// or the HTTP/2 channel fails.
225 pub fn execute_query(&mut self, sql: &str) -> Result<GrpcQueryResult> {
226 Ok(self.client.execute_query(sql)?)
227 }
228
229 /// Cancels an in-flight gRPC query by its `query_id` (blocking).
230 ///
231 /// This is the gRPC analogue of PG wire's `CancelRequest`. Unlike PG
232 /// wire — where the cancel opens a separate TCP connection — the
233 /// gRPC cancel rides the existing HTTP/2 channel as a regular RPC,
234 /// so it reuses this connection's channel, database routing, and
235 /// custom headers.
236 ///
237 /// # When you have a `query_id`
238 ///
239 /// The server assigns a `query_id` for ASYNC-mode queries
240 /// (long-running queries that the client polls). Read it from
241 /// [`GrpcQueryResult::query_id`] after an async-mode execution.
242 /// SYNC-mode queries typically complete before a cancel would be
243 /// useful — drop the in-flight future instead.
244 ///
245 /// Cancellation is best-effort: a successful `Ok(())` return means
246 /// the server acknowledged the cancel, not that the query had not
247 /// already finished. Errors indicate a transport-level failure
248 /// (channel closed, network error, auth expired) — useful for
249 /// metrics, retry logic, or "cancel failed" UX.
250 ///
251 /// # Fallible by design
252 ///
253 /// The `Result<()>` return is the **explicit user-facing cancel
254 /// API** and is distinct from the
255 /// [`Cancellable`](hyperdb_api_core::client::Cancellable) trait, which requires
256 /// an infallible `cancel(&self)` method with no arguments. A
257 /// `GrpcConnection` cannot implement `Cancellable` directly: the
258 /// trait's signature has nowhere to pass a `query_id`, and gRPC
259 /// connections can carry many concurrent queries (so there is no
260 /// unambiguous "the" query to cancel the way there is on a PG wire
261 /// connection). If you need `Cancellable`-style fire-and-forget
262 /// cancel for a future gRPC streaming result type, it will live
263 /// on a per-query handle that wraps this method and swallows
264 /// errors — mirroring
265 /// [`impl Cancellable for hyperdb_api_core::client::Client`](hyperdb_api_core::client::Cancellable).
266 ///
267 /// # Example
268 ///
269 /// ```no_run
270 /// # use hyperdb_api::grpc::GrpcConnection;
271 /// # fn example(conn: &mut GrpcConnection, query_id: &str) -> hyperdb_api::Result<()> {
272 /// conn.cancel_query(query_id)?;
273 /// # Ok(())
274 /// # }
275 /// ```
276 ///
277 /// # Errors
278 ///
279 /// Returns [`crate::Error::Client`] on transport-level failures (channel
280 /// closed, network error, auth expired). An unknown or already-finished
281 /// `query_id` is not an error — the server returns `Ok`.
282 pub fn cancel_query(&mut self, query_id: &str) -> Result<()> {
283 Ok(self.client.cancel_query(query_id)?)
284 }
285
286 /// Returns the database path, if one is attached.
287 #[must_use]
288 pub fn database(&self) -> Option<&str> {
289 self.database.as_deref()
290 }
291
292 /// Returns the gRPC configuration.
293 pub fn config(&self) -> &GrpcConfig {
294 self.client.config()
295 }
296
297 /// Closes the connection (blocking).
298 ///
299 /// Note: The connection is automatically closed when dropped.
300 ///
301 /// # Errors
302 ///
303 /// Returns [`crate::Error::Client`] if the underlying HTTP/2 channel cannot
304 /// be shut down cleanly.
305 pub fn close(self) -> Result<()> {
306 Ok(self.client.close()?)
307 }
308}
309
310/// Async gRPC connection to a Hyper database (query-only).
311///
312/// This is the async version of [`GrpcConnection`], designed for use with
313/// tokio-based async applications.
314///
315/// # Example
316///
317/// ```no_run
318/// # use hyperdb_api::grpc::GrpcConnectionAsync;
319/// #[tokio::main]
320/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
321/// let mut conn = GrpcConnectionAsync::connect(
322/// "http://localhost:7484",
323/// "my_database.hyper"
324/// ).await?;
325///
326/// let arrow_data = conn.execute_query_to_arrow("SELECT * FROM users").await?;
327/// Ok(())
328/// }
329/// ```
330#[derive(Debug)]
331pub struct GrpcConnectionAsync {
332 /// The underlying async gRPC client
333 client: GrpcClient,
334 /// Database path
335 database: Option<String>,
336}
337
338impl GrpcConnectionAsync {
339 /// Connects to a Hyper server via gRPC (async).
340 ///
341 /// # Arguments
342 ///
343 /// * `endpoint` - The gRPC endpoint URL (e.g., "<http://localhost:7484>")
344 /// * `database_path` - Path to the database to query
345 ///
346 /// # Errors
347 ///
348 /// Returns [`crate::Error::Client`] if the HTTP/2 channel cannot be
349 /// established (endpoint unreachable, TLS handshake failure).
350 pub async fn connect(endpoint: &str, database_path: &str) -> Result<Self> {
351 let config = GrpcConfig::new(endpoint).database(database_path);
352 let client = GrpcClient::connect(config).await?;
353
354 Ok(GrpcConnectionAsync {
355 client,
356 database: Some(database_path.to_string()),
357 })
358 }
359
360 /// Connects to a Hyper server via gRPC with custom configuration (async).
361 ///
362 /// # Errors
363 ///
364 /// Returns [`crate::Error::Client`] if the HTTP/2 channel cannot be
365 /// established with the supplied configuration.
366 pub async fn connect_with_config(config: GrpcConfig) -> Result<Self> {
367 let database = config.database_path().map(std::string::ToString::to_string);
368 let client = GrpcClient::connect(config).await?;
369
370 Ok(GrpcConnectionAsync { client, database })
371 }
372
373 /// Executes a SQL query and returns raw Arrow IPC bytes (async).
374 ///
375 /// # Errors
376 ///
377 /// Returns [`crate::Error::Client`] if the server rejects the query or the
378 /// HTTP/2 channel fails mid-stream.
379 pub async fn execute_query_to_arrow(&mut self, sql: &str) -> Result<bytes::Bytes> {
380 Ok(self.client.execute_query_to_arrow(sql).await?)
381 }
382
383 /// Executes a SQL query and returns a structured result (async).
384 ///
385 /// # Errors
386 ///
387 /// Returns [`crate::Error::Client`] if the server rejects the query or the
388 /// HTTP/2 channel fails.
389 pub async fn execute_query(&mut self, sql: &str) -> Result<GrpcQueryResult> {
390 Ok(self.client.execute_query(sql).await?)
391 }
392
393 /// Cancels an in-flight gRPC query by its `query_id` (async).
394 ///
395 /// See [`GrpcConnection::cancel_query`] for full semantics, including
396 /// the "Fallible by design" discussion of why this returns
397 /// `Result<()>` and why it is *not* an implementation of the
398 /// [`Cancellable`](hyperdb_api_core::client::Cancellable) trait. The async
399 /// variant avoids blocking the current thread; both variants route
400 /// the cancel over the same channel used for queries, carrying this
401 /// connection's database routing and custom headers.
402 ///
403 /// # Example
404 ///
405 /// ```no_run
406 /// # use hyperdb_api::grpc::GrpcConnectionAsync;
407 /// # async fn example(conn: &mut GrpcConnectionAsync, query_id: &str)
408 /// # -> hyperdb_api::Result<()> {
409 /// conn.cancel_query(query_id).await?;
410 /// # Ok(())
411 /// # }
412 /// ```
413 ///
414 /// # Errors
415 ///
416 /// See [`GrpcConnection::cancel_query`].
417 pub async fn cancel_query(&mut self, query_id: &str) -> Result<()> {
418 Ok(self.client.cancel_query(query_id).await?)
419 }
420
421 /// Returns the database path, if one is attached.
422 #[must_use]
423 pub fn database(&self) -> Option<&str> {
424 self.database.as_deref()
425 }
426
427 /// Returns the gRPC configuration.
428 pub fn config(&self) -> &GrpcConfig {
429 self.client.config()
430 }
431
432 /// Closes the connection (async).
433 ///
434 /// # Errors
435 ///
436 /// Returns [`crate::Error::Client`] if the underlying HTTP/2 channel cannot
437 /// be shut down cleanly.
438 pub async fn close(self) -> Result<()> {
439 Ok(self.client.close().await?)
440 }
441}
442
443// ============================================================================
444// Streaming chunk source adapter
445// ============================================================================
446
447/// Adapter from a [`hyperdb_api_core::client::grpc::GrpcChunkStreamSync`] to
448/// [`crate::arrow_result::ChunkSource`].
449///
450/// Used internally by [`Connection::execute_query`](crate::Connection::execute_query)
451/// on a gRPC transport so the high-level `Rowset` decodes record batches
452/// lazily instead of buffering the whole Arrow IPC payload in memory.
453pub(crate) struct GrpcChunkStreamSource {
454 inner: hyperdb_api_core::client::grpc::GrpcChunkStreamSync,
455}
456
457impl GrpcChunkStreamSource {
458 pub(crate) fn new(inner: hyperdb_api_core::client::grpc::GrpcChunkStreamSync) -> Self {
459 GrpcChunkStreamSource { inner }
460 }
461}
462
463impl crate::arrow_result::ChunkSource for GrpcChunkStreamSource {
464 fn next_chunk(&mut self) -> crate::Result<Option<bytes::Bytes>> {
465 Ok(self.inner.next_chunk()?)
466 }
467}