// hyperdb_api_core/client/grpc/executor.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! gRPC query executor with state machine for result fetching.
5//!
6//! This module implements the query execution state machine that handles
7//! the different transfer modes (SYNC, ASYNC, ADAPTIVE) and manages
8//! fetching results from the Hyper gRPC service.
9//!
10//! # Transfer Mode State Machines
11//!
12//! Each transfer mode follows a different path through the executor states:
13//!
14//! **SYNC** — simplest path, all data in one response:
15//! ```text
16//! ReadInitialResults ──(stream exhausted)──> Finished
17//! ```
18//! The `ExecuteQuery` RPC returns a server-streaming response containing the
19//! schema header followed by one or more binary/string data parts, then the
20//! stream closes. Subject to the server's 100-second SYNC timeout.
21//!
22//! **ASYNC** — decouples submission from fetching:
23//! ```text
24//! ReadInitialResults ──(QueryStatus: Running)──> RequestStatus
25//!     ──> ReadStatus ──(Running)──> RequestStatus  (poll loop)
26//!     ──> ReadStatus ──(Finished)──> RequestResults
27//!     ──> ReadResults ──(more chunks)──> RequestResults
28//!     ──> ReadResults ──(all chunks)──> Finished
29//! ```
30//! The initial `ExecuteQuery` response contains only a `QueryStatus` with a
31//! server-assigned `query_id`. The client polls `GetQueryInfo` until
32//! `CompletionStatus::Finished`, then fetches result chunks via
33//! `GetQueryResult` using chunk IDs.
34//!
//! **ADAPTIVE** (default, recommended) — hybrid of SYNC and ASYNC:
//! ```text
//! ReadInitialResults ──(stream closed)──> RequestStatus ──> ReadStatus
//!     ──(Finished, all chunks already fetched)──> RequestResults ──> Finished
//!     ──(Running)──> RequestStatus  (poll loop, same as ASYNC)
//! ```
//! The first chunk of results is returned inline in the `ExecuteQuery`
//! response. If the query completes within that first chunk, a single
//! status round-trip confirms there is nothing left to fetch and
//! `RequestResults` short-circuits straight to `Finished` without issuing
//! a `GetQueryResult`. If the result is larger, the executor follows the
//! ASYNC polling path for the remaining chunks (starting at chunk 1).
46
47use bytes::Bytes;
48use tonic::Streaming;
49use tracing::{debug, trace, warn};
50
51use crate::client::error::{Error, ErrorKind, Result};
52
53use super::error::from_grpc_status;
54use super::proto::hyper_service::query_param::TransferMode;
55use super::proto::hyper_service::query_result::Result as QueryResultPayload;
56use super::proto::hyper_service::query_status::CompletionStatus;
57use super::proto::{
58    ExecuteQueryResponse, HyperServiceClient, QueryInfo, QueryInfoParam, QueryResult,
59    QueryResultParam, QueryStatus,
60};
61use super::result::{GrpcQueryResult, GrpcResultChunk};
62
/// State of the query executor.
///
/// Drives the state machine described in the module docs: SYNC queries stay
/// in `ReadInitialResults` until the stream closes, while ASYNC/ADAPTIVE
/// queries cycle through the status-polling and chunk-fetching states.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExecutorState {
    /// Reading initial results from `ExecuteQuery` stream
    ReadInitialResults,
    /// Requesting query status via `GetQueryInfo`
    RequestStatus,
    /// Reading query status
    ReadStatus,
    /// Requesting result chunks via `GetQueryResult`
    RequestResults,
    /// Reading result chunks
    ReadResults,
    /// Query execution complete
    Finished,
}
79
/// Executes gRPC queries and manages result fetching.
///
/// This mirrors the C++ `GrpcQueryExecutor` implementation, handling the
/// different transfer modes and async result fetching.
///
/// Drive it by calling `execute` once, then `next_result` repeatedly; see
/// the module docs for the per-mode state machines.
pub(crate) struct GrpcQueryExecutor<T> {
    /// The gRPC client
    client: HyperServiceClient<T>,
    /// Metadata headers attached to every outgoing request
    headers: Vec<(String, String)>,
    /// Current state of the executor
    state: ExecutorState,
    /// Transfer mode being used
    transfer_mode: TransferMode,
    /// Stream for `ExecuteQuery` responses
    execute_stream: Option<Streaming<ExecuteQueryResponse>>,
    /// Stream for `GetQueryInfo` responses
    query_info_stream: Option<Streaming<QueryInfo>>,
    /// Stream for `GetQueryResult` responses
    query_result_stream: Option<Streaming<QueryResult>>,
    /// Latest query status received from the server (carries `chunk_count`)
    query_status: Option<QueryStatus>,
    /// Query ID for async operations
    query_id: Option<String>,
    /// Monotonic label for the next `GrpcResultChunk` appended to
    /// `self.result.chunks`. Bumped once per received `QueryResult` /
    /// `BinaryPart` / `StringPart` message. This is *purely* a local
    /// identifier for downstream consumers and has no relationship to
    /// server-side chunk IDs.
    next_local_chunk_id: u64,
    /// Server-side chunk ID to request in the next `GetQueryResult` RPC,
    /// and the value compared against `QueryStatus.chunk_count` to decide
    /// when all chunks have been fetched.
    ///
    /// Bumped by exactly 1 after each `GetQueryResult` stream is fully
    /// drained — regardless of how many `QueryResult` messages that stream
    /// contained. Mirrors `nextChunkId_` in the C++ `GrpcQueryExecutor`.
    ///
    /// Initial value depends on transfer mode:
    /// - `ASYNC`: `0` — no chunks delivered inline.
    /// - `ADAPTIVE`: `1` — server sends chunk 0 inline on `ExecuteQuery`.
    /// - `SYNC`: unused (all data arrives via `ExecuteQuery`).
    next_server_chunk_id: u64,
    /// Result being built (batch accumulator handed out by `next_result`)
    result: GrpcQueryResult,
}
125
126impl<T> GrpcQueryExecutor<T>
127where
128    T: tonic::client::GrpcService<tonic::body::Body> + Clone + Send + 'static,
129    T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
130    <T::ResponseBody as tonic::codegen::Body>::Error:
131        Into<tonic::codegen::StdError> + Send + 'static,
132    T::Future: Send,
133{
134    /// Creates a new query executor.
135    pub(crate) fn new(
136        client: HyperServiceClient<T>,
137        headers: Vec<(String, String)>,
138        transfer_mode: TransferMode,
139    ) -> Self {
140        // Match the C++ `GrpcQueryExecutor` constructor: ADAPTIVE delivers
141        // server-side chunk 0 inline on the `ExecuteQuery` response, so the
142        // first chunk we need to request via `GetQueryResult` is chunk 1.
143        // ASYNC doesn't deliver any chunks inline, so we start at 0. SYNC
144        // never reaches the GetQueryResult state machine.
145        let next_server_chunk_id = match transfer_mode {
146            TransferMode::Adaptive => 1,
147            _ => 0,
148        };
149        GrpcQueryExecutor {
150            client,
151            headers,
152            state: ExecutorState::ReadInitialResults,
153            transfer_mode,
154            execute_stream: None,
155            query_info_stream: None,
156            query_result_stream: None,
157            query_status: None,
158            query_id: None,
159            next_local_chunk_id: 0,
160            next_server_chunk_id,
161            result: GrpcQueryResult::new(),
162        }
163    }
164
165    /// Starts query execution.
166    pub(crate) async fn execute(&mut self, query: super::proto::QueryParam) -> Result<()> {
167        debug!(query = %query.query, transfer_mode = ?self.transfer_mode, "Executing gRPC query");
168
169        // Create request with metadata
170        let mut request = tonic::Request::new(query);
171        for (key, value) in &self.headers {
172            if let (Ok(key), Ok(value)) = (
173                key.parse::<tonic::metadata::MetadataKey<_>>(),
174                value.parse(),
175            ) {
176                request.metadata_mut().insert(key, value);
177            }
178        }
179
180        // Execute the query
181        let response = self
182            .client
183            .execute_query(request)
184            .await
185            .map_err(from_grpc_status)?;
186
187        self.execute_stream = Some(response.into_inner());
188        self.state = ExecutorState::ReadInitialResults;
189
190        Ok(())
191    }
192
193    /// Gets the next result.
194    ///
195    /// Returns `None` when all results have been consumed.
196    pub(crate) async fn next_result(&mut self) -> Result<Option<GrpcQueryResult>> {
197        loop {
198            trace!(state = ?self.state, "Query executor state");
199
200            match self.state {
201                ExecutorState::ReadInitialResults => {
202                    self.read_initial_results().await?;
203                }
204                ExecutorState::RequestStatus => {
205                    self.request_status().await?;
206                }
207                ExecutorState::ReadStatus => {
208                    self.read_status().await?;
209                }
210                ExecutorState::RequestResults => {
211                    self.request_results().await?;
212                }
213                ExecutorState::ReadResults => {
214                    self.read_results().await?;
215                }
216                ExecutorState::Finished => {
217                    self.result.is_complete = true;
218                    // Return the accumulated result
219                    return Ok(Some(std::mem::take(&mut self.result)));
220                }
221            }
222
223            // Yield back to the caller as soon as we either have some
224            // chunks to deliver or have reached the terminal state.
225            // Streaming out of `ReadInitialResults` keeps peak memory
226            // bounded for SYNC/inline paths (otherwise we would buffer
227            // the entire ExecuteQuery stream before the first yield).
228            if self.state == ExecutorState::Finished || !self.result.chunks.is_empty() {
229                break;
230            }
231        }
232
233        if self.result.is_complete || !self.result.chunks.is_empty() {
234            Ok(Some(std::mem::take(&mut self.result)))
235        } else {
236            Ok(None)
237        }
238    }
239
240    /// Reads one message from the `ExecuteQuery` stream.
241    ///
242    /// We deliberately do **not** transition state the moment we see a
243    /// `QueryStatus` — under `ADAPTIVE` the server delivers the whole of
244    /// chunk 0 inline *followed by* a `QueryStatus(Running)` and then
245    /// closes the stream, so bailing early would silently drop the tail
246    /// of chunk 0. Only the server-side close (a `None` message) decides
247    /// where to go next. This mirrors the C++ `GrpcQueryExecutor::
248    /// READ_INITIAL_RESULTS` loop, which reads until `Read()` returns
249    /// false and only then inspects `queryStatus_` to pick the next
250    /// state.
251    ///
252    /// One message per call keeps memory bounded: the outer `next_result`
253    /// loop yields accumulated chunks back to the caller as they arrive
254    /// instead of buffering the entire inline response.
255    async fn read_initial_results(&mut self) -> Result<()> {
256        let response = {
257            let stream = self.execute_stream.as_mut().ok_or_else(|| {
258                Error::new(ErrorKind::Protocol, "ExecuteQuery stream not initialized")
259            })?;
260            stream.message().await.map_err(from_grpc_status)?
261        };
262
263        match response {
264            Some(response) => {
265                self.process_execute_response(response)?;
266            }
267            None => {
268                // Server closed the stream. Where we go next is determined
269                // purely by transfer mode, mirroring the C++
270                // `GrpcQueryExecutor`:
271                //   - SYNC: all data is inline, we're done.
272                //   - ASYNC / ADAPTIVE: go to the GetQueryInfo/
273                //     GetQueryResult state machine. A `QueryStatus` of
274                //     `Finished` here means *query execution* is finished,
275                //     NOT that all chunks have been streamed — server-side
276                //     chunks 1..N still need to be fetched for ADAPTIVE
277                //     (and 0..N for ASYNC).
278                match self.transfer_mode {
279                    TransferMode::Sync | TransferMode::Unspecified => {
280                        debug!(
281                            query_id = ?self.query_id,
282                            "ExecuteQuery stream closed; SYNC mode complete",
283                        );
284                        self.state = ExecutorState::Finished;
285                    }
286                    TransferMode::Async | TransferMode::Adaptive => {
287                        debug!(
288                            query_id = ?self.query_id,
289                            mode = ?self.transfer_mode,
290                            next_chunk = self.next_server_chunk_id,
291                            "ExecuteQuery stream closed; fetching remaining chunks",
292                        );
293                        self.state = ExecutorState::RequestStatus;
294                    }
295                }
296            }
297        }
298
299        Ok(())
300    }
301
302    /// Processes an `ExecuteQueryResponse` message.
303    fn process_execute_response(&mut self, response: ExecuteQueryResponse) -> Result<()> {
304        use super::proto::hyper_service::execute_query_response::Result as ResponsePayload;
305        use super::proto::hyper_service::query_info::Content as QueryInfoContent;
306        use super::proto::hyper_service::query_result_header::Header;
307
308        match response.result {
309            Some(ResponsePayload::Header(header)) => match header.header {
310                Some(Header::Schema(schema)) => {
311                    debug!(columns = schema.columns.len(), "Received schema");
312                    self.result.schema = Some(schema);
313                }
314                Some(Header::Command(cmd)) => {
315                    use super::proto::hyper_service::query_command_ok::CommandReturn;
316                    let rows = match cmd.command_return {
317                        Some(CommandReturn::AffectedRows(n)) => Some(n),
318                        Some(CommandReturn::Empty(())) | None => None,
319                    };
320                    debug!(rows_affected = ?rows, "Command OK");
321                    self.result.rows_affected = rows;
322                    self.state = ExecutorState::Finished;
323                }
324                None => {
325                    warn!("Received empty QueryResultHeader");
326                }
327            },
328            Some(ResponsePayload::BinaryPart(data)) => {
329                debug!(bytes = data.data.len(), "Received binary result part");
330                let chunk = GrpcResultChunk::new(self.next_local_chunk_id, data.data);
331                self.next_local_chunk_id += 1;
332                self.result.chunks.push_back(chunk);
333            }
334            Some(ResponsePayload::StringPart(data)) => {
335                debug!(len = data.data.len(), "Received string result part");
336                let chunk = GrpcResultChunk::new(
337                    self.next_local_chunk_id,
338                    Bytes::from(data.data.into_bytes()),
339                );
340                self.next_local_chunk_id += 1;
341                self.result.chunks.push_back(chunk);
342            }
343            Some(ResponsePayload::QueryInfo(info)) => {
344                match info.content {
345                    Some(QueryInfoContent::QueryStatus(status)) => {
346                        self.process_query_status(status);
347                    }
348                    Some(QueryInfoContent::BinarySchema(data)) => {
349                        debug!(bytes = data.data.len(), "Received binary schema");
350                        // Schema in binary form - store as a chunk
351                        let chunk = GrpcResultChunk::new(self.next_local_chunk_id, data.data);
352                        self.next_local_chunk_id += 1;
353                        self.result.chunks.push_back(chunk);
354                    }
355                    Some(QueryInfoContent::StringSchema(data)) => {
356                        debug!(len = data.data.len(), "Received string schema");
357                        // Schema in string form - for JSON format
358                        let chunk = GrpcResultChunk::new(
359                            self.next_local_chunk_id,
360                            Bytes::from(data.data.into_bytes()),
361                        );
362                        self.next_local_chunk_id += 1;
363                        self.result.chunks.push_back(chunk);
364                    }
365                    None => {}
366                }
367            }
368            Some(ResponsePayload::QueryResult(query_result)) => {
369                self.process_query_result(query_result)?;
370            }
371            None => {
372                warn!("Received empty ExecuteQueryResponse");
373            }
374        }
375        Ok(())
376    }
377
378    #[expect(
379        clippy::unnecessary_wraps,
380        reason = "signature retained for API symmetry / future fallibility; returning Result/Option keeps callers from breaking when the function later grows failure cases"
381    )]
382    /// Processes a `QueryResult` message.
383    ///
384    /// Note: a single `GetQueryResult` RPC returns *multiple* `QueryResult`
385    /// messages for one server-side chunk (schema + N binary parts). Only
386    /// the local `GrpcResultChunk` label is bumped here — the server-side
387    /// chunk ID (`next_server_chunk_id`) is advanced by exactly 1 after the
388    /// RPC stream has been fully drained; see `read_results`.
389    fn process_query_result(&mut self, result: QueryResult) -> Result<()> {
390        // Extract data payload
391        if let Some(payload) = result.result {
392            let chunk = match payload {
393                QueryResultPayload::BinaryPart(data) => {
394                    debug!(bytes = data.data.len(), "Received binary result chunk");
395                    GrpcResultChunk::new(self.next_local_chunk_id, data.data)
396                }
397                QueryResultPayload::StringPart(data) => {
398                    // Convert string data to bytes
399                    debug!(len = data.data.len(), "Received string result chunk");
400                    GrpcResultChunk::new(
401                        self.next_local_chunk_id,
402                        Bytes::from(data.data.into_bytes()),
403                    )
404                }
405            };
406            self.next_local_chunk_id += 1;
407            self.result.chunks.push_back(chunk);
408        }
409
410        Ok(())
411    }
412
413    /// Processes a `QueryStatus` message.
414    fn process_query_status(&mut self, status: QueryStatus) {
415        debug!(
416            query_id = %status.query_id,
417            completion_status = ?CompletionStatus::try_from(status.completion_status),
418            "Received query status"
419        );
420
421        self.query_id = Some(status.query_id.clone());
422        self.result.query_id = Some(status.query_id.clone());
423        self.query_status = Some(status);
424    }
425
426    /// Requests query status via `GetQueryInfo`.
427    async fn request_status(&mut self) -> Result<()> {
428        let query_id = self
429            .query_id
430            .clone()
431            .ok_or_else(|| Error::new(ErrorKind::Protocol, "No query ID for status request"))?;
432
433        debug!(query_id = %query_id, "Requesting query status");
434
435        let param = QueryInfoParam {
436            query_id: query_id.clone(),
437            streaming: true,         // Enable streaming to get continuous updates
438            schema_output_format: 0, // OUTPUT_FORMAT_UNSPECIFIED - we don't need schema here
439        };
440
441        let mut request = tonic::Request::new(param);
442        // Add the required x-hyperdb-query-id header
443        if let Ok(value) = query_id.parse() {
444            request.metadata_mut().insert("x-hyperdb-query-id", value);
445        }
446        for (key, value) in &self.headers {
447            if let (Ok(key), Ok(value)) = (
448                key.parse::<tonic::metadata::MetadataKey<_>>(),
449                value.parse(),
450            ) {
451                request.metadata_mut().insert(key, value);
452            }
453        }
454
455        let response = self
456            .client
457            .get_query_info(request)
458            .await
459            .map_err(from_grpc_status)?;
460
461        self.query_info_stream = Some(response.into_inner());
462        self.state = ExecutorState::ReadStatus;
463        Ok(())
464    }
465
466    /// Reads query status from `GetQueryInfo` stream.
467    async fn read_status(&mut self) -> Result<()> {
468        use super::proto::hyper_service::query_info::Content as QueryInfoContent;
469
470        let stream = self
471            .query_info_stream
472            .as_mut()
473            .ok_or_else(|| Error::new(ErrorKind::Protocol, "QueryInfo stream not initialized"))?;
474
475        if let Some(info) = stream.message().await.map_err(from_grpc_status)? {
476            match info.content {
477                Some(QueryInfoContent::QueryStatus(status)) => {
478                    self.process_query_status(status.clone());
479
480                    match CompletionStatus::try_from(status.completion_status)
481                        .unwrap_or(CompletionStatus::RunningOrUnspecified)
482                    {
483                        CompletionStatus::Finished | CompletionStatus::ResultsProduced => {
484                            debug!("Query finished, requesting results");
485                            self.state = ExecutorState::RequestResults;
486                        }
487                        CompletionStatus::RunningOrUnspecified => {
488                            // Keep polling
489                            self.state = ExecutorState::RequestStatus;
490                        }
491                    }
492                }
493                Some(QueryInfoContent::BinarySchema(_) | QueryInfoContent::StringSchema(_)) => {
494                    // Schema received - just continue polling
495                    self.state = ExecutorState::RequestStatus;
496                }
497                None => {
498                    self.state = ExecutorState::RequestStatus;
499                }
500            }
501        }
502
503        Ok(())
504    }
505
506    /// Requests result chunks via `GetQueryResult`.
507    async fn request_results(&mut self) -> Result<()> {
508        use super::proto::hyper_service::query_result_param::RequestedData;
509
510        // Short-circuit when we've already fetched every chunk the server
511        // reported. This matches C++ `GrpcQueryExecutor`: don't emit a
512        // `GetQueryResult(chunk_id=k)` when `k >= chunk_count`. Otherwise
513        // the server returns error code 22023 ("chunk id out of range").
514        //
515        // Common case: ADAPTIVE with a small result that fit entirely in
516        // chunk 0 (delivered inline). `next_server_chunk_id` starts at 1
517        // and `chunk_count` is 1, so we skip straight to Finished.
518        if let Some(ref status) = self.query_status {
519            if status.chunk_count > 0 && self.next_server_chunk_id >= status.chunk_count {
520                debug!(
521                    total_chunks = status.chunk_count,
522                    next_chunk = self.next_server_chunk_id,
523                    "No more chunks to fetch",
524                );
525                self.state = ExecutorState::Finished;
526                return Ok(());
527            }
528        }
529
530        let query_id = self
531            .query_id
532            .clone()
533            .ok_or_else(|| Error::new(ErrorKind::Protocol, "No query ID for result request"))?;
534
535        debug!(
536            query_id = %query_id,
537            chunk_id = self.next_server_chunk_id,
538            "Requesting result chunks"
539        );
540
541        let param = QueryResultParam {
542            query_id: query_id.clone(),
543            output_format: super::proto::OutputFormat::ArrowIpc.into(),
544            requested_data: Some(RequestedData::ChunkId(self.next_server_chunk_id)),
545            // The schema is delivered inline on the initial `ExecuteQuery`
546            // stream for both ASYNC and ADAPTIVE (hyperd sends a
547            // `QueryInfo.binary_schema` message before closing that
548            // stream). Asking for it again on every `GetQueryResult`
549            // would emit extra schema frames that confuse an incremental
550            // Arrow IPC decoder. C++ `GrpcQueryExecutor` also sets this
551            // to `true`.
552            omit_schema: true,
553        };
554
555        let mut request = tonic::Request::new(param);
556        // Add the required x-hyperdb-query-id header
557        if let Ok(value) = query_id.parse() {
558            request.metadata_mut().insert("x-hyperdb-query-id", value);
559        }
560        for (key, value) in &self.headers {
561            if let (Ok(key), Ok(value)) = (
562                key.parse::<tonic::metadata::MetadataKey<_>>(),
563                value.parse(),
564            ) {
565                request.metadata_mut().insert(key, value);
566            }
567        }
568
569        let response = self
570            .client
571            .get_query_result(request)
572            .await
573            .map_err(from_grpc_status)?;
574
575        self.query_result_stream = Some(response.into_inner());
576        self.state = ExecutorState::ReadResults;
577        Ok(())
578    }
579
580    /// Reads result chunks from `GetQueryResult` stream.
581    async fn read_results(&mut self) -> Result<()> {
582        loop {
583            let result = {
584                let stream = self.query_result_stream.as_mut().ok_or_else(|| {
585                    Error::new(ErrorKind::Protocol, "QueryResult stream not initialized")
586                })?;
587                stream.message().await.map_err(from_grpc_status)?
588            };
589
590            match result {
591                Some(result) => {
592                    self.process_query_result(result)?;
593                }
594                None => break,
595            }
596        }
597
598        // One GetQueryResult RPC corresponds to exactly one server-side
599        // chunk, regardless of how many QueryResult messages it contained
600        // on the wire. Advance by 1 (matches C++ GrpcQueryExecutor).
601        self.next_server_chunk_id += 1;
602
603        // Check if there are more chunks
604        if let Some(ref status) = self.query_status {
605            let total_chunks = status.chunk_count;
606            if self.next_server_chunk_id >= total_chunks {
607                debug!(total_chunks, "All chunks received");
608                self.state = ExecutorState::Finished;
609            } else {
610                debug!(
611                    next_chunk = self.next_server_chunk_id,
612                    total_chunks, "More chunks available"
613                );
614                self.state = ExecutorState::RequestResults;
615            }
616        } else {
617            self.state = ExecutorState::Finished;
618        }
619
620        Ok(())
621    }
622}
623
624// ============================================================================
625// Streaming chunk producer
626// ============================================================================
627
/// Streaming producer of Arrow IPC byte chunks from a gRPC query.
///
/// Unlike [`GrpcClient::execute_query`][`super::client::GrpcClient::execute_query`],
/// which drains every result chunk into a single [`GrpcQueryResult`] before
/// returning, this type yields one [`Bytes`] chunk at a time. The caller can
/// decode each chunk (e.g. via `arrow_ipc::reader::StreamDecoder`) and drop
/// it before fetching the next, so memory stays bounded by roughly one
/// message (capped at the tonic `max_decoding_message_size`, default 64 MB)
/// regardless of total result size.
///
/// Built by [`GrpcClient::execute_query_stream`][`super::client::GrpcClient::execute_query_stream`]
/// and the `AuthenticatedGrpcClient` variant.
pub struct GrpcChunkStream {
    /// Underlying executor driving the query state machine.
    executor: GrpcQueryExecutor<tonic::transport::Channel>,
    /// Chunks received from the executor but not yet handed to the caller.
    pending: std::collections::VecDeque<bytes::Bytes>,
    /// Schema captured from the first result batch that carried one.
    schema: Option<super::proto::QueryResultSchema>,
    /// Server-assigned query ID, once received.
    query_id: Option<String>,
    /// Affected row count for DML commands, if the server reported one.
    rows_affected: Option<u64>,
    /// Set once the executor has signalled completion; after `pending`
    /// drains, `next_chunk` returns `None` from then on.
    done: bool,
}
648
// Manual `Debug` impl: `GrpcQueryExecutor` has no `Debug` derive, so the
// `executor` field is omitted (hence `finish_non_exhaustive`).
impl std::fmt::Debug for GrpcChunkStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("GrpcChunkStream")
            .field("pending_chunks", &self.pending.len())
            .field("query_id", &self.query_id)
            .field("rows_affected", &self.rows_affected)
            .field("done", &self.done)
            .finish_non_exhaustive()
    }
}
659
660impl GrpcChunkStream {
661    pub(crate) fn new(executor: GrpcQueryExecutor<tonic::transport::Channel>) -> Self {
662        GrpcChunkStream {
663            executor,
664            pending: std::collections::VecDeque::new(),
665            schema: None,
666            query_id: None,
667            rows_affected: None,
668            done: false,
669        }
670    }
671
672    /// Returns the next Arrow IPC byte chunk from the stream, or `None` when
673    /// the server has signalled that the stream is complete.
674    ///
675    /// # Errors
676    ///
677    /// Propagates any error from the underlying executor's
678    /// `next_result` call — typically [`tonic::Status`] errors wrapped
679    /// as [`Error`] (server-side query failure, auth expiry, or
680    /// transport-level gRPC errors).
681    pub async fn next_chunk(&mut self) -> Result<Option<bytes::Bytes>> {
682        loop {
683            if let Some(b) = self.pending.pop_front() {
684                return Ok(Some(b));
685            }
686            if self.done {
687                return Ok(None);
688            }
689            match self.executor.next_result().await? {
690                Some(mut partial) => {
691                    if self.schema.is_none() {
692                        self.schema = partial.schema.take();
693                    }
694                    if self.query_id.is_none() {
695                        self.query_id = partial.query_id.take();
696                    }
697                    if partial.rows_affected.is_some() {
698                        self.rows_affected = partial.rows_affected;
699                    }
700                    while let Some(chunk) = partial.take_chunk() {
701                        self.pending.push_back(chunk.data);
702                    }
703                    if partial.is_complete {
704                        self.done = true;
705                    }
706                }
707                None => {
708                    self.done = true;
709                }
710            }
711        }
712    }
713
    /// Returns the schema reported by the server for this query, if one has
    /// been received yet.
    ///
    /// The schema is typically delivered as the first message on the stream,
    /// so it is usually available after the first `next_chunk()` call.
    /// The reference borrows `self`; clone it if you need to hold it across
    /// further `next_chunk` calls.
    pub fn schema(&self) -> Option<&super::proto::QueryResultSchema> {
        self.schema.as_ref()
    }
722
    /// Returns the server-assigned query ID, if one has been received.
    ///
    /// Populated by `next_chunk` from the first result batch that carries
    /// an ID.
    pub fn query_id(&self) -> Option<&str> {
        self.query_id.as_deref()
    }
727
    /// Returns the affected row count for DML queries, if reported.
    ///
    /// Updated by `next_chunk` whenever a result batch carries a count.
    pub fn rows_affected(&self) -> Option<u64> {
        self.rows_affected
    }
732}