hyperdb_api_core/client/grpc/client.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! gRPC client for Hyper database.
5//!
6//! This module provides the [`GrpcClient`] struct for executing queries against
7//! Hyper servers via gRPC.
8
9use std::sync::Arc;
10
11use tonic::transport::{Channel, Endpoint};
12use tracing::{debug, info, warn};
13
14use crate::client::error::{Error, ErrorKind, Result};
15
16use super::config::GrpcConfig;
17use super::error::from_grpc_status;
18use super::executor::{GrpcChunkStream, GrpcQueryExecutor};
19use super::params::{ParameterStyle, QueryParameters};
20use super::proto::hyper_service::query_param::TransferMode;
21use super::proto::{
22 AttachedDatabase, CancelQueryParam, HyperServiceClient, OutputFormat, QueryParam,
23};
24use super::result::GrpcQueryResult;
25
26/// Async gRPC client for Hyper database.
27///
28/// `GrpcClient` provides query-only access to Hyper databases via gRPC.
29/// Results are returned in Apache Arrow IPC format.
30///
31/// gRPC transport is always available - no feature flags required.
32///
33/// # Limitations
34///
35/// The gRPC interface is **read-only**:
36/// - Only SELECT queries are supported
37/// - No INSERT, UPDATE, DELETE, or DDL operations
38/// - No COPY protocol for bulk data insertion
39///
40/// For write operations, use the standard TCP [`Client`](crate::client::Client).
41///
42/// # Example
43///
44/// ```no_run
45/// use hyperdb_api_core::client::grpc::{GrpcClient, GrpcConfig};
46///
47/// #[tokio::main]
48/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
49/// let config = GrpcConfig::new("http://localhost:7484")
50/// .database("my_database.hyper");
51///
52/// let mut client = GrpcClient::connect(config).await?;
53///
54/// // Execute a query
55/// let result = client.execute_query("SELECT * FROM users").await?;
56/// let arrow_data = result.arrow_data();
57///
58/// // Process arrow_data with arrow crate...
59///
60/// client.close().await?;
61/// Ok(())
62/// }
63/// ```
64#[derive(Debug)]
65pub struct GrpcClient {
66 /// The underlying gRPC channel
67 channel: Channel,
68 /// Client configuration
69 config: GrpcConfig,
70}
71
72impl GrpcClient {
73 /// Connects to a Hyper server via gRPC.
74 ///
75 /// # Example
76 ///
77 /// ```no_run
78 /// use hyperdb_api_core::client::grpc::{GrpcClient, GrpcConfig};
79 ///
80 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
81 /// let config = GrpcConfig::new("http://localhost:7484")
82 /// .database("test.hyper");
83 ///
84 /// let client = GrpcClient::connect(config).await?;
85 /// # Ok(())
86 /// # }
87 /// ```
88 ///
89 /// # Errors
90 ///
91 /// - Returns [`ErrorKind::Config`] if `config.endpoint` is not a
92 /// well-formed URI, or if TLS configuration fails.
93 /// - Returns [`ErrorKind::Connection`] if the gRPC transport
94 /// cannot establish a channel to the endpoint.
95 pub async fn connect(config: GrpcConfig) -> Result<Self> {
96 info!(endpoint = %config.endpoint, "Connecting to Hyper via gRPC");
97
98 let endpoint = Endpoint::from_shared(config.endpoint.clone())
99 .map_err(|e| Error::new(ErrorKind::Config, format!("Invalid gRPC endpoint: {e}")))?;
100
101 // Configure timeouts
102 let endpoint = endpoint
103 .connect_timeout(config.connect_timeout)
104 .timeout(config.request_timeout);
105
106 // Configure TLS if needed
107 let endpoint = if config.use_tls {
108 // Use system root certificates for TLS validation
109 let tls_config = tonic::transport::ClientTlsConfig::new().with_enabled_roots();
110
111 endpoint.tls_config(tls_config).map_err(|e| {
112 Error::new(ErrorKind::Config, format!("TLS configuration error: {e}"))
113 })?
114 } else {
115 endpoint
116 };
117
118 // Connect
119 let channel = endpoint.connect().await.map_err(|e| {
120 debug!("gRPC connection error details: {:?}", e);
121 Error::new(
122 ErrorKind::Connection,
123 format!("Failed to connect to gRPC endpoint: {e} (details: {e:?})"),
124 )
125 })?;
126
127 debug!("gRPC channel established");
128
129 Ok(GrpcClient { channel, config })
130 }
131
132 /// Returns the underlying gRPC channel.
133 ///
134 /// This can be used for advanced use cases like channel cloning or
135 /// direct stub access.
136 #[must_use]
137 pub fn channel(&self) -> &Channel {
138 &self.channel
139 }
140
141 /// Returns the client configuration.
142 pub fn config(&self) -> &GrpcConfig {
143 &self.config
144 }
145
146 /// Executes a SQL query and returns the result.
147 ///
148 /// Results are returned in Apache Arrow IPC format. Use the `arrow_data()`
149 /// method on the result to get the raw Arrow bytes.
150 ///
151 /// # Example
152 ///
153 /// ```no_run
154 /// use hyperdb_api_core::client::grpc::{GrpcClient, GrpcConfig};
155 ///
156 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
157 /// # let config = GrpcConfig::new("http://localhost:7484");
158 /// # let mut client = GrpcClient::connect(config).await?;
159 /// let result = client.execute_query("SELECT * FROM users LIMIT 10").await?;
160 /// let arrow_bytes = result.arrow_data();
161 /// # Ok(())
162 /// # }
163 /// ```
164 ///
165 /// # Errors
166 ///
167 /// Returns an error if:
168 /// - The query syntax is invalid
169 /// - The referenced tables/columns don't exist
170 /// - A non-SELECT query is executed (gRPC is read-only)
171 /// - The connection is lost
172 pub async fn execute_query(&mut self, sql: &str) -> Result<GrpcQueryResult> {
173 self.execute_query_with_options(sql, OutputFormat::ArrowIpc, self.config.transfer_mode)
174 .await
175 }
176
177 /// Executes a query and returns raw Arrow IPC bytes.
178 ///
179 /// This is a convenience method that extracts the Arrow data from the result.
180 ///
181 /// # Example
182 ///
183 /// ```no_run
184 /// use hyperdb_api_core::client::grpc::{GrpcClient, GrpcConfig};
185 ///
186 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
187 /// # let config = GrpcConfig::new("http://localhost:7484");
188 /// # let mut client = GrpcClient::connect(config).await?;
189 /// let arrow_bytes = client.execute_query_to_arrow("SELECT * FROM users").await?;
190 /// // Parse with arrow crate...
191 /// # Ok(())
192 /// # }
193 /// ```
194 ///
195 /// # Errors
196 ///
197 /// Same failure modes as [`Self::execute_query`] (invalid SQL,
198 /// missing tables/columns, non-SELECT mutation attempts, or
199 /// connection loss).
200 pub async fn execute_query_to_arrow(&mut self, sql: &str) -> Result<bytes::Bytes> {
201 let result = self.execute_query(sql).await?;
202 Ok(result.into_arrow_data())
203 }
204
205 /// Executes a parameterized SQL query.
206 ///
207 /// This provides SQL injection prevention and type safety by separating
208 /// the query from its parameters.
209 ///
210 /// # Arguments
211 ///
212 /// * `sql` - SQL query with parameter placeholders
213 /// * `params` - Query parameters (JSON or Arrow encoded)
214 /// * `style` - Parameter style used in the query
215 ///
216 /// # Example
217 ///
218 /// ```no_run
219 /// use hyperdb_api_core::client::grpc::{GrpcClient, GrpcConfig, QueryParameters, ParameterStyle};
220 ///
221 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
222 /// # let config = GrpcConfig::new("http://localhost:7484");
223 /// # let mut client = GrpcClient::connect(config).await?;
224 /// // Dollar-numbered parameters (mixed types use from_json_value)
225 /// let params = QueryParameters::from_json_value(&serde_json::json!([42, "Alice"]))?;
226 /// let result = client.execute_query_with_params(
227 /// "SELECT * FROM users WHERE id = $1 AND name = $2",
228 /// params,
229 /// ParameterStyle::DollarNumbered,
230 /// ).await?;
231 ///
232 /// // Named parameters
233 /// let params = QueryParameters::json_named()
234 /// .add("min_age", &18)?
235 /// .build();
236 /// let result = client.execute_query_with_params(
237 /// "SELECT * FROM users WHERE age >= :min_age",
238 /// params,
239 /// ParameterStyle::Named,
240 /// ).await?;
241 /// # Ok(())
242 /// # }
243 /// ```
244 ///
245 /// # Errors
246 ///
247 /// Same failure modes as [`Self::execute_query`], plus any
248 /// parameter-related error reported by the server (unknown
249 /// placeholder, type coercion failure, shape mismatch between the
250 /// SQL placeholders and the supplied parameter set).
251 pub async fn execute_query_with_params(
252 &mut self,
253 sql: &str,
254 params: QueryParameters,
255 style: ParameterStyle,
256 ) -> Result<GrpcQueryResult> {
257 self.execute_query_with_params_and_options(
258 sql,
259 params,
260 style,
261 OutputFormat::ArrowIpc,
262 self.config.transfer_mode,
263 )
264 .await
265 }
266
267 /// Executes a parameterized query and returns raw Arrow IPC bytes.
268 ///
269 /// # Example
270 ///
271 /// ```no_run
272 /// use hyperdb_api_core::client::grpc::{GrpcClient, GrpcConfig, QueryParameters, ParameterStyle};
273 ///
274 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
275 /// # let config = GrpcConfig::new("http://localhost:7484");
276 /// # let mut client = GrpcClient::connect(config).await?;
277 /// let params = QueryParameters::json_positional(&[&42i64])?;
278 /// let arrow_bytes = client.execute_query_with_params_to_arrow(
279 /// "SELECT * FROM users WHERE id = $1",
280 /// params,
281 /// ParameterStyle::DollarNumbered,
282 /// ).await?;
283 /// # Ok(())
284 /// # }
285 /// ```
286 ///
287 /// # Errors
288 ///
289 /// Same failure modes as [`Self::execute_query_with_params`].
290 pub async fn execute_query_with_params_to_arrow(
291 &mut self,
292 sql: &str,
293 params: QueryParameters,
294 style: ParameterStyle,
295 ) -> Result<bytes::Bytes> {
296 let result = self.execute_query_with_params(sql, params, style).await?;
297 Ok(result.into_arrow_data())
298 }
299
300 /// Executes a parameterized query with specific options.
301 ///
302 /// This allows full control over the output format and transfer mode.
303 ///
304 /// # Errors
305 ///
306 /// - Returns [`ErrorKind::Protocol`] if the server returns no
307 /// result chunks and does not signal completion.
308 /// - Propagates any error from the underlying
309 /// `GrpcQueryExecutor` — auth failure, transport error, or
310 /// server-side SQL error surfaced as [`tonic::Status`].
311 pub async fn execute_query_with_params_and_options(
312 &mut self,
313 sql: &str,
314 params: QueryParameters,
315 style: ParameterStyle,
316 output_format: OutputFormat,
317 transfer_mode: TransferMode,
318 ) -> Result<GrpcQueryResult> {
319 debug!(
320 sql = %sql,
321 param_style = ?style,
322 format = ?output_format,
323 mode = ?transfer_mode,
324 "Executing parameterized query"
325 );
326
327 // Build the query parameter
328 let query_param = QueryParam {
329 query: sql.to_string(),
330 databases: self.build_attached_databases(),
331 output_format: output_format.into(),
332 settings: self.config.settings.clone(),
333 transfer_mode: transfer_mode.into(),
334 param_style: i32::from(style),
335 parameters: Some(params.into_proto()),
336 result_range: None,
337 query_row_limit: None,
338 };
339
340 // Build headers for authentication and routing
341 let headers = self.build_headers();
342
343 // Create executor with configured message size limits
344 let client = HyperServiceClient::new(self.channel.clone())
345 .max_decoding_message_size(self.config.max_decoding_message_size)
346 .max_encoding_message_size(self.config.max_encoding_message_size);
347 let mut executor = GrpcQueryExecutor::new(client, headers, transfer_mode);
348
349 // Execute the query
350 executor.execute(query_param).await?;
351
352 // Collect all result chunks
353 let mut final_result = GrpcQueryResult::default();
354
355 loop {
356 if let Some(mut partial_result) = executor.next_result().await? {
357 // Merge chunks
358 while let Some(chunk) = partial_result.take_chunk() {
359 final_result.chunks.push_back(chunk);
360 }
361 // Copy metadata from last result
362 if partial_result.query_id.is_some() {
363 final_result.query_id = partial_result.query_id;
364 }
365 if partial_result.schema.is_some() {
366 final_result.schema = partial_result.schema;
367 }
368 if partial_result.rows_affected.is_some() {
369 final_result.rows_affected = partial_result.rows_affected;
370 }
371 // Check if complete
372 if partial_result.is_complete {
373 final_result.is_complete = true;
374 break;
375 }
376 } else {
377 // No more results
378 final_result.is_complete = true;
379 break;
380 }
381 }
382
383 if final_result.chunks.is_empty() && !final_result.is_complete {
384 return Err(Error::new(ErrorKind::Protocol, "No result from query"));
385 }
386
387 Ok(final_result)
388 }
389
390 /// Executes a query with specific options.
391 ///
392 /// This allows control over the output format and transfer mode.
393 ///
394 /// # Errors
395 ///
396 /// - Returns [`ErrorKind::Protocol`] if the server returns no
397 /// result chunks and does not signal completion.
398 /// - Propagates any error from the underlying
399 /// `GrpcQueryExecutor` — auth failure, transport error, or
400 /// server-side SQL error surfaced as [`tonic::Status`].
401 pub async fn execute_query_with_options(
402 &mut self,
403 sql: &str,
404 output_format: OutputFormat,
405 transfer_mode: TransferMode,
406 ) -> Result<GrpcQueryResult> {
407 debug!(sql = %sql, format = ?output_format, mode = ?transfer_mode, "Executing query");
408
409 // Build the query parameter
410 let query_param = QueryParam {
411 query: sql.to_string(),
412 databases: self.build_attached_databases(),
413 output_format: output_format.into(),
414 settings: self.config.settings.clone(),
415 transfer_mode: transfer_mode.into(),
416 param_style: 0, // Default
417 parameters: None,
418 result_range: None,
419 query_row_limit: None,
420 };
421
422 // Build headers for authentication and routing
423 let headers = self.build_headers();
424
425 // Create executor with configured message size limits
426 let client = HyperServiceClient::new(self.channel.clone())
427 .max_decoding_message_size(self.config.max_decoding_message_size)
428 .max_encoding_message_size(self.config.max_encoding_message_size);
429 let mut executor = GrpcQueryExecutor::new(client, headers, transfer_mode);
430
431 // Execute the query
432 executor.execute(query_param).await?;
433
434 // Collect all result chunks
435 let mut final_result = GrpcQueryResult::default();
436
437 loop {
438 if let Some(mut partial_result) = executor.next_result().await? {
439 // Merge chunks
440 while let Some(chunk) = partial_result.take_chunk() {
441 final_result.chunks.push_back(chunk);
442 }
443 // Copy metadata from last result
444 if partial_result.query_id.is_some() {
445 final_result.query_id = partial_result.query_id;
446 }
447 if partial_result.schema.is_some() {
448 final_result.schema = partial_result.schema;
449 }
450 if partial_result.rows_affected.is_some() {
451 final_result.rows_affected = partial_result.rows_affected;
452 }
453 // Check if complete
454 if partial_result.is_complete {
455 final_result.is_complete = true;
456 break;
457 }
458 } else {
459 // No more results
460 final_result.is_complete = true;
461 break;
462 }
463 }
464
465 if final_result.chunks.is_empty() && !final_result.is_complete {
466 return Err(Error::new(ErrorKind::Protocol, "No result from query"));
467 }
468
469 Ok(final_result)
470 }
471
472 /// Executes a query and returns a streaming chunk producer.
473 ///
474 /// Unlike [`execute_query`](Self::execute_query), which drains every
475 /// result chunk into a single [`GrpcQueryResult`] before returning, this
476 /// method yields chunks lazily: each call to
477 /// [`GrpcChunkStream::next_chunk`] pulls just enough from the HTTP/2
478 /// stream to produce one Arrow IPC byte chunk. For very large result
479 /// sets (hundreds of MB to GB) this keeps client memory bounded by a
480 /// single gRPC message (capped at the tonic
481 /// `max_decoding_message_size`, default 64 MB) rather than growing to
482 /// the full result size.
483 ///
484 /// Pair this with
485 /// [`hyperdb_api::ArrowRowset::from_stream`][arrow_rowset_from_stream] to
486 /// decode batches incrementally and keep peak memory constant regardless
487 /// of total row count.
488 ///
489 /// [arrow_rowset_from_stream]: https://docs.rs/hyperdb-api/latest/hyperdb_api/struct.ArrowRowset.html#method.from_stream
490 ///
491 /// # Errors
492 ///
493 /// Same failure modes as
494 /// [`Self::execute_query_stream_with_options`] — invalid SQL,
495 /// auth failure, transport error, etc.
496 pub async fn execute_query_stream(&mut self, sql: &str) -> Result<GrpcChunkStream> {
497 self.execute_query_stream_with_options(
498 sql,
499 OutputFormat::ArrowIpc,
500 self.config.transfer_mode,
501 )
502 .await
503 }
504
505 /// Streaming variant of [`execute_query_with_options`](Self::execute_query_with_options).
506 ///
507 /// # Errors
508 ///
509 /// Propagates any error from the initial
510 /// `GrpcQueryExecutor::execute` call — server-side SQL error,
511 /// auth failure, or transport-level gRPC error.
512 pub async fn execute_query_stream_with_options(
513 &mut self,
514 sql: &str,
515 output_format: OutputFormat,
516 transfer_mode: TransferMode,
517 ) -> Result<GrpcChunkStream> {
518 debug!(sql = %sql, format = ?output_format, mode = ?transfer_mode, "Executing streaming query");
519
520 let query_param = QueryParam {
521 query: sql.to_string(),
522 databases: self.build_attached_databases(),
523 output_format: output_format.into(),
524 settings: self.config.settings.clone(),
525 transfer_mode: transfer_mode.into(),
526 param_style: 0,
527 parameters: None,
528 result_range: None,
529 query_row_limit: None,
530 };
531
532 let headers = self.build_headers();
533
534 let client = HyperServiceClient::new(self.channel.clone())
535 .max_decoding_message_size(self.config.max_decoding_message_size)
536 .max_encoding_message_size(self.config.max_encoding_message_size);
537 let mut executor = GrpcQueryExecutor::new(client, headers, transfer_mode);
538
539 executor.execute(query_param).await?;
540
541 Ok(GrpcChunkStream::new(executor))
542 }
543
544 /// Cancels an in-flight gRPC query by its `query_id`.
545 ///
546 /// This is the gRPC analogue of the PG wire `CancelRequest` packet: it
547 /// tells the server to stop executing a previously-started query. Unlike
548 /// PG wire (where the cancel travels on a *fresh* connection), gRPC
549 /// cancels travel as a regular RPC multiplexed over the existing HTTP/2
550 /// channel — that's why this call shares `self.channel` with normal
551 /// query traffic.
552 ///
553 /// # When do you have a `query_id`?
554 ///
555 /// The server assigns a `query_id` for queries started in
556 /// [`TransferMode::Async`](super::proto::hyper_service::query_param::TransferMode)
557 /// (long-running queries that the client polls). Grab it from
558 /// [`GrpcQueryResult::query_id`](super::result::GrpcQueryResult::query_id)
559 /// after `execute_query_with_options(..., TransferMode::Async)` returns.
560 /// SYNC-mode queries typically complete before the client needs a
561 /// cancel — for those, just drop the in-flight future.
562 ///
563 /// # Query-id lifecycle
564 ///
565 /// Query ids are stable for the lifetime of a query and are
566 /// server-assigned — a given id is never silently re-used for a
567 /// different query (Hyper generates them as UUID-like opaque tokens,
568 /// not sequential counters). The only race a caller needs to
569 /// consider is between obtaining the id and calling `cancel_query`:
570 ///
571 /// - If the query is still running, the cancel lands and the server
572 /// aborts it.
573 /// - If the query has already completed normally between "obtain id"
574 /// and "cancel", the server sees a cancel for an unknown /
575 /// completed query and handles it gracefully (the exact shape
576 /// depends on server build — see the tests in
577 /// `hyperdb-api/tests/grpc_cancel_tests.rs` for details). Either way
578 /// the channel stays healthy.
579 ///
580 /// There is no scenario where a stale id causes a cancel to target
581 /// the *wrong* query, because ids are not reassigned.
582 ///
583 /// # Errors
584 ///
585 /// Propagates transport-level errors. A successful cancel returns
586 /// `Ok(())` even if the query had already completed on the server;
587 /// cancellation is best-effort by design.
588 ///
589 /// # Relation to the [`Cancellable`](crate::client::cancel::Cancellable) trait
590 ///
591 /// This is the **fallible user-facing cancel API**: it returns a
592 /// `Result<()>` so explicit callers can observe transport-level
593 /// failures and react accordingly.
594 ///
595 /// It is *not* an implementation of the
596 /// [`Cancellable`](crate::client::cancel::Cancellable) trait — and cannot
597 /// be, because `Cancellable::cancel(&self)` takes no arguments while
598 /// gRPC cancels need a per-query `query_id`. A `GrpcClient` can have
599 /// many concurrent queries in flight; there is no single "the"
600 /// query on it the way there is on a PG wire connection. A future
601 /// gRPC streaming result type (when one is introduced) would carry
602 /// its `query_id` in a dedicated handle like
603 /// `GrpcCancelHandle { client, query_id }`, and *that* handle
604 /// would `impl Cancellable` by wrapping this method and swallowing
605 /// errors — same shape as
606 /// [`impl Cancellable for Client`](crate::client::cancel::Cancellable).
607 /// See the [`Cancellable`](crate::client::cancel::Cancellable) trait docs
608 /// for the full wrapper pattern.
609 ///
610 /// # Example
611 ///
612 /// ```no_run
613 /// use hyperdb_api_core::client::grpc::{GrpcClient, GrpcConfig, OutputFormat, TransferMode};
614 ///
615 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
616 /// # let config = GrpcConfig::new("http://localhost:7484");
617 /// # let mut client = GrpcClient::connect(config).await?;
618 /// let result = client
619 /// .execute_query_with_options(
620 /// "SELECT * FROM very_large_table",
621 /// OutputFormat::ArrowIpc,
622 /// TransferMode::Async,
623 /// )
624 /// .await?;
625 ///
626 /// if let Some(query_id) = result.query_id() {
627 /// // Some time later, decide to abort:
628 /// client.cancel_query(query_id).await?;
629 /// }
630 /// # Ok(())
631 /// # }
632 /// ```
633 pub async fn cancel_query(&mut self, query_id: &str) -> Result<()> {
634 debug!(query_id = %query_id, "Cancelling gRPC query");
635
636 let param = CancelQueryParam {
637 query_id: query_id.to_string(),
638 };
639 let mut request = tonic::Request::new(param);
640
641 // Apply the client's standard headers (database routing, custom
642 // headers). Matches what the query path does so that any server-side
643 // routing based on headers lands the cancel on the same backend as
644 // the query it's trying to cancel.
645 //
646 // Header parse failures are logged at `warn!` and then skipped —
647 // the cancel goes out without that particular header rather than
648 // failing the whole operation. A missing custom header is strictly
649 // better than a cancel we never send. The warn! is the only
650 // operational signal that routing-critical headers (e.g. the
651 // database selector) were dropped, so don't silence it.
652 for (key, value) in self.build_headers() {
653 match (
654 key.parse::<tonic::metadata::MetadataKey<_>>(),
655 value.parse(),
656 ) {
657 (Ok(k), Ok(v)) => {
658 request.metadata_mut().insert(k, v);
659 }
660 (key_res, value_res) => {
661 warn!(
662 target: "hyperdb_api_core::client",
663 query_id = %query_id,
664 header_key = %key,
665 key_parse_ok = key_res.is_ok(),
666 value_parse_ok = value_res.is_ok(),
667 "cancel: header parse failed, dropping header from cancel request",
668 );
669 }
670 }
671 }
672 // Also set the canonical x-hyperdb-query-id metadata — some server
673 // deployments route cancels based on this header rather than the
674 // payload body.
675 match query_id.parse() {
676 Ok(value) => {
677 request.metadata_mut().insert("x-hyperdb-query-id", value);
678 }
679 Err(e) => {
680 warn!(
681 target: "hyperdb_api_core::client",
682 query_id = %query_id,
683 error = %e,
684 "cancel: x-hyperdb-query-id header parse failed; \
685 cancel routing may fall back to payload-based lookup",
686 );
687 }
688 }
689
690 let mut client = HyperServiceClient::new(self.channel.clone())
691 .max_decoding_message_size(self.config.max_decoding_message_size)
692 .max_encoding_message_size(self.config.max_encoding_message_size);
693 client
694 .cancel_query(request)
695 .await
696 .map_err(from_grpc_status)?;
697
698 info!(query_id = %query_id, "gRPC query cancelled");
699 Ok(())
700 }
701
702 #[expect(
703 clippy::unused_async,
704 reason = "async fn retained for API symmetry; callers await regardless of whether the current body is synchronous"
705 )]
706 /// Closes the gRPC connection.
707 ///
708 /// This is a no-op as tonic channels are reference-counted and will be
709 /// closed when the last reference is dropped.
710 ///
711 /// # Errors
712 ///
713 /// Currently infallible — always returns `Ok(())`. The `Result`
714 /// return type is preserved for API symmetry with
715 /// [`GrpcClientSync::close`] and for forward compatibility if
716 /// future tonic channels expose a fallible shutdown.
717 pub async fn close(self) -> Result<()> {
718 debug!("Closing gRPC connection");
719 // Channel is dropped automatically
720 Ok(())
721 }
722
723 /// Builds the attached databases list from configuration.
724 fn build_attached_databases(&self) -> Vec<AttachedDatabase> {
725 if let Some(db_path) = &self.config.database {
726 debug!(db_path = %db_path, "Attaching database for query");
727 // Check if it's a JSON array (multiple databases)
728 if db_path.starts_with('[') {
729 // Parse JSON - for now just use as single database
730 // TODO: Implement proper JSON parsing for multiple databases
731 vec![AttachedDatabase {
732 path: db_path.clone(),
733 alias: String::new(), // Empty alias means use default
734 }]
735 } else {
736 vec![AttachedDatabase {
737 path: db_path.clone(),
738 alias: String::new(), // Empty alias means use default
739 }]
740 }
741 } else {
742 debug!("No database configured on gRPC client — query will run without attachment");
743 vec![]
744 }
745 }
746
747 /// Builds headers for gRPC requests.
748 fn build_headers(&self) -> Vec<(String, String)> {
749 let mut headers: Vec<(String, String)> = self
750 .config
751 .headers
752 .iter()
753 .map(|(k, v)| (k.clone(), v.clone()))
754 .collect();
755
756 // Add database header if configured
757 if let Some(ref db) = self.config.database {
758 headers.push(("x-hyper-database".to_string(), db.clone()));
759 }
760
761 headers
762 }
763}
764
765/// Synchronous wrapper around [`GrpcClient`].
766///
767/// This provides a blocking API by creating a Tokio runtime internally.
768/// For better performance in async contexts, use [`GrpcClient`] directly.
769///
770/// # Example
771///
772/// ```no_run
773/// use hyperdb_api_core::client::grpc::{GrpcClientSync, GrpcConfig};
774///
775/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
776/// let config = GrpcConfig::new("http://localhost:7484")
777/// .database("test.hyper");
778///
779/// let mut client = GrpcClientSync::connect(config)?;
780/// let result = client.execute_query("SELECT * FROM users")?;
781/// let arrow_bytes = result.arrow_data();
782/// # Ok(())
783/// # }
784/// ```
785#[derive(Debug)]
786pub struct GrpcClientSync {
787 /// The async client
788 inner: GrpcClient,
789 /// Tokio runtime for blocking operations.
790 ///
791 /// Wrapped in `Arc` so streaming chunk producers
792 /// ([`GrpcChunkStreamSync`]) can share the same runtime without having to
793 /// borrow the client or create their own.
794 runtime: Arc<tokio::runtime::Runtime>,
795}
796
797impl GrpcClientSync {
798 /// Connects to a Hyper server via gRPC (blocking).
799 ///
800 /// # Errors
801 ///
802 /// - Returns [`ErrorKind::Other`] if a current-thread Tokio
803 /// runtime cannot be built.
804 /// - Propagates any error from [`GrpcClient::connect`] (invalid
805 /// endpoint, TLS configuration failure, or transport setup
806 /// failure).
807 pub fn connect(config: GrpcConfig) -> Result<Self> {
808 let runtime = tokio::runtime::Builder::new_current_thread()
809 .enable_all()
810 .build()
811 .map_err(|e| {
812 Error::new(
813 ErrorKind::Other,
814 format!("Failed to create Tokio runtime: {e}"),
815 )
816 })?;
817
818 let inner = runtime.block_on(GrpcClient::connect(config))?;
819
820 Ok(GrpcClientSync {
821 inner,
822 runtime: Arc::new(runtime),
823 })
824 }
825
826 /// Executes a SQL query (blocking).
827 ///
828 /// # Errors
829 ///
830 /// Blocking wrapper around [`GrpcClient::execute_query`]; see that
831 /// method for the concrete failure modes.
832 pub fn execute_query(&mut self, sql: &str) -> Result<GrpcQueryResult> {
833 self.runtime.block_on(self.inner.execute_query(sql))
834 }
835
836 /// Executes a query and returns Arrow IPC bytes (blocking).
837 ///
838 /// # Errors
839 ///
840 /// Same failure modes as [`Self::execute_query`].
841 pub fn execute_query_to_arrow(&mut self, sql: &str) -> Result<bytes::Bytes> {
842 self.runtime
843 .block_on(self.inner.execute_query_to_arrow(sql))
844 }
845
846 /// Executes a query and returns a blocking streaming chunk producer.
847 ///
848 /// See [`GrpcClient::execute_query_stream`] for the streaming semantics
849 /// and memory behavior. The returned [`GrpcChunkStreamSync`] lets you
850 /// pull chunks one at a time without buffering the entire result.
851 ///
852 /// # Errors
853 ///
854 /// Same failure modes as [`GrpcClient::execute_query_stream`].
855 pub fn execute_query_stream(&mut self, sql: &str) -> Result<GrpcChunkStreamSync> {
856 let inner = self
857 .runtime
858 .block_on(self.inner.execute_query_stream(sql))?;
859 Ok(GrpcChunkStreamSync {
860 inner,
861 runtime: Arc::clone(&self.runtime),
862 })
863 }
864
865 /// Executes a parameterized SQL query (blocking).
866 ///
867 /// # Example
868 ///
869 /// ```no_run
870 /// use hyperdb_api_core::client::grpc::{GrpcClientSync, GrpcConfig, QueryParameters, ParameterStyle};
871 ///
872 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
873 /// # let config = GrpcConfig::new("http://localhost:7484");
874 /// # let mut client = GrpcClientSync::connect(config)?;
875 /// let params = QueryParameters::json_positional(&[&42i64])?;
876 /// let result = client.execute_query_with_params(
877 /// "SELECT * FROM users WHERE id = $1",
878 /// params,
879 /// ParameterStyle::DollarNumbered,
880 /// )?;
881 /// # Ok(())
882 /// # }
883 /// ```
884 ///
885 /// # Errors
886 ///
887 /// Blocking wrapper around
888 /// [`GrpcClient::execute_query_with_params`]; see that method for
889 /// the concrete failure modes.
890 pub fn execute_query_with_params(
891 &mut self,
892 sql: &str,
893 params: QueryParameters,
894 style: ParameterStyle,
895 ) -> Result<GrpcQueryResult> {
896 self.runtime
897 .block_on(self.inner.execute_query_with_params(sql, params, style))
898 }
899
900 /// Executes a parameterized query and returns Arrow IPC bytes (blocking).
901 ///
902 /// # Errors
903 ///
904 /// Same failure modes as [`Self::execute_query_with_params`].
905 pub fn execute_query_with_params_to_arrow(
906 &mut self,
907 sql: &str,
908 params: QueryParameters,
909 style: ParameterStyle,
910 ) -> Result<bytes::Bytes> {
911 self.runtime.block_on(
912 self.inner
913 .execute_query_with_params_to_arrow(sql, params, style),
914 )
915 }
916
917 /// Cancels an in-flight gRPC query by its `query_id` (blocking).
918 ///
919 /// Blocking wrapper around
920 /// [`GrpcClient::cancel_query`]. See that method's documentation for
921 /// when a `query_id` is available (ASYNC-mode queries), best-effort
922 /// cancel semantics, and the full "Relation to the `Cancellable`
923 /// trait" discussion.
924 ///
925 /// # Fallible by design
926 ///
927 /// The `Result<()>` return is intentional and mirrors the async
928 /// `GrpcClient::cancel_query`. Explicit callers get to observe
929 /// transport-level failures (network errors, channel closed, auth
930 /// expired) so they can record metrics, retry, or surface "cancel
931 /// failed" UX. This is *not* an `impl Cancellable for GrpcClientSync`
932 /// — it cannot be, because `Cancellable::cancel(&self)` takes no
933 /// arguments and has no way to pass the `query_id`. See the
934 /// [`Cancellable`](crate::client::cancel::Cancellable) trait docs for the
935 /// infallible-wrapper pattern used by `Drop`-path consumers.
936 ///
937 /// # Example
938 ///
939 /// ```no_run
940 /// use hyperdb_api_core::client::grpc::{GrpcClientSync, GrpcConfig};
941 ///
942 /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
943 /// # let config = GrpcConfig::new("http://localhost:7484");
944 /// # let mut client = GrpcClientSync::connect(config)?;
945 /// # let query_id = "some-query-id";
946 /// client.cancel_query(query_id)?;
947 /// # Ok(())
948 /// # }
949 /// ```
950 ///
951 /// # Errors
952 ///
953 /// Same failure modes as [`GrpcClient::cancel_query`] —
954 /// transport-level errors bubble up; a cancel for an
955 /// already-completed query returns `Ok(())` by design.
956 pub fn cancel_query(&mut self, query_id: &str) -> Result<()> {
957 self.runtime.block_on(self.inner.cancel_query(query_id))
958 }
959
960 /// Returns the client configuration.
961 pub fn config(&self) -> &GrpcConfig {
962 self.inner.config()
963 }
964
965 /// Closes the connection (blocking).
966 ///
967 /// # Errors
968 ///
969 /// Currently infallible — always returns `Ok(())`. The `Result`
970 /// return type is preserved for API symmetry with async callers.
971 pub fn close(self) -> Result<()> {
972 self.runtime.block_on(self.inner.close())
973 }
974}
975
976/// Blocking wrapper around [`GrpcChunkStream`].
977///
978/// Returned by [`GrpcClientSync::execute_query_stream`] and the
979/// `AuthenticatedGrpcClientSync` equivalent. Yields Arrow IPC byte chunks
980/// one at a time, blocking on the shared Tokio runtime as needed.
981///
982/// Pair with
983/// [`hyperdb_api::ArrowRowset::from_stream`][arrow_rowset_from_stream] to
984/// decode Arrow record batches incrementally with constant client memory.
985///
986/// [arrow_rowset_from_stream]: https://docs.rs/hyperdb-api/latest/hyperdb_api/struct.ArrowRowset.html#method.from_stream
987#[derive(Debug)]
988pub struct GrpcChunkStreamSync {
989 inner: GrpcChunkStream,
990 runtime: Arc<tokio::runtime::Runtime>,
991}
992
993impl GrpcChunkStreamSync {
994 /// Returns the next Arrow IPC byte chunk, or `None` when the stream is
995 /// complete.
996 ///
997 /// # Errors
998 ///
999 /// Same failure modes as [`GrpcChunkStream::next_chunk`] —
1000 /// transport errors and server-side query failures surface as
1001 /// [`Error`].
1002 pub fn next_chunk(&mut self) -> Result<Option<bytes::Bytes>> {
1003 self.runtime.block_on(self.inner.next_chunk())
1004 }
1005
1006 /// Returns the schema reported by the server, if one has been received yet.
1007 pub fn schema(&self) -> Option<&super::proto::QueryResultSchema> {
1008 self.inner.schema()
1009 }
1010
1011 /// Returns the server-assigned query ID, if one has been received.
1012 pub fn query_id(&self) -> Option<&str> {
1013 self.inner.query_id()
1014 }
1015
1016 /// Returns the affected row count for DML queries, if reported.
1017 pub fn rows_affected(&self) -> Option<u64> {
1018 self.inner.rows_affected()
1019 }
1020}