hyperdb_api_core/client/grpc/executor.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! gRPC query executor with state machine for result fetching.
5//!
6//! This module implements the query execution state machine that handles
7//! the different transfer modes (SYNC, ASYNC, ADAPTIVE) and manages
8//! fetching results from the Hyper gRPC service.
9//!
10//! # Transfer Mode State Machines
11//!
12//! Each transfer mode follows a different path through the executor states:
13//!
14//! **SYNC** — simplest path, all data in one response:
15//! ```text
16//! ReadInitialResults ──(stream exhausted)──> Finished
17//! ```
18//! The `ExecuteQuery` RPC returns a server-streaming response containing the
19//! schema header followed by one or more binary/string data parts, then the
20//! stream closes. Subject to the server's 100-second SYNC timeout.
21//!
22//! **ASYNC** — decouples submission from fetching:
23//! ```text
24//! ReadInitialResults ──(QueryStatus: Running)──> RequestStatus
25//! ──> ReadStatus ──(Running)──> RequestStatus (poll loop)
26//! ──> ReadStatus ──(Finished)──> RequestResults
27//! ──> ReadResults ──(more chunks)──> RequestResults
28//! ──> ReadResults ──(all chunks)──> Finished
29//! ```
30//! The initial `ExecuteQuery` response contains only a `QueryStatus` with a
31//! server-assigned `query_id`. The client polls `GetQueryInfo` until
32//! `CompletionStatus::Finished`, then fetches result chunks via
33//! `GetQueryResult` using chunk IDs.
34//!
//! **ADAPTIVE** (default, recommended) — hybrid of SYNC and ASYNC:
//! ```text
//! ReadInitialResults ──(stream exhausted)──> RequestStatus
//!   ──> ReadStatus ──(Finished)──> RequestResults
//!   ──> RequestResults ──(no chunks beyond chunk 0)──> Finished   (small result)
//!   ──> RequestResults ──(chunks 1..N pending)──> ReadResults     (large result)
//!   ──> ... (same as ASYNC from here)
//! ```
//! The first chunk of results (server-side chunk 0) is returned inline in
//! the `ExecuteQuery` response. When that stream closes the executor always
//! consults `GetQueryInfo`, because a `Finished` status only says that query
//! execution finished, not that every chunk has been delivered. If the
//! reported `chunk_count` shows that chunk 0 was the whole result, no
//! `GetQueryResult` call is made; otherwise the remaining chunks are fetched
//! exactly as in ASYNC.
46
47use bytes::Bytes;
48use tonic::Streaming;
49use tracing::{debug, trace, warn};
50
51use crate::client::error::{Error, ErrorKind, Result};
52
53use super::error::from_grpc_status;
54use super::proto::hyper_service::query_param::TransferMode;
55use super::proto::hyper_service::query_result::Result as QueryResultPayload;
56use super::proto::hyper_service::query_status::CompletionStatus;
57use super::proto::{
58 ExecuteQueryResponse, HyperServiceClient, QueryInfo, QueryInfoParam, QueryResult,
59 QueryResultParam, QueryStatus,
60};
61use super::result::{GrpcQueryResult, GrpcResultChunk};
62
/// State of the query executor.
///
/// `GrpcQueryExecutor::next_result` dispatches on this value once per loop
/// iteration; each handler performs a single step and stores the state that
/// should run next.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExecutorState {
    /// Reading initial results from `ExecuteQuery` stream
    /// (one message per step).
    ReadInitialResults,
    /// Requesting query status via `GetQueryInfo`
    RequestStatus,
    /// Reading query status from the `GetQueryInfo` stream
    ReadStatus,
    /// Requesting result chunks via `GetQueryResult`
    RequestResults,
    /// Reading result chunks from the `GetQueryResult` stream
    ReadResults,
    /// Query execution complete; no further RPCs are issued
    Finished,
}
79
/// Executes gRPC queries and manages result fetching.
///
/// This mirrors the C++ `GrpcQueryExecutor` implementation, handling the
/// different transfer modes and async result fetching.
///
/// Typical use: build with `new`, submit the query with `execute`, then call
/// `next_result` repeatedly to pull accumulated chunks.
pub(crate) struct GrpcQueryExecutor<T> {
    /// The gRPC client
    client: HyperServiceClient<T>,
    /// Metadata headers (key, value) attached to every request this
    /// executor sends
    headers: Vec<(String, String)>,
    /// Current state of the executor
    state: ExecutorState,
    /// Transfer mode being used; decides the initial
    /// `next_server_chunk_id` and where to go once the `ExecuteQuery`
    /// stream closes
    transfer_mode: TransferMode,
    /// Stream for `ExecuteQuery` responses (`None` until `execute` ran)
    execute_stream: Option<Streaming<ExecuteQueryResponse>>,
    /// Stream for `GetQueryInfo` responses
    query_info_stream: Option<Streaming<QueryInfo>>,
    /// Stream for `GetQueryResult` responses
    query_result_stream: Option<Streaming<QueryResult>>,
    /// Most recent query status from server; its `chunk_count` is what
    /// the chunk-fetching states compare `next_server_chunk_id` against
    query_status: Option<QueryStatus>,
    /// Query ID for async operations, copied from the latest `QueryStatus`
    query_id: Option<String>,
    /// Monotonic label for the next `GrpcResultChunk` appended to
    /// `self.result.chunks`. Bumped once per received `QueryResult` /
    /// `BinaryPart` / `StringPart` message. This is *purely* a local
    /// identifier for downstream consumers and has no relationship to
    /// server-side chunk IDs.
    next_local_chunk_id: u64,
    /// Server-side chunk ID to request in the next `GetQueryResult` RPC,
    /// and the value compared against `QueryStatus.chunk_count` to decide
    /// when all chunks have been fetched.
    ///
    /// Bumped by exactly 1 after each `GetQueryResult` stream is fully
    /// drained — regardless of how many `QueryResult` messages that stream
    /// contained. Mirrors `nextChunkId_` in the C++ `GrpcQueryExecutor`.
    ///
    /// Initial value depends on transfer mode:
    /// - `ASYNC`: `0` — no chunks delivered inline.
    /// - `ADAPTIVE`: `1` — server sends chunk 0 inline on `ExecuteQuery`.
    /// - `SYNC`: unused (all data arrives via `ExecuteQuery`).
    next_server_chunk_id: u64,
    /// Result being built; handed to the caller (and reset) by
    /// `next_result`
    result: GrpcQueryResult,
}
125
126impl<T> GrpcQueryExecutor<T>
127where
128 T: tonic::client::GrpcService<tonic::body::Body> + Clone + Send + 'static,
129 T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
130 <T::ResponseBody as tonic::codegen::Body>::Error:
131 Into<tonic::codegen::StdError> + Send + 'static,
132 T::Future: Send,
133{
134 /// Creates a new query executor.
135 pub(crate) fn new(
136 client: HyperServiceClient<T>,
137 headers: Vec<(String, String)>,
138 transfer_mode: TransferMode,
139 ) -> Self {
140 // Match the C++ `GrpcQueryExecutor` constructor: ADAPTIVE delivers
141 // server-side chunk 0 inline on the `ExecuteQuery` response, so the
142 // first chunk we need to request via `GetQueryResult` is chunk 1.
143 // ASYNC doesn't deliver any chunks inline, so we start at 0. SYNC
144 // never reaches the GetQueryResult state machine.
145 let next_server_chunk_id = match transfer_mode {
146 TransferMode::Adaptive => 1,
147 _ => 0,
148 };
149 GrpcQueryExecutor {
150 client,
151 headers,
152 state: ExecutorState::ReadInitialResults,
153 transfer_mode,
154 execute_stream: None,
155 query_info_stream: None,
156 query_result_stream: None,
157 query_status: None,
158 query_id: None,
159 next_local_chunk_id: 0,
160 next_server_chunk_id,
161 result: GrpcQueryResult::new(),
162 }
163 }
164
165 /// Starts query execution.
166 pub(crate) async fn execute(&mut self, query: super::proto::QueryParam) -> Result<()> {
167 debug!(query = %query.query, transfer_mode = ?self.transfer_mode, "Executing gRPC query");
168
169 // Create request with metadata
170 let mut request = tonic::Request::new(query);
171 for (key, value) in &self.headers {
172 if let (Ok(key), Ok(value)) = (
173 key.parse::<tonic::metadata::MetadataKey<_>>(),
174 value.parse(),
175 ) {
176 request.metadata_mut().insert(key, value);
177 }
178 }
179
180 // Execute the query
181 let response = self
182 .client
183 .execute_query(request)
184 .await
185 .map_err(from_grpc_status)?;
186
187 self.execute_stream = Some(response.into_inner());
188 self.state = ExecutorState::ReadInitialResults;
189
190 Ok(())
191 }
192
193 /// Gets the next result.
194 ///
195 /// Returns `None` when all results have been consumed.
196 pub(crate) async fn next_result(&mut self) -> Result<Option<GrpcQueryResult>> {
197 loop {
198 trace!(state = ?self.state, "Query executor state");
199
200 match self.state {
201 ExecutorState::ReadInitialResults => {
202 self.read_initial_results().await?;
203 }
204 ExecutorState::RequestStatus => {
205 self.request_status().await?;
206 }
207 ExecutorState::ReadStatus => {
208 self.read_status().await?;
209 }
210 ExecutorState::RequestResults => {
211 self.request_results().await?;
212 }
213 ExecutorState::ReadResults => {
214 self.read_results().await?;
215 }
216 ExecutorState::Finished => {
217 self.result.is_complete = true;
218 // Return the accumulated result
219 return Ok(Some(std::mem::take(&mut self.result)));
220 }
221 }
222
223 // Yield back to the caller as soon as we either have some
224 // chunks to deliver or have reached the terminal state.
225 // Streaming out of `ReadInitialResults` keeps peak memory
226 // bounded for SYNC/inline paths (otherwise we would buffer
227 // the entire ExecuteQuery stream before the first yield).
228 if self.state == ExecutorState::Finished || !self.result.chunks.is_empty() {
229 break;
230 }
231 }
232
233 if self.result.is_complete || !self.result.chunks.is_empty() {
234 Ok(Some(std::mem::take(&mut self.result)))
235 } else {
236 Ok(None)
237 }
238 }
239
240 /// Reads one message from the `ExecuteQuery` stream.
241 ///
242 /// We deliberately do **not** transition state the moment we see a
243 /// `QueryStatus` — under `ADAPTIVE` the server delivers the whole of
244 /// chunk 0 inline *followed by* a `QueryStatus(Running)` and then
245 /// closes the stream, so bailing early would silently drop the tail
246 /// of chunk 0. Only the server-side close (a `None` message) decides
247 /// where to go next. This mirrors the C++ `GrpcQueryExecutor::
248 /// READ_INITIAL_RESULTS` loop, which reads until `Read()` returns
249 /// false and only then inspects `queryStatus_` to pick the next
250 /// state.
251 ///
252 /// One message per call keeps memory bounded: the outer `next_result`
253 /// loop yields accumulated chunks back to the caller as they arrive
254 /// instead of buffering the entire inline response.
255 async fn read_initial_results(&mut self) -> Result<()> {
256 let response = {
257 let stream = self.execute_stream.as_mut().ok_or_else(|| {
258 Error::new(ErrorKind::Protocol, "ExecuteQuery stream not initialized")
259 })?;
260 stream.message().await.map_err(from_grpc_status)?
261 };
262
263 match response {
264 Some(response) => {
265 self.process_execute_response(response)?;
266 }
267 None => {
268 // Server closed the stream. Where we go next is determined
269 // purely by transfer mode, mirroring the C++
270 // `GrpcQueryExecutor`:
271 // - SYNC: all data is inline, we're done.
272 // - ASYNC / ADAPTIVE: go to the GetQueryInfo/
273 // GetQueryResult state machine. A `QueryStatus` of
274 // `Finished` here means *query execution* is finished,
275 // NOT that all chunks have been streamed — server-side
276 // chunks 1..N still need to be fetched for ADAPTIVE
277 // (and 0..N for ASYNC).
278 match self.transfer_mode {
279 TransferMode::Sync | TransferMode::Unspecified => {
280 debug!(
281 query_id = ?self.query_id,
282 "ExecuteQuery stream closed; SYNC mode complete",
283 );
284 self.state = ExecutorState::Finished;
285 }
286 TransferMode::Async | TransferMode::Adaptive => {
287 debug!(
288 query_id = ?self.query_id,
289 mode = ?self.transfer_mode,
290 next_chunk = self.next_server_chunk_id,
291 "ExecuteQuery stream closed; fetching remaining chunks",
292 );
293 self.state = ExecutorState::RequestStatus;
294 }
295 }
296 }
297 }
298
299 Ok(())
300 }
301
302 /// Processes an `ExecuteQueryResponse` message.
303 fn process_execute_response(&mut self, response: ExecuteQueryResponse) -> Result<()> {
304 use super::proto::hyper_service::execute_query_response::Result as ResponsePayload;
305 use super::proto::hyper_service::query_info::Content as QueryInfoContent;
306 use super::proto::hyper_service::query_result_header::Header;
307
308 match response.result {
309 Some(ResponsePayload::Header(header)) => match header.header {
310 Some(Header::Schema(schema)) => {
311 debug!(columns = schema.columns.len(), "Received schema");
312 self.result.schema = Some(schema);
313 }
314 Some(Header::Command(cmd)) => {
315 use super::proto::hyper_service::query_command_ok::CommandReturn;
316 let rows = match cmd.command_return {
317 Some(CommandReturn::AffectedRows(n)) => Some(n),
318 Some(CommandReturn::Empty(())) | None => None,
319 };
320 debug!(rows_affected = ?rows, "Command OK");
321 self.result.rows_affected = rows;
322 self.state = ExecutorState::Finished;
323 }
324 None => {
325 warn!("Received empty QueryResultHeader");
326 }
327 },
328 Some(ResponsePayload::BinaryPart(data)) => {
329 debug!(bytes = data.data.len(), "Received binary result part");
330 let chunk = GrpcResultChunk::new(self.next_local_chunk_id, data.data);
331 self.next_local_chunk_id += 1;
332 self.result.chunks.push_back(chunk);
333 }
334 Some(ResponsePayload::StringPart(data)) => {
335 debug!(len = data.data.len(), "Received string result part");
336 let chunk = GrpcResultChunk::new(
337 self.next_local_chunk_id,
338 Bytes::from(data.data.into_bytes()),
339 );
340 self.next_local_chunk_id += 1;
341 self.result.chunks.push_back(chunk);
342 }
343 Some(ResponsePayload::QueryInfo(info)) => {
344 match info.content {
345 Some(QueryInfoContent::QueryStatus(status)) => {
346 self.process_query_status(status);
347 }
348 Some(QueryInfoContent::BinarySchema(data)) => {
349 debug!(bytes = data.data.len(), "Received binary schema");
350 // Schema in binary form - store as a chunk
351 let chunk = GrpcResultChunk::new(self.next_local_chunk_id, data.data);
352 self.next_local_chunk_id += 1;
353 self.result.chunks.push_back(chunk);
354 }
355 Some(QueryInfoContent::StringSchema(data)) => {
356 debug!(len = data.data.len(), "Received string schema");
357 // Schema in string form - for JSON format
358 let chunk = GrpcResultChunk::new(
359 self.next_local_chunk_id,
360 Bytes::from(data.data.into_bytes()),
361 );
362 self.next_local_chunk_id += 1;
363 self.result.chunks.push_back(chunk);
364 }
365 None => {}
366 }
367 }
368 Some(ResponsePayload::QueryResult(query_result)) => {
369 self.process_query_result(query_result)?;
370 }
371 None => {
372 warn!("Received empty ExecuteQueryResponse");
373 }
374 }
375 Ok(())
376 }
377
378 #[expect(
379 clippy::unnecessary_wraps,
380 reason = "signature retained for API symmetry / future fallibility; returning Result/Option keeps callers from breaking when the function later grows failure cases"
381 )]
382 /// Processes a `QueryResult` message.
383 ///
384 /// Note: a single `GetQueryResult` RPC returns *multiple* `QueryResult`
385 /// messages for one server-side chunk (schema + N binary parts). Only
386 /// the local `GrpcResultChunk` label is bumped here — the server-side
387 /// chunk ID (`next_server_chunk_id`) is advanced by exactly 1 after the
388 /// RPC stream has been fully drained; see `read_results`.
389 fn process_query_result(&mut self, result: QueryResult) -> Result<()> {
390 // Extract data payload
391 if let Some(payload) = result.result {
392 let chunk = match payload {
393 QueryResultPayload::BinaryPart(data) => {
394 debug!(bytes = data.data.len(), "Received binary result chunk");
395 GrpcResultChunk::new(self.next_local_chunk_id, data.data)
396 }
397 QueryResultPayload::StringPart(data) => {
398 // Convert string data to bytes
399 debug!(len = data.data.len(), "Received string result chunk");
400 GrpcResultChunk::new(
401 self.next_local_chunk_id,
402 Bytes::from(data.data.into_bytes()),
403 )
404 }
405 };
406 self.next_local_chunk_id += 1;
407 self.result.chunks.push_back(chunk);
408 }
409
410 Ok(())
411 }
412
413 /// Processes a `QueryStatus` message.
414 fn process_query_status(&mut self, status: QueryStatus) {
415 debug!(
416 query_id = %status.query_id,
417 completion_status = ?CompletionStatus::try_from(status.completion_status),
418 "Received query status"
419 );
420
421 self.query_id = Some(status.query_id.clone());
422 self.result.query_id = Some(status.query_id.clone());
423 self.query_status = Some(status);
424 }
425
426 /// Requests query status via `GetQueryInfo`.
427 async fn request_status(&mut self) -> Result<()> {
428 let query_id = self
429 .query_id
430 .clone()
431 .ok_or_else(|| Error::new(ErrorKind::Protocol, "No query ID for status request"))?;
432
433 debug!(query_id = %query_id, "Requesting query status");
434
435 let param = QueryInfoParam {
436 query_id: query_id.clone(),
437 streaming: true, // Enable streaming to get continuous updates
438 schema_output_format: 0, // OUTPUT_FORMAT_UNSPECIFIED - we don't need schema here
439 };
440
441 let mut request = tonic::Request::new(param);
442 // Add the required x-hyperdb-query-id header
443 if let Ok(value) = query_id.parse() {
444 request.metadata_mut().insert("x-hyperdb-query-id", value);
445 }
446 for (key, value) in &self.headers {
447 if let (Ok(key), Ok(value)) = (
448 key.parse::<tonic::metadata::MetadataKey<_>>(),
449 value.parse(),
450 ) {
451 request.metadata_mut().insert(key, value);
452 }
453 }
454
455 let response = self
456 .client
457 .get_query_info(request)
458 .await
459 .map_err(from_grpc_status)?;
460
461 self.query_info_stream = Some(response.into_inner());
462 self.state = ExecutorState::ReadStatus;
463 Ok(())
464 }
465
466 /// Reads query status from `GetQueryInfo` stream.
467 async fn read_status(&mut self) -> Result<()> {
468 use super::proto::hyper_service::query_info::Content as QueryInfoContent;
469
470 let stream = self
471 .query_info_stream
472 .as_mut()
473 .ok_or_else(|| Error::new(ErrorKind::Protocol, "QueryInfo stream not initialized"))?;
474
475 if let Some(info) = stream.message().await.map_err(from_grpc_status)? {
476 match info.content {
477 Some(QueryInfoContent::QueryStatus(status)) => {
478 self.process_query_status(status.clone());
479
480 match CompletionStatus::try_from(status.completion_status)
481 .unwrap_or(CompletionStatus::RunningOrUnspecified)
482 {
483 CompletionStatus::Finished | CompletionStatus::ResultsProduced => {
484 debug!("Query finished, requesting results");
485 self.state = ExecutorState::RequestResults;
486 }
487 CompletionStatus::RunningOrUnspecified => {
488 // Keep polling
489 self.state = ExecutorState::RequestStatus;
490 }
491 }
492 }
493 Some(QueryInfoContent::BinarySchema(_) | QueryInfoContent::StringSchema(_)) => {
494 // Schema received - just continue polling
495 self.state = ExecutorState::RequestStatus;
496 }
497 None => {
498 self.state = ExecutorState::RequestStatus;
499 }
500 }
501 }
502
503 Ok(())
504 }
505
506 /// Requests result chunks via `GetQueryResult`.
507 async fn request_results(&mut self) -> Result<()> {
508 use super::proto::hyper_service::query_result_param::RequestedData;
509
510 // Short-circuit when we've already fetched every chunk the server
511 // reported. This matches C++ `GrpcQueryExecutor`: don't emit a
512 // `GetQueryResult(chunk_id=k)` when `k >= chunk_count`. Otherwise
513 // the server returns error code 22023 ("chunk id out of range").
514 //
515 // Common case: ADAPTIVE with a small result that fit entirely in
516 // chunk 0 (delivered inline). `next_server_chunk_id` starts at 1
517 // and `chunk_count` is 1, so we skip straight to Finished.
518 if let Some(ref status) = self.query_status {
519 if status.chunk_count > 0 && self.next_server_chunk_id >= status.chunk_count {
520 debug!(
521 total_chunks = status.chunk_count,
522 next_chunk = self.next_server_chunk_id,
523 "No more chunks to fetch",
524 );
525 self.state = ExecutorState::Finished;
526 return Ok(());
527 }
528 }
529
530 let query_id = self
531 .query_id
532 .clone()
533 .ok_or_else(|| Error::new(ErrorKind::Protocol, "No query ID for result request"))?;
534
535 debug!(
536 query_id = %query_id,
537 chunk_id = self.next_server_chunk_id,
538 "Requesting result chunks"
539 );
540
541 let param = QueryResultParam {
542 query_id: query_id.clone(),
543 output_format: super::proto::OutputFormat::ArrowIpc.into(),
544 requested_data: Some(RequestedData::ChunkId(self.next_server_chunk_id)),
545 // The schema is delivered inline on the initial `ExecuteQuery`
546 // stream for both ASYNC and ADAPTIVE (hyperd sends a
547 // `QueryInfo.binary_schema` message before closing that
548 // stream). Asking for it again on every `GetQueryResult`
549 // would emit extra schema frames that confuse an incremental
550 // Arrow IPC decoder. C++ `GrpcQueryExecutor` also sets this
551 // to `true`.
552 omit_schema: true,
553 };
554
555 let mut request = tonic::Request::new(param);
556 // Add the required x-hyperdb-query-id header
557 if let Ok(value) = query_id.parse() {
558 request.metadata_mut().insert("x-hyperdb-query-id", value);
559 }
560 for (key, value) in &self.headers {
561 if let (Ok(key), Ok(value)) = (
562 key.parse::<tonic::metadata::MetadataKey<_>>(),
563 value.parse(),
564 ) {
565 request.metadata_mut().insert(key, value);
566 }
567 }
568
569 let response = self
570 .client
571 .get_query_result(request)
572 .await
573 .map_err(from_grpc_status)?;
574
575 self.query_result_stream = Some(response.into_inner());
576 self.state = ExecutorState::ReadResults;
577 Ok(())
578 }
579
580 /// Reads result chunks from `GetQueryResult` stream.
581 async fn read_results(&mut self) -> Result<()> {
582 loop {
583 let result = {
584 let stream = self.query_result_stream.as_mut().ok_or_else(|| {
585 Error::new(ErrorKind::Protocol, "QueryResult stream not initialized")
586 })?;
587 stream.message().await.map_err(from_grpc_status)?
588 };
589
590 match result {
591 Some(result) => {
592 self.process_query_result(result)?;
593 }
594 None => break,
595 }
596 }
597
598 // One GetQueryResult RPC corresponds to exactly one server-side
599 // chunk, regardless of how many QueryResult messages it contained
600 // on the wire. Advance by 1 (matches C++ GrpcQueryExecutor).
601 self.next_server_chunk_id += 1;
602
603 // Check if there are more chunks
604 if let Some(ref status) = self.query_status {
605 let total_chunks = status.chunk_count;
606 if self.next_server_chunk_id >= total_chunks {
607 debug!(total_chunks, "All chunks received");
608 self.state = ExecutorState::Finished;
609 } else {
610 debug!(
611 next_chunk = self.next_server_chunk_id,
612 total_chunks, "More chunks available"
613 );
614 self.state = ExecutorState::RequestResults;
615 }
616 } else {
617 self.state = ExecutorState::Finished;
618 }
619
620 Ok(())
621 }
622}
623
624// ============================================================================
625// Streaming chunk producer
626// ============================================================================
627
/// Streaming producer of Arrow IPC byte chunks from a gRPC query.
///
/// Unlike [`GrpcClient::execute_query`][`super::client::GrpcClient::execute_query`],
/// which drains every result chunk into a single [`GrpcQueryResult`] before
/// returning, this type yields one [`Bytes`] chunk at a time. The caller can
/// decode each chunk (e.g. via `arrow_ipc::reader::StreamDecoder`) and drop
/// it before fetching the next, so memory stays bounded by roughly one
/// message (capped at the tonic `max_decoding_message_size`, default 64 MB)
/// regardless of total result size.
///
/// Built by [`GrpcClient::execute_query_stream`][`super::client::GrpcClient::execute_query_stream`]
/// and the `AuthenticatedGrpcClient` variant.
pub struct GrpcChunkStream {
    /// Executor that is polled via `next_result` whenever `pending` runs dry.
    executor: GrpcQueryExecutor<tonic::transport::Channel>,
    /// Chunk payloads already received from the executor but not yet
    /// returned by `next_chunk`, in arrival order.
    pending: std::collections::VecDeque<bytes::Bytes>,
    /// First schema reported by the executor, if any.
    schema: Option<super::proto::QueryResultSchema>,
    /// First query ID reported by the executor, if any.
    query_id: Option<String>,
    /// Most recent affected-row count reported by the executor, if any.
    rows_affected: Option<u64>,
    /// Set once the executor reported completion (or returned `None`);
    /// after that the executor is not polled again.
    done: bool,
}
648
649impl std::fmt::Debug for GrpcChunkStream {
650 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
651 f.debug_struct("GrpcChunkStream")
652 .field("pending_chunks", &self.pending.len())
653 .field("query_id", &self.query_id)
654 .field("rows_affected", &self.rows_affected)
655 .field("done", &self.done)
656 .finish_non_exhaustive()
657 }
658}
659
660impl GrpcChunkStream {
661 pub(crate) fn new(executor: GrpcQueryExecutor<tonic::transport::Channel>) -> Self {
662 GrpcChunkStream {
663 executor,
664 pending: std::collections::VecDeque::new(),
665 schema: None,
666 query_id: None,
667 rows_affected: None,
668 done: false,
669 }
670 }
671
672 /// Returns the next Arrow IPC byte chunk from the stream, or `None` when
673 /// the server has signalled that the stream is complete.
674 ///
675 /// # Errors
676 ///
677 /// Propagates any error from the underlying executor's
678 /// `next_result` call — typically [`tonic::Status`] errors wrapped
679 /// as [`Error`] (server-side query failure, auth expiry, or
680 /// transport-level gRPC errors).
681 pub async fn next_chunk(&mut self) -> Result<Option<bytes::Bytes>> {
682 loop {
683 if let Some(b) = self.pending.pop_front() {
684 return Ok(Some(b));
685 }
686 if self.done {
687 return Ok(None);
688 }
689 match self.executor.next_result().await? {
690 Some(mut partial) => {
691 if self.schema.is_none() {
692 self.schema = partial.schema.take();
693 }
694 if self.query_id.is_none() {
695 self.query_id = partial.query_id.take();
696 }
697 if partial.rows_affected.is_some() {
698 self.rows_affected = partial.rows_affected;
699 }
700 while let Some(chunk) = partial.take_chunk() {
701 self.pending.push_back(chunk.data);
702 }
703 if partial.is_complete {
704 self.done = true;
705 }
706 }
707 None => {
708 self.done = true;
709 }
710 }
711 }
712 }
713
714 /// Returns the schema reported by the server for this query, if one has
715 /// been received yet.
716 ///
717 /// The schema is typically delivered as the first message on the stream,
718 /// so it is usually available after the first `next_chunk()` call.
719 pub fn schema(&self) -> Option<&super::proto::QueryResultSchema> {
720 self.schema.as_ref()
721 }
722
723 /// Returns the server-assigned query ID, if one has been received.
724 pub fn query_id(&self) -> Option<&str> {
725 self.query_id.as_deref()
726 }
727
728 /// Returns the affected row count for DML queries, if reported.
729 pub fn rows_affected(&self) -> Option<u64> {
730 self.rows_affected
731 }
732}