Skip to main content

hyperdb_api_core/client/grpc/
client.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! gRPC client for Hyper database.
5//!
6//! This module provides the [`GrpcClient`] struct for executing queries against
7//! Hyper servers via gRPC.
8
9use std::sync::Arc;
10
11use tonic::transport::{Channel, Endpoint};
12use tracing::{debug, info, warn};
13
14use crate::client::error::{Error, ErrorKind, Result};
15
16use super::config::GrpcConfig;
17use super::error::from_grpc_status;
18use super::executor::{GrpcChunkStream, GrpcQueryExecutor};
19use super::params::{ParameterStyle, QueryParameters};
20use super::proto::hyper_service::query_param::TransferMode;
21use super::proto::{
22    AttachedDatabase, CancelQueryParam, HyperServiceClient, OutputFormat, QueryParam,
23};
24use super::result::GrpcQueryResult;
25
26/// Async gRPC client for Hyper database.
27///
28/// `GrpcClient` provides query-only access to Hyper databases via gRPC.
29/// Results are returned in Apache Arrow IPC format.
30///
31/// gRPC transport is always available - no feature flags required.
32///
33/// # Limitations
34///
35/// The gRPC interface is **read-only**:
36/// - Only SELECT queries are supported
37/// - No INSERT, UPDATE, DELETE, or DDL operations
38/// - No COPY protocol for bulk data insertion
39///
40/// For write operations, use the standard TCP [`Client`](crate::client::Client).
41///
42/// # Example
43///
44/// ```no_run
45/// use hyperdb_api_core::client::grpc::{GrpcClient, GrpcConfig};
46///
47/// #[tokio::main]
48/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
49///     let config = GrpcConfig::new("http://localhost:7484")
50///         .database("my_database.hyper");
51///
52///     let mut client = GrpcClient::connect(config).await?;
53///
54///     // Execute a query
55///     let result = client.execute_query("SELECT * FROM users").await?;
56///     let arrow_data = result.arrow_data();
57///
58///     // Process arrow_data with arrow crate...
59///
60///     client.close().await?;
61///     Ok(())
62/// }
63/// ```
#[derive(Debug)]
pub struct GrpcClient {
    /// The underlying gRPC channel.
    ///
    /// Each request clones this handle to build a fresh `HyperServiceClient`.
    channel: Channel,
    /// Client configuration passed to [`GrpcClient::connect`].
    ///
    /// Read on every request for the message-size limits, settings, custom
    /// headers, attached database, and default transfer mode.
    config: GrpcConfig,
}
71
72impl GrpcClient {
73    /// Connects to a Hyper server via gRPC.
74    ///
75    /// # Example
76    ///
77    /// ```no_run
78    /// use hyperdb_api_core::client::grpc::{GrpcClient, GrpcConfig};
79    ///
80    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
81    /// let config = GrpcConfig::new("http://localhost:7484")
82    ///     .database("test.hyper");
83    ///
84    /// let client = GrpcClient::connect(config).await?;
85    /// # Ok(())
86    /// # }
87    /// ```
88    ///
89    /// # Errors
90    ///
91    /// - Returns [`ErrorKind::Config`] if `config.endpoint` is not a
92    ///   well-formed URI, or if TLS configuration fails.
93    /// - Returns [`ErrorKind::Connection`] if the gRPC transport
94    ///   cannot establish a channel to the endpoint.
95    pub async fn connect(config: GrpcConfig) -> Result<Self> {
96        info!(endpoint = %config.endpoint, "Connecting to Hyper via gRPC");
97
98        let endpoint = Endpoint::from_shared(config.endpoint.clone())
99            .map_err(|e| Error::new(ErrorKind::Config, format!("Invalid gRPC endpoint: {e}")))?;
100
101        // Configure timeouts
102        let endpoint = endpoint
103            .connect_timeout(config.connect_timeout)
104            .timeout(config.request_timeout);
105
106        // Configure TLS if needed
107        let endpoint = if config.use_tls {
108            // Use system root certificates for TLS validation
109            let tls_config = tonic::transport::ClientTlsConfig::new().with_enabled_roots();
110
111            endpoint.tls_config(tls_config).map_err(|e| {
112                Error::new(ErrorKind::Config, format!("TLS configuration error: {e}"))
113            })?
114        } else {
115            endpoint
116        };
117
118        // Connect
119        let channel = endpoint.connect().await.map_err(|e| {
120            debug!("gRPC connection error details: {:?}", e);
121            Error::new(
122                ErrorKind::Connection,
123                format!("Failed to connect to gRPC endpoint: {e} (details: {e:?})"),
124            )
125        })?;
126
127        debug!("gRPC channel established");
128
129        Ok(GrpcClient { channel, config })
130    }
131
132    /// Returns the underlying gRPC channel.
133    ///
134    /// This can be used for advanced use cases like channel cloning or
135    /// direct stub access.
136    #[must_use]
137    pub fn channel(&self) -> &Channel {
138        &self.channel
139    }
140
141    /// Returns the client configuration.
142    pub fn config(&self) -> &GrpcConfig {
143        &self.config
144    }
145
146    /// Executes a SQL query and returns the result.
147    ///
148    /// Results are returned in Apache Arrow IPC format. Use the `arrow_data()`
149    /// method on the result to get the raw Arrow bytes.
150    ///
151    /// # Example
152    ///
153    /// ```no_run
154    /// use hyperdb_api_core::client::grpc::{GrpcClient, GrpcConfig};
155    ///
156    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
157    /// # let config = GrpcConfig::new("http://localhost:7484");
158    /// # let mut client = GrpcClient::connect(config).await?;
159    /// let result = client.execute_query("SELECT * FROM users LIMIT 10").await?;
160    /// let arrow_bytes = result.arrow_data();
161    /// # Ok(())
162    /// # }
163    /// ```
164    ///
165    /// # Errors
166    ///
167    /// Returns an error if:
168    /// - The query syntax is invalid
169    /// - The referenced tables/columns don't exist
170    /// - A non-SELECT query is executed (gRPC is read-only)
171    /// - The connection is lost
172    pub async fn execute_query(&mut self, sql: &str) -> Result<GrpcQueryResult> {
173        self.execute_query_with_options(sql, OutputFormat::ArrowIpc, self.config.transfer_mode)
174            .await
175    }
176
177    /// Executes a query and returns raw Arrow IPC bytes.
178    ///
179    /// This is a convenience method that extracts the Arrow data from the result.
180    ///
181    /// # Example
182    ///
183    /// ```no_run
184    /// use hyperdb_api_core::client::grpc::{GrpcClient, GrpcConfig};
185    ///
186    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
187    /// # let config = GrpcConfig::new("http://localhost:7484");
188    /// # let mut client = GrpcClient::connect(config).await?;
189    /// let arrow_bytes = client.execute_query_to_arrow("SELECT * FROM users").await?;
190    /// // Parse with arrow crate...
191    /// # Ok(())
192    /// # }
193    /// ```
194    ///
195    /// # Errors
196    ///
197    /// Same failure modes as [`Self::execute_query`] (invalid SQL,
198    /// missing tables/columns, non-SELECT mutation attempts, or
199    /// connection loss).
200    pub async fn execute_query_to_arrow(&mut self, sql: &str) -> Result<bytes::Bytes> {
201        let result = self.execute_query(sql).await?;
202        Ok(result.into_arrow_data())
203    }
204
205    /// Executes a parameterized SQL query.
206    ///
207    /// This provides SQL injection prevention and type safety by separating
208    /// the query from its parameters.
209    ///
210    /// # Arguments
211    ///
212    /// * `sql` - SQL query with parameter placeholders
213    /// * `params` - Query parameters (JSON or Arrow encoded)
214    /// * `style` - Parameter style used in the query
215    ///
216    /// # Example
217    ///
218    /// ```no_run
219    /// use hyperdb_api_core::client::grpc::{GrpcClient, GrpcConfig, QueryParameters, ParameterStyle};
220    ///
221    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
222    /// # let config = GrpcConfig::new("http://localhost:7484");
223    /// # let mut client = GrpcClient::connect(config).await?;
224    /// // Dollar-numbered parameters (mixed types use from_json_value)
225    /// let params = QueryParameters::from_json_value(&serde_json::json!([42, "Alice"]))?;
226    /// let result = client.execute_query_with_params(
227    ///     "SELECT * FROM users WHERE id = $1 AND name = $2",
228    ///     params,
229    ///     ParameterStyle::DollarNumbered,
230    /// ).await?;
231    ///
232    /// // Named parameters
233    /// let params = QueryParameters::json_named()
234    ///     .add("min_age", &18)?
235    ///     .build();
236    /// let result = client.execute_query_with_params(
237    ///     "SELECT * FROM users WHERE age >= :min_age",
238    ///     params,
239    ///     ParameterStyle::Named,
240    /// ).await?;
241    /// # Ok(())
242    /// # }
243    /// ```
244    ///
245    /// # Errors
246    ///
247    /// Same failure modes as [`Self::execute_query`], plus any
248    /// parameter-related error reported by the server (unknown
249    /// placeholder, type coercion failure, shape mismatch between the
250    /// SQL placeholders and the supplied parameter set).
251    pub async fn execute_query_with_params(
252        &mut self,
253        sql: &str,
254        params: QueryParameters,
255        style: ParameterStyle,
256    ) -> Result<GrpcQueryResult> {
257        self.execute_query_with_params_and_options(
258            sql,
259            params,
260            style,
261            OutputFormat::ArrowIpc,
262            self.config.transfer_mode,
263        )
264        .await
265    }
266
267    /// Executes a parameterized query and returns raw Arrow IPC bytes.
268    ///
269    /// # Example
270    ///
271    /// ```no_run
272    /// use hyperdb_api_core::client::grpc::{GrpcClient, GrpcConfig, QueryParameters, ParameterStyle};
273    ///
274    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
275    /// # let config = GrpcConfig::new("http://localhost:7484");
276    /// # let mut client = GrpcClient::connect(config).await?;
277    /// let params = QueryParameters::json_positional(&[&42i64])?;
278    /// let arrow_bytes = client.execute_query_with_params_to_arrow(
279    ///     "SELECT * FROM users WHERE id = $1",
280    ///     params,
281    ///     ParameterStyle::DollarNumbered,
282    /// ).await?;
283    /// # Ok(())
284    /// # }
285    /// ```
286    ///
287    /// # Errors
288    ///
289    /// Same failure modes as [`Self::execute_query_with_params`].
290    pub async fn execute_query_with_params_to_arrow(
291        &mut self,
292        sql: &str,
293        params: QueryParameters,
294        style: ParameterStyle,
295    ) -> Result<bytes::Bytes> {
296        let result = self.execute_query_with_params(sql, params, style).await?;
297        Ok(result.into_arrow_data())
298    }
299
300    /// Executes a parameterized query with specific options.
301    ///
302    /// This allows full control over the output format and transfer mode.
303    ///
304    /// # Errors
305    ///
306    /// - Returns [`ErrorKind::Protocol`] if the server returns no
307    ///   result chunks and does not signal completion.
308    /// - Propagates any error from the underlying
309    ///   `GrpcQueryExecutor` — auth failure, transport error, or
310    ///   server-side SQL error surfaced as [`tonic::Status`].
311    pub async fn execute_query_with_params_and_options(
312        &mut self,
313        sql: &str,
314        params: QueryParameters,
315        style: ParameterStyle,
316        output_format: OutputFormat,
317        transfer_mode: TransferMode,
318    ) -> Result<GrpcQueryResult> {
319        debug!(
320            sql = %sql,
321            param_style = ?style,
322            format = ?output_format,
323            mode = ?transfer_mode,
324            "Executing parameterized query"
325        );
326
327        // Build the query parameter
328        let query_param = QueryParam {
329            query: sql.to_string(),
330            databases: self.build_attached_databases(),
331            output_format: output_format.into(),
332            settings: self.config.settings.clone(),
333            transfer_mode: transfer_mode.into(),
334            param_style: i32::from(style),
335            parameters: Some(params.into_proto()),
336            result_range: None,
337            query_row_limit: None,
338        };
339
340        // Build headers for authentication and routing
341        let headers = self.build_headers();
342
343        // Create executor with configured message size limits
344        let client = HyperServiceClient::new(self.channel.clone())
345            .max_decoding_message_size(self.config.max_decoding_message_size)
346            .max_encoding_message_size(self.config.max_encoding_message_size);
347        let mut executor = GrpcQueryExecutor::new(client, headers, transfer_mode);
348
349        // Execute the query
350        executor.execute(query_param).await?;
351
352        // Collect all result chunks
353        let mut final_result = GrpcQueryResult::default();
354
355        loop {
356            if let Some(mut partial_result) = executor.next_result().await? {
357                // Merge chunks
358                while let Some(chunk) = partial_result.take_chunk() {
359                    final_result.chunks.push_back(chunk);
360                }
361                // Copy metadata from last result
362                if partial_result.query_id.is_some() {
363                    final_result.query_id = partial_result.query_id;
364                }
365                if partial_result.schema.is_some() {
366                    final_result.schema = partial_result.schema;
367                }
368                if partial_result.rows_affected.is_some() {
369                    final_result.rows_affected = partial_result.rows_affected;
370                }
371                // Check if complete
372                if partial_result.is_complete {
373                    final_result.is_complete = true;
374                    break;
375                }
376            } else {
377                // No more results
378                final_result.is_complete = true;
379                break;
380            }
381        }
382
383        if final_result.chunks.is_empty() && !final_result.is_complete {
384            return Err(Error::new(ErrorKind::Protocol, "No result from query"));
385        }
386
387        Ok(final_result)
388    }
389
390    /// Executes a query with specific options.
391    ///
392    /// This allows control over the output format and transfer mode.
393    ///
394    /// # Errors
395    ///
396    /// - Returns [`ErrorKind::Protocol`] if the server returns no
397    ///   result chunks and does not signal completion.
398    /// - Propagates any error from the underlying
399    ///   `GrpcQueryExecutor` — auth failure, transport error, or
400    ///   server-side SQL error surfaced as [`tonic::Status`].
401    pub async fn execute_query_with_options(
402        &mut self,
403        sql: &str,
404        output_format: OutputFormat,
405        transfer_mode: TransferMode,
406    ) -> Result<GrpcQueryResult> {
407        debug!(sql = %sql, format = ?output_format, mode = ?transfer_mode, "Executing query");
408
409        // Build the query parameter
410        let query_param = QueryParam {
411            query: sql.to_string(),
412            databases: self.build_attached_databases(),
413            output_format: output_format.into(),
414            settings: self.config.settings.clone(),
415            transfer_mode: transfer_mode.into(),
416            param_style: 0, // Default
417            parameters: None,
418            result_range: None,
419            query_row_limit: None,
420        };
421
422        // Build headers for authentication and routing
423        let headers = self.build_headers();
424
425        // Create executor with configured message size limits
426        let client = HyperServiceClient::new(self.channel.clone())
427            .max_decoding_message_size(self.config.max_decoding_message_size)
428            .max_encoding_message_size(self.config.max_encoding_message_size);
429        let mut executor = GrpcQueryExecutor::new(client, headers, transfer_mode);
430
431        // Execute the query
432        executor.execute(query_param).await?;
433
434        // Collect all result chunks
435        let mut final_result = GrpcQueryResult::default();
436
437        loop {
438            if let Some(mut partial_result) = executor.next_result().await? {
439                // Merge chunks
440                while let Some(chunk) = partial_result.take_chunk() {
441                    final_result.chunks.push_back(chunk);
442                }
443                // Copy metadata from last result
444                if partial_result.query_id.is_some() {
445                    final_result.query_id = partial_result.query_id;
446                }
447                if partial_result.schema.is_some() {
448                    final_result.schema = partial_result.schema;
449                }
450                if partial_result.rows_affected.is_some() {
451                    final_result.rows_affected = partial_result.rows_affected;
452                }
453                // Check if complete
454                if partial_result.is_complete {
455                    final_result.is_complete = true;
456                    break;
457                }
458            } else {
459                // No more results
460                final_result.is_complete = true;
461                break;
462            }
463        }
464
465        if final_result.chunks.is_empty() && !final_result.is_complete {
466            return Err(Error::new(ErrorKind::Protocol, "No result from query"));
467        }
468
469        Ok(final_result)
470    }
471
472    /// Executes a query and returns a streaming chunk producer.
473    ///
474    /// Unlike [`execute_query`](Self::execute_query), which drains every
475    /// result chunk into a single [`GrpcQueryResult`] before returning, this
476    /// method yields chunks lazily: each call to
477    /// [`GrpcChunkStream::next_chunk`] pulls just enough from the HTTP/2
478    /// stream to produce one Arrow IPC byte chunk. For very large result
479    /// sets (hundreds of MB to GB) this keeps client memory bounded by a
480    /// single gRPC message (capped at the tonic
481    /// `max_decoding_message_size`, default 64 MB) rather than growing to
482    /// the full result size.
483    ///
484    /// Pair this with
485    /// [`hyperdb_api::ArrowRowset::from_stream`][arrow_rowset_from_stream] to
486    /// decode batches incrementally and keep peak memory constant regardless
487    /// of total row count.
488    ///
489    /// [arrow_rowset_from_stream]: https://docs.rs/hyperdb-api/latest/hyperdb_api/struct.ArrowRowset.html#method.from_stream
490    ///
491    /// # Errors
492    ///
493    /// Same failure modes as
494    /// [`Self::execute_query_stream_with_options`] — invalid SQL,
495    /// auth failure, transport error, etc.
496    pub async fn execute_query_stream(&mut self, sql: &str) -> Result<GrpcChunkStream> {
497        self.execute_query_stream_with_options(
498            sql,
499            OutputFormat::ArrowIpc,
500            self.config.transfer_mode,
501        )
502        .await
503    }
504
505    /// Streaming variant of [`execute_query_with_options`](Self::execute_query_with_options).
506    ///
507    /// # Errors
508    ///
509    /// Propagates any error from the initial
510    /// `GrpcQueryExecutor::execute` call — server-side SQL error,
511    /// auth failure, or transport-level gRPC error.
512    pub async fn execute_query_stream_with_options(
513        &mut self,
514        sql: &str,
515        output_format: OutputFormat,
516        transfer_mode: TransferMode,
517    ) -> Result<GrpcChunkStream> {
518        debug!(sql = %sql, format = ?output_format, mode = ?transfer_mode, "Executing streaming query");
519
520        let query_param = QueryParam {
521            query: sql.to_string(),
522            databases: self.build_attached_databases(),
523            output_format: output_format.into(),
524            settings: self.config.settings.clone(),
525            transfer_mode: transfer_mode.into(),
526            param_style: 0,
527            parameters: None,
528            result_range: None,
529            query_row_limit: None,
530        };
531
532        let headers = self.build_headers();
533
534        let client = HyperServiceClient::new(self.channel.clone())
535            .max_decoding_message_size(self.config.max_decoding_message_size)
536            .max_encoding_message_size(self.config.max_encoding_message_size);
537        let mut executor = GrpcQueryExecutor::new(client, headers, transfer_mode);
538
539        executor.execute(query_param).await?;
540
541        Ok(GrpcChunkStream::new(executor))
542    }
543
544    /// Cancels an in-flight gRPC query by its `query_id`.
545    ///
546    /// This is the gRPC analogue of the PG wire `CancelRequest` packet: it
547    /// tells the server to stop executing a previously-started query. Unlike
548    /// PG wire (where the cancel travels on a *fresh* connection), gRPC
549    /// cancels travel as a regular RPC multiplexed over the existing HTTP/2
550    /// channel — that's why this call shares `self.channel` with normal
551    /// query traffic.
552    ///
553    /// # When do you have a `query_id`?
554    ///
555    /// The server assigns a `query_id` for queries started in
556    /// [`TransferMode::Async`](super::proto::hyper_service::query_param::TransferMode)
557    /// (long-running queries that the client polls). Grab it from
558    /// [`GrpcQueryResult::query_id`](super::result::GrpcQueryResult::query_id)
559    /// after `execute_query_with_options(..., TransferMode::Async)` returns.
560    /// SYNC-mode queries typically complete before the client needs a
561    /// cancel — for those, just drop the in-flight future.
562    ///
563    /// # Query-id lifecycle
564    ///
565    /// Query ids are stable for the lifetime of a query and are
566    /// server-assigned — a given id is never silently re-used for a
567    /// different query (Hyper generates them as UUID-like opaque tokens,
568    /// not sequential counters). The only race a caller needs to
569    /// consider is between obtaining the id and calling `cancel_query`:
570    ///
571    /// - If the query is still running, the cancel lands and the server
572    ///   aborts it.
573    /// - If the query has already completed normally between "obtain id"
574    ///   and "cancel", the server sees a cancel for an unknown /
575    ///   completed query and handles it gracefully (the exact shape
576    ///   depends on server build — see the tests in
577    ///   `hyperdb-api/tests/grpc_cancel_tests.rs` for details). Either way
578    ///   the channel stays healthy.
579    ///
580    /// There is no scenario where a stale id causes a cancel to target
581    /// the *wrong* query, because ids are not reassigned.
582    ///
583    /// # Errors
584    ///
585    /// Propagates transport-level errors. A successful cancel returns
586    /// `Ok(())` even if the query had already completed on the server;
587    /// cancellation is best-effort by design.
588    ///
589    /// # Relation to the [`Cancellable`](crate::client::cancel::Cancellable) trait
590    ///
591    /// This is the **fallible user-facing cancel API**: it returns a
592    /// `Result<()>` so explicit callers can observe transport-level
593    /// failures and react accordingly.
594    ///
595    /// It is *not* an implementation of the
596    /// [`Cancellable`](crate::client::cancel::Cancellable) trait — and cannot
597    /// be, because `Cancellable::cancel(&self)` takes no arguments while
598    /// gRPC cancels need a per-query `query_id`. A `GrpcClient` can have
599    /// many concurrent queries in flight; there is no single "the"
600    /// query on it the way there is on a PG wire connection. A future
601    /// gRPC streaming result type (when one is introduced) would carry
602    /// its `query_id` in a dedicated handle like
603    /// `GrpcCancelHandle { client, query_id }`, and *that* handle
604    /// would `impl Cancellable` by wrapping this method and swallowing
605    /// errors — same shape as
606    /// [`impl Cancellable for Client`](crate::client::cancel::Cancellable).
607    /// See the [`Cancellable`](crate::client::cancel::Cancellable) trait docs
608    /// for the full wrapper pattern.
609    ///
610    /// # Example
611    ///
612    /// ```no_run
613    /// use hyperdb_api_core::client::grpc::{GrpcClient, GrpcConfig, OutputFormat, TransferMode};
614    ///
615    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
616    /// # let config = GrpcConfig::new("http://localhost:7484");
617    /// # let mut client = GrpcClient::connect(config).await?;
618    /// let result = client
619    ///     .execute_query_with_options(
620    ///         "SELECT * FROM very_large_table",
621    ///         OutputFormat::ArrowIpc,
622    ///         TransferMode::Async,
623    ///     )
624    ///     .await?;
625    ///
626    /// if let Some(query_id) = result.query_id() {
627    ///     // Some time later, decide to abort:
628    ///     client.cancel_query(query_id).await?;
629    /// }
630    /// # Ok(())
631    /// # }
632    /// ```
633    pub async fn cancel_query(&mut self, query_id: &str) -> Result<()> {
634        debug!(query_id = %query_id, "Cancelling gRPC query");
635
636        let param = CancelQueryParam {
637            query_id: query_id.to_string(),
638        };
639        let mut request = tonic::Request::new(param);
640
641        // Apply the client's standard headers (database routing, custom
642        // headers). Matches what the query path does so that any server-side
643        // routing based on headers lands the cancel on the same backend as
644        // the query it's trying to cancel.
645        //
646        // Header parse failures are logged at `warn!` and then skipped —
647        // the cancel goes out without that particular header rather than
648        // failing the whole operation. A missing custom header is strictly
649        // better than a cancel we never send. The warn! is the only
650        // operational signal that routing-critical headers (e.g. the
651        // database selector) were dropped, so don't silence it.
652        for (key, value) in self.build_headers() {
653            match (
654                key.parse::<tonic::metadata::MetadataKey<_>>(),
655                value.parse(),
656            ) {
657                (Ok(k), Ok(v)) => {
658                    request.metadata_mut().insert(k, v);
659                }
660                (key_res, value_res) => {
661                    warn!(
662                        target: "hyperdb_api_core::client",
663                        query_id = %query_id,
664                        header_key = %key,
665                        key_parse_ok = key_res.is_ok(),
666                        value_parse_ok = value_res.is_ok(),
667                        "cancel: header parse failed, dropping header from cancel request",
668                    );
669                }
670            }
671        }
672        // Also set the canonical x-hyperdb-query-id metadata — some server
673        // deployments route cancels based on this header rather than the
674        // payload body.
675        match query_id.parse() {
676            Ok(value) => {
677                request.metadata_mut().insert("x-hyperdb-query-id", value);
678            }
679            Err(e) => {
680                warn!(
681                    target: "hyperdb_api_core::client",
682                    query_id = %query_id,
683                    error = %e,
684                    "cancel: x-hyperdb-query-id header parse failed; \
685                     cancel routing may fall back to payload-based lookup",
686                );
687            }
688        }
689
690        let mut client = HyperServiceClient::new(self.channel.clone())
691            .max_decoding_message_size(self.config.max_decoding_message_size)
692            .max_encoding_message_size(self.config.max_encoding_message_size);
693        client
694            .cancel_query(request)
695            .await
696            .map_err(from_grpc_status)?;
697
698        info!(query_id = %query_id, "gRPC query cancelled");
699        Ok(())
700    }
701
702    #[expect(
703        clippy::unused_async,
704        reason = "async fn retained for API symmetry; callers await regardless of whether the current body is synchronous"
705    )]
706    /// Closes the gRPC connection.
707    ///
708    /// This is a no-op as tonic channels are reference-counted and will be
709    /// closed when the last reference is dropped.
710    ///
711    /// # Errors
712    ///
713    /// Currently infallible — always returns `Ok(())`. The `Result`
714    /// return type is preserved for API symmetry with
715    /// [`GrpcClientSync::close`] and for forward compatibility if
716    /// future tonic channels expose a fallible shutdown.
717    pub async fn close(self) -> Result<()> {
718        debug!("Closing gRPC connection");
719        // Channel is dropped automatically
720        Ok(())
721    }
722
723    /// Builds the attached databases list from configuration.
724    fn build_attached_databases(&self) -> Vec<AttachedDatabase> {
725        if let Some(db_path) = &self.config.database {
726            debug!(db_path = %db_path, "Attaching database for query");
727            // Check if it's a JSON array (multiple databases)
728            if db_path.starts_with('[') {
729                // Parse JSON - for now just use as single database
730                // TODO: Implement proper JSON parsing for multiple databases
731                vec![AttachedDatabase {
732                    path: db_path.clone(),
733                    alias: String::new(), // Empty alias means use default
734                }]
735            } else {
736                vec![AttachedDatabase {
737                    path: db_path.clone(),
738                    alias: String::new(), // Empty alias means use default
739                }]
740            }
741        } else {
742            debug!("No database configured on gRPC client — query will run without attachment");
743            vec![]
744        }
745    }
746
747    /// Builds headers for gRPC requests.
748    fn build_headers(&self) -> Vec<(String, String)> {
749        let mut headers: Vec<(String, String)> = self
750            .config
751            .headers
752            .iter()
753            .map(|(k, v)| (k.clone(), v.clone()))
754            .collect();
755
756        // Add database header if configured
757        if let Some(ref db) = self.config.database {
758            headers.push(("x-hyper-database".to_string(), db.clone()));
759        }
760
761        headers
762    }
763}
764
765/// Synchronous wrapper around [`GrpcClient`].
766///
767/// This provides a blocking API by creating a Tokio runtime internally.
768/// For better performance in async contexts, use [`GrpcClient`] directly.
769///
770/// # Example
771///
772/// ```no_run
773/// use hyperdb_api_core::client::grpc::{GrpcClientSync, GrpcConfig};
774///
775/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
776/// let config = GrpcConfig::new("http://localhost:7484")
777///     .database("test.hyper");
778///
779/// let mut client = GrpcClientSync::connect(config)?;
780/// let result = client.execute_query("SELECT * FROM users")?;
781/// let arrow_bytes = result.arrow_data();
782/// # Ok(())
783/// # }
784/// ```
#[derive(Debug)]
pub struct GrpcClientSync {
    /// The async client.
    ///
    /// The blocking methods drive this client's futures to completion on
    /// `runtime`.
    inner: GrpcClient,
    /// Tokio runtime for blocking operations.
    ///
    /// Built as a current-thread runtime in [`GrpcClientSync::connect`].
    ///
    /// Wrapped in `Arc` so streaming chunk producers
    /// ([`GrpcChunkStreamSync`]) can share the same runtime without having to
    /// borrow the client or create their own.
    runtime: Arc<tokio::runtime::Runtime>,
}
796
797impl GrpcClientSync {
798    /// Connects to a Hyper server via gRPC (blocking).
799    ///
800    /// # Errors
801    ///
802    /// - Returns [`ErrorKind::Other`] if a current-thread Tokio
803    ///   runtime cannot be built.
804    /// - Propagates any error from [`GrpcClient::connect`] (invalid
805    ///   endpoint, TLS configuration failure, or transport setup
806    ///   failure).
807    pub fn connect(config: GrpcConfig) -> Result<Self> {
808        let runtime = tokio::runtime::Builder::new_current_thread()
809            .enable_all()
810            .build()
811            .map_err(|e| {
812                Error::new(
813                    ErrorKind::Other,
814                    format!("Failed to create Tokio runtime: {e}"),
815                )
816            })?;
817
818        let inner = runtime.block_on(GrpcClient::connect(config))?;
819
820        Ok(GrpcClientSync {
821            inner,
822            runtime: Arc::new(runtime),
823        })
824    }
825
826    /// Executes a SQL query (blocking).
827    ///
828    /// # Errors
829    ///
830    /// Blocking wrapper around [`GrpcClient::execute_query`]; see that
831    /// method for the concrete failure modes.
832    pub fn execute_query(&mut self, sql: &str) -> Result<GrpcQueryResult> {
833        self.runtime.block_on(self.inner.execute_query(sql))
834    }
835
836    /// Executes a query and returns Arrow IPC bytes (blocking).
837    ///
838    /// # Errors
839    ///
840    /// Same failure modes as [`Self::execute_query`].
841    pub fn execute_query_to_arrow(&mut self, sql: &str) -> Result<bytes::Bytes> {
842        self.runtime
843            .block_on(self.inner.execute_query_to_arrow(sql))
844    }
845
846    /// Executes a query and returns a blocking streaming chunk producer.
847    ///
848    /// See [`GrpcClient::execute_query_stream`] for the streaming semantics
849    /// and memory behavior. The returned [`GrpcChunkStreamSync`] lets you
850    /// pull chunks one at a time without buffering the entire result.
851    ///
852    /// # Errors
853    ///
854    /// Same failure modes as [`GrpcClient::execute_query_stream`].
855    pub fn execute_query_stream(&mut self, sql: &str) -> Result<GrpcChunkStreamSync> {
856        let inner = self
857            .runtime
858            .block_on(self.inner.execute_query_stream(sql))?;
859        Ok(GrpcChunkStreamSync {
860            inner,
861            runtime: Arc::clone(&self.runtime),
862        })
863    }
864
865    /// Executes a parameterized SQL query (blocking).
866    ///
867    /// # Example
868    ///
869    /// ```no_run
870    /// use hyperdb_api_core::client::grpc::{GrpcClientSync, GrpcConfig, QueryParameters, ParameterStyle};
871    ///
872    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
873    /// # let config = GrpcConfig::new("http://localhost:7484");
874    /// # let mut client = GrpcClientSync::connect(config)?;
875    /// let params = QueryParameters::json_positional(&[&42i64])?;
876    /// let result = client.execute_query_with_params(
877    ///     "SELECT * FROM users WHERE id = $1",
878    ///     params,
879    ///     ParameterStyle::DollarNumbered,
880    /// )?;
881    /// # Ok(())
882    /// # }
883    /// ```
884    ///
885    /// # Errors
886    ///
887    /// Blocking wrapper around
888    /// [`GrpcClient::execute_query_with_params`]; see that method for
889    /// the concrete failure modes.
890    pub fn execute_query_with_params(
891        &mut self,
892        sql: &str,
893        params: QueryParameters,
894        style: ParameterStyle,
895    ) -> Result<GrpcQueryResult> {
896        self.runtime
897            .block_on(self.inner.execute_query_with_params(sql, params, style))
898    }
899
900    /// Executes a parameterized query and returns Arrow IPC bytes (blocking).
901    ///
902    /// # Errors
903    ///
904    /// Same failure modes as [`Self::execute_query_with_params`].
905    pub fn execute_query_with_params_to_arrow(
906        &mut self,
907        sql: &str,
908        params: QueryParameters,
909        style: ParameterStyle,
910    ) -> Result<bytes::Bytes> {
911        self.runtime.block_on(
912            self.inner
913                .execute_query_with_params_to_arrow(sql, params, style),
914        )
915    }
916
917    /// Cancels an in-flight gRPC query by its `query_id` (blocking).
918    ///
919    /// Blocking wrapper around
920    /// [`GrpcClient::cancel_query`]. See that method's documentation for
921    /// when a `query_id` is available (ASYNC-mode queries), best-effort
922    /// cancel semantics, and the full "Relation to the `Cancellable`
923    /// trait" discussion.
924    ///
925    /// # Fallible by design
926    ///
927    /// The `Result<()>` return is intentional and mirrors the async
928    /// `GrpcClient::cancel_query`. Explicit callers get to observe
929    /// transport-level failures (network errors, channel closed, auth
930    /// expired) so they can record metrics, retry, or surface "cancel
931    /// failed" UX. This is *not* an `impl Cancellable for GrpcClientSync`
932    /// — it cannot be, because `Cancellable::cancel(&self)` takes no
933    /// arguments and has no way to pass the `query_id`.  See the
934    /// [`Cancellable`](crate::client::cancel::Cancellable) trait docs for the
935    /// infallible-wrapper pattern used by `Drop`-path consumers.
936    ///
937    /// # Example
938    ///
939    /// ```no_run
940    /// use hyperdb_api_core::client::grpc::{GrpcClientSync, GrpcConfig};
941    ///
942    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
943    /// # let config = GrpcConfig::new("http://localhost:7484");
944    /// # let mut client = GrpcClientSync::connect(config)?;
945    /// # let query_id = "some-query-id";
946    /// client.cancel_query(query_id)?;
947    /// # Ok(())
948    /// # }
949    /// ```
950    ///
951    /// # Errors
952    ///
953    /// Same failure modes as [`GrpcClient::cancel_query`] —
954    /// transport-level errors bubble up; a cancel for an
955    /// already-completed query returns `Ok(())` by design.
956    pub fn cancel_query(&mut self, query_id: &str) -> Result<()> {
957        self.runtime.block_on(self.inner.cancel_query(query_id))
958    }
959
960    /// Returns the client configuration.
961    pub fn config(&self) -> &GrpcConfig {
962        self.inner.config()
963    }
964
965    /// Closes the connection (blocking).
966    ///
967    /// # Errors
968    ///
969    /// Currently infallible — always returns `Ok(())`. The `Result`
970    /// return type is preserved for API symmetry with async callers.
971    pub fn close(self) -> Result<()> {
972        self.runtime.block_on(self.inner.close())
973    }
974}
975
976/// Blocking wrapper around [`GrpcChunkStream`].
977///
978/// Returned by [`GrpcClientSync::execute_query_stream`] and the
979/// `AuthenticatedGrpcClientSync` equivalent. Yields Arrow IPC byte chunks
980/// one at a time, blocking on the shared Tokio runtime as needed.
981///
982/// Pair with
983/// [`hyperdb_api::ArrowRowset::from_stream`][arrow_rowset_from_stream] to
984/// decode Arrow record batches incrementally with constant client memory.
985///
986/// [arrow_rowset_from_stream]: https://docs.rs/hyperdb-api/latest/hyperdb_api/struct.ArrowRowset.html#method.from_stream
987#[derive(Debug)]
988pub struct GrpcChunkStreamSync {
989    inner: GrpcChunkStream,
990    runtime: Arc<tokio::runtime::Runtime>,
991}
992
993impl GrpcChunkStreamSync {
994    /// Returns the next Arrow IPC byte chunk, or `None` when the stream is
995    /// complete.
996    ///
997    /// # Errors
998    ///
999    /// Same failure modes as [`GrpcChunkStream::next_chunk`] —
1000    /// transport errors and server-side query failures surface as
1001    /// [`Error`].
1002    pub fn next_chunk(&mut self) -> Result<Option<bytes::Bytes>> {
1003        self.runtime.block_on(self.inner.next_chunk())
1004    }
1005
1006    /// Returns the schema reported by the server, if one has been received yet.
1007    pub fn schema(&self) -> Option<&super::proto::QueryResultSchema> {
1008        self.inner.schema()
1009    }
1010
1011    /// Returns the server-assigned query ID, if one has been received.
1012    pub fn query_id(&self) -> Option<&str> {
1013        self.inner.query_id()
1014    }
1015
1016    /// Returns the affected row count for DML queries, if reported.
1017    pub fn rows_affected(&self) -> Option<u64> {
1018        self.inner.rows_affected()
1019    }
1020}