use bytes::Bytes;
use tonic::Streaming;
use tracing::{debug, trace, warn};
use crate::client::error::{Error, ErrorKind, Result};
use super::error::from_grpc_status;
use super::proto::hyper_service::query_param::TransferMode;
use super::proto::hyper_service::query_result::Result as QueryResultPayload;
use super::proto::hyper_service::query_status::CompletionStatus;
use super::proto::{
ExecuteQueryResponse, HyperServiceClient, QueryInfo, QueryInfoParam, QueryResult,
QueryResultParam, QueryStatus,
};
use super::result::{GrpcQueryResult, GrpcResultChunk};
/// Phases of the query-execution state machine stepped by
/// `GrpcQueryExecutor::next_result`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExecutorState {
/// Reading messages from the `ExecuteQuery` response stream.
ReadInitialResults,
/// About to issue a `GetQueryInfo` request for the current query ID.
RequestStatus,
/// Reading a message from the `GetQueryInfo` response stream.
ReadStatus,
/// About to issue a `GetQueryResult` request for the next server chunk.
RequestResults,
/// Draining the `GetQueryResult` response stream.
ReadResults,
/// Terminal state: no further requests are issued.
Finished,
}
/// Drives a single query through the gRPC service: the initial
/// `ExecuteQuery` call, optional status polling, and chunk retrieval.
pub(crate) struct GrpcQueryExecutor<T> {
/// Generated service client used for every RPC.
client: HyperServiceClient<T>,
/// Extra `(key, value)` pairs attached as request metadata to each RPC;
/// pairs that do not parse as metadata are skipped.
headers: Vec<(String, String)>,
/// Current phase of the state machine.
state: ExecutorState,
/// Transfer mode; decides what happens once the `ExecuteQuery` stream closes.
transfer_mode: TransferMode,
/// Response stream of `ExecuteQuery`, set by `execute`.
execute_stream: Option<Streaming<ExecuteQueryResponse>>,
/// Response stream of `GetQueryInfo`, set by `request_status`.
query_info_stream: Option<Streaming<QueryInfo>>,
/// Response stream of `GetQueryResult`, set by `request_results`.
query_result_stream: Option<Streaming<QueryResult>>,
/// Most recent status message received from the server.
query_status: Option<QueryStatus>,
/// Query ID taken from the most recent status message.
query_id: Option<String>,
/// ID assigned to the next locally buffered chunk; incremented per chunk.
next_local_chunk_id: u64,
/// Chunk ID requested by the next `GetQueryResult` call. Starts at 1 in
/// `Adaptive` mode and 0 otherwise.
// NOTE(review): presumably chunk 0 arrives inline with ExecuteQuery in
// Adaptive mode — confirm against the service contract.
next_server_chunk_id: u64,
/// Partial result accumulated since the last hand-off to the caller.
result: GrpcQueryResult,
}
impl<T> GrpcQueryExecutor<T>
where
T: tonic::client::GrpcService<tonic::body::Body> + Clone + Send + 'static,
T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
<T::ResponseBody as tonic::codegen::Body>::Error:
Into<tonic::codegen::StdError> + Send + 'static,
T::Future: Send,
{
pub(crate) fn new(
client: HyperServiceClient<T>,
headers: Vec<(String, String)>,
transfer_mode: TransferMode,
) -> Self {
let next_server_chunk_id = match transfer_mode {
TransferMode::Adaptive => 1,
_ => 0,
};
GrpcQueryExecutor {
client,
headers,
state: ExecutorState::ReadInitialResults,
transfer_mode,
execute_stream: None,
query_info_stream: None,
query_result_stream: None,
query_status: None,
query_id: None,
next_local_chunk_id: 0,
next_server_chunk_id,
result: GrpcQueryResult::new(),
}
}
pub(crate) async fn execute(&mut self, query: super::proto::QueryParam) -> Result<()> {
debug!(query = %query.query, transfer_mode = ?self.transfer_mode, "Executing gRPC query");
let mut request = tonic::Request::new(query);
for (key, value) in &self.headers {
if let (Ok(key), Ok(value)) = (
key.parse::<tonic::metadata::MetadataKey<_>>(),
value.parse(),
) {
request.metadata_mut().insert(key, value);
}
}
let response = self
.client
.execute_query(request)
.await
.map_err(from_grpc_status)?;
self.execute_stream = Some(response.into_inner());
self.state = ExecutorState::ReadInitialResults;
Ok(())
}
/// Advances the state machine until there is something to hand back.
///
/// Steps through the executor states until at least one chunk is buffered
/// or the `Finished` state is reached, then returns the accumulated partial
/// result (leaving a fresh default in its place).
///
/// Any result returned while the executor is `Finished` has `is_complete`
/// set. Previously a query that finished without buffered chunks (for
/// example a command reporting affected rows, or an empty result set) came
/// back as `Ok(None)`, which discarded the schema and `rows_affected`
/// collected so far; final chunks were likewise returned without the
/// completion flag.
///
/// The `Option` in the return type is retained for interface
/// compatibility; this implementation always yields `Some`.
///
/// # Errors
///
/// Propagates any protocol or gRPC error raised by a step.
pub(crate) async fn next_result(&mut self) -> Result<Option<GrpcQueryResult>> {
    loop {
        trace!(state = ?self.state, "Query executor state");
        match self.state {
            ExecutorState::ReadInitialResults => self.read_initial_results().await?,
            ExecutorState::RequestStatus => self.request_status().await?,
            ExecutorState::ReadStatus => self.read_status().await?,
            ExecutorState::RequestResults => self.request_results().await?,
            ExecutorState::ReadResults => self.read_results().await?,
            // Nothing left to do; fall through to the hand-off below.
            ExecutorState::Finished => {}
        }

        let finished = self.state == ExecutorState::Finished;
        if finished || !self.result.chunks.is_empty() {
            if finished {
                // Mark completion on the same result that carries the last
                // chunks / schema / row count so none of it is lost.
                self.result.is_complete = true;
            }
            return Ok(Some(std::mem::take(&mut self.result)));
        }
    }
}
async fn read_initial_results(&mut self) -> Result<()> {
let response = {
let stream = self.execute_stream.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Protocol, "ExecuteQuery stream not initialized")
})?;
stream.message().await.map_err(from_grpc_status)?
};
match response {
Some(response) => {
self.process_execute_response(response)?;
}
None => {
match self.transfer_mode {
TransferMode::Sync | TransferMode::Unspecified => {
debug!(
query_id = ?self.query_id,
"ExecuteQuery stream closed; SYNC mode complete",
);
self.state = ExecutorState::Finished;
}
TransferMode::Async | TransferMode::Adaptive => {
debug!(
query_id = ?self.query_id,
mode = ?self.transfer_mode,
next_chunk = self.next_server_chunk_id,
"ExecuteQuery stream closed; fetching remaining chunks",
);
self.state = ExecutorState::RequestStatus;
}
}
}
}
Ok(())
}
/// Applies one `ExecuteQueryResponse` message to the executor.
///
/// * A schema header is stored on the partial result.
/// * A command header records the affected-row count (if any) and moves the
///   executor to `Finished`.
/// * Binary/string result parts and binary/string schema payloads are
///   buffered as chunks with the next local chunk ID.
/// * Status messages and nested query results are forwarded to
///   `process_query_status` / `process_query_result`.
/// * Empty messages are logged (or ignored for an empty `QueryInfo`).
fn process_execute_response(&mut self, response: ExecuteQueryResponse) -> Result<()> {
    use super::proto::hyper_service::execute_query_response::Result as ResponsePayload;
    use super::proto::hyper_service::query_command_ok::CommandReturn;
    use super::proto::hyper_service::query_info::Content as QueryInfoContent;
    use super::proto::hyper_service::query_result_header::Header;

    // ID for a chunk produced directly by this message, if there is one.
    let chunk_id = self.next_local_chunk_id;

    let new_chunk = match response.result {
        None => {
            warn!("Received empty ExecuteQueryResponse");
            None
        }
        Some(ResponsePayload::Header(header)) => {
            match header.header {
                None => {
                    warn!("Received empty QueryResultHeader");
                }
                Some(Header::Schema(schema)) => {
                    debug!(columns = schema.columns.len(), "Received schema");
                    self.result.schema = Some(schema);
                }
                Some(Header::Command(cmd)) => {
                    let affected =
                        if let Some(CommandReturn::AffectedRows(count)) = cmd.command_return {
                            Some(count)
                        } else {
                            None
                        };
                    debug!(rows_affected = ?affected, "Command OK");
                    self.result.rows_affected = affected;
                    self.state = ExecutorState::Finished;
                }
            }
            None
        }
        Some(ResponsePayload::BinaryPart(part)) => {
            debug!(bytes = part.data.len(), "Received binary result part");
            Some(GrpcResultChunk::new(chunk_id, part.data))
        }
        Some(ResponsePayload::StringPart(part)) => {
            debug!(len = part.data.len(), "Received string result part");
            Some(GrpcResultChunk::new(
                chunk_id,
                Bytes::from(part.data.into_bytes()),
            ))
        }
        Some(ResponsePayload::QueryInfo(info)) => match info.content {
            None => None,
            Some(QueryInfoContent::QueryStatus(status)) => {
                self.process_query_status(status);
                None
            }
            Some(QueryInfoContent::BinarySchema(part)) => {
                debug!(bytes = part.data.len(), "Received binary schema");
                Some(GrpcResultChunk::new(chunk_id, part.data))
            }
            Some(QueryInfoContent::StringSchema(part)) => {
                debug!(len = part.data.len(), "Received string schema");
                Some(GrpcResultChunk::new(
                    chunk_id,
                    Bytes::from(part.data.into_bytes()),
                ))
            }
        },
        Some(ResponsePayload::QueryResult(query_result)) => {
            // Buffers its own chunk and advances the local chunk ID itself.
            self.process_query_result(query_result)?;
            None
        }
    };

    if let Some(chunk) = new_chunk {
        self.next_local_chunk_id += 1;
        self.result.chunks.push_back(chunk);
    }
    Ok(())
}
/// Buffers the payload of a `QueryResult` message as a chunk.
///
/// Binary payloads are stored as-is and string payloads as their UTF-8
/// bytes; either way the chunk gets the next local chunk ID. A message
/// without a payload is ignored.
#[expect(
    clippy::unnecessary_wraps,
    reason = "signature retained for API symmetry / future fallibility; returning Result/Option keeps callers from breaking when the function later grows failure cases"
)]
fn process_query_result(&mut self, result: QueryResult) -> Result<()> {
    let Some(payload) = result.result else {
        return Ok(());
    };

    let chunk_id = self.next_local_chunk_id;
    let chunk = match payload {
        QueryResultPayload::StringPart(part) => {
            debug!(len = part.data.len(), "Received string result chunk");
            GrpcResultChunk::new(chunk_id, Bytes::from(part.data.into_bytes()))
        }
        QueryResultPayload::BinaryPart(part) => {
            debug!(bytes = part.data.len(), "Received binary result chunk");
            GrpcResultChunk::new(chunk_id, part.data)
        }
    };

    self.next_local_chunk_id += 1;
    self.result.chunks.push_back(chunk);
    Ok(())
}
/// Records a status message: its query ID is copied to both the executor
/// and the partial result, and the status itself replaces the stored one.
fn process_query_status(&mut self, status: QueryStatus) {
    debug!(
        query_id = %status.query_id,
        completion_status = ?CompletionStatus::try_from(status.completion_status),
        "Received query status"
    );

    let id = status.query_id.clone();
    self.result.query_id = Some(id.clone());
    self.query_id = Some(id);
    self.query_status = Some(status);
}
/// Issues a streaming `GetQueryInfo` request for the current query and
/// switches to `ReadStatus`.
///
/// The query ID is sent both in the request body and, when it parses as a
/// metadata value, in the `x-hyperdb-query-id` header. Configured headers
/// are added afterwards; those that do not parse are skipped.
///
/// # Errors
///
/// Fails with a protocol error if no query ID is known yet, or with the
/// converted gRPC status if the RPC fails.
async fn request_status(&mut self) -> Result<()> {
    let query_id = self
        .query_id
        .clone()
        .ok_or_else(|| Error::new(ErrorKind::Protocol, "No query ID for status request"))?;
    debug!(query_id = %query_id, "Requesting query status");

    let mut request = tonic::Request::new(QueryInfoParam {
        query_id: query_id.clone(),
        streaming: true,
        schema_output_format: 0,
    });

    let metadata = request.metadata_mut();
    if let Ok(id_value) = query_id.parse() {
        metadata.insert("x-hyperdb-query-id", id_value);
    }
    for (name, raw_value) in &self.headers {
        if let (Ok(key), Ok(value)) = (
            name.parse::<tonic::metadata::MetadataKey<_>>(),
            raw_value.parse(),
        ) {
            metadata.insert(key, value);
        }
    }

    let info_stream = self
        .client
        .get_query_info(request)
        .await
        .map_err(from_grpc_status)?
        .into_inner();
    self.query_info_stream = Some(info_stream);
    self.state = ExecutorState::ReadStatus;
    Ok(())
}
/// Reads one message from the `GetQueryInfo` stream and picks the next
/// state.
///
/// * A status of `Finished` or `ResultsProduced` moves on to
///   `RequestResults`.
/// * Any other status (including an unrecognised value), a schema payload,
///   or an empty message goes back to `RequestStatus`.
/// * If the stream has ended, the executor also goes back to
///   `RequestStatus`. Previously the state was left at `ReadStatus`, so
///   `next_result` kept re-reading the exhausted stream and never made
///   progress.
///
/// # Errors
///
/// Fails with a protocol error if the stream was never set up, or with the
/// converted gRPC status if reading fails.
async fn read_status(&mut self) -> Result<()> {
    use super::proto::hyper_service::query_info::Content as QueryInfoContent;

    let stream = self
        .query_info_stream
        .as_mut()
        .ok_or_else(|| Error::new(ErrorKind::Protocol, "QueryInfo stream not initialized"))?;

    let Some(info) = stream.message().await.map_err(from_grpc_status)? else {
        // The server closed the stream without a terminal status; open a
        // new status stream instead of polling the finished one again.
        debug!("QueryInfo stream closed; re-requesting query status");
        self.state = ExecutorState::RequestStatus;
        return Ok(());
    };

    self.state = match info.content {
        Some(QueryInfoContent::QueryStatus(status)) => {
            // Read the completion code first so the status can be moved
            // into the executor without cloning it.
            let completion = CompletionStatus::try_from(status.completion_status)
                .unwrap_or(CompletionStatus::RunningOrUnspecified);
            self.process_query_status(status);
            match completion {
                CompletionStatus::Finished | CompletionStatus::ResultsProduced => {
                    debug!("Query finished, requesting results");
                    ExecutorState::RequestResults
                }
                CompletionStatus::RunningOrUnspecified => ExecutorState::RequestStatus,
            }
        }
        Some(QueryInfoContent::BinarySchema(_) | QueryInfoContent::StringSchema(_)) | None => {
            ExecutorState::RequestStatus
        }
    };
    Ok(())
}
/// Issues a `GetQueryResult` request for the next server chunk and switches
/// to `ReadResults`.
///
/// If the stored status reports a non-zero chunk count and the next chunk
/// ID is already at or past it, no request is sent and the executor moves
/// to `Finished` instead. The query ID is sent in the body and, when it
/// parses, in the `x-hyperdb-query-id` header; configured headers follow,
/// skipping any that do not parse.
///
/// # Errors
///
/// Fails with a protocol error if no query ID is known, or with the
/// converted gRPC status if the RPC fails.
async fn request_results(&mut self) -> Result<()> {
    use super::proto::hyper_service::query_result_param::RequestedData;

    let chunk_id = self.next_server_chunk_id;

    let exhausted_total = self
        .query_status
        .as_ref()
        .map(|status| status.chunk_count)
        .filter(|&total| total > 0 && chunk_id >= total);
    if let Some(total_chunks) = exhausted_total {
        debug!(
            total_chunks = total_chunks,
            next_chunk = chunk_id,
            "No more chunks to fetch",
        );
        self.state = ExecutorState::Finished;
        return Ok(());
    }

    let query_id = self
        .query_id
        .clone()
        .ok_or_else(|| Error::new(ErrorKind::Protocol, "No query ID for result request"))?;
    debug!(
        query_id = %query_id,
        chunk_id = chunk_id,
        "Requesting result chunks"
    );

    let mut request = tonic::Request::new(QueryResultParam {
        query_id: query_id.clone(),
        output_format: super::proto::OutputFormat::ArrowIpc.into(),
        requested_data: Some(RequestedData::ChunkId(chunk_id)),
        omit_schema: true,
    });

    let metadata = request.metadata_mut();
    if let Ok(id_value) = query_id.parse() {
        metadata.insert("x-hyperdb-query-id", id_value);
    }
    for (name, raw_value) in &self.headers {
        if let (Ok(key), Ok(value)) = (
            name.parse::<tonic::metadata::MetadataKey<_>>(),
            raw_value.parse(),
        ) {
            metadata.insert(key, value);
        }
    }

    let result_stream = self
        .client
        .get_query_result(request)
        .await
        .map_err(from_grpc_status)?
        .into_inner();
    self.query_result_stream = Some(result_stream);
    self.state = ExecutorState::ReadResults;
    Ok(())
}
/// Drains the `GetQueryResult` stream, buffering every message as a chunk,
/// then advances to the next server chunk.
///
/// Afterwards the executor requests another chunk if the stored status
/// reports more chunks than have been fetched; otherwise (or when no status
/// is stored) it moves to `Finished`.
///
/// # Errors
///
/// Fails with a protocol error if the stream was never set up, or with the
/// converted gRPC status if reading fails.
async fn read_results(&mut self) -> Result<()> {
    loop {
        // Re-borrow the stream each round so the borrow ends before the
        // message is handed to `process_query_result`.
        let stream = self.query_result_stream.as_mut().ok_or_else(|| {
            Error::new(ErrorKind::Protocol, "QueryResult stream not initialized")
        })?;
        let Some(message) = stream.message().await.map_err(from_grpc_status)? else {
            break;
        };
        self.process_query_result(message)?;
    }

    self.next_server_chunk_id += 1;
    let next_chunk = self.next_server_chunk_id;
    let known_total = self.query_status.as_ref().map(|status| status.chunk_count);

    self.state = match known_total {
        Some(total_chunks) if next_chunk < total_chunks => {
            debug!(next_chunk, total_chunks, "More chunks available");
            ExecutorState::RequestResults
        }
        Some(total_chunks) => {
            debug!(total_chunks, "All chunks received");
            ExecutorState::Finished
        }
        None => ExecutorState::Finished,
    };
    Ok(())
}
}
/// Pull-based stream of raw result chunks backed by a `GrpcQueryExecutor`.
pub struct GrpcChunkStream {
/// Executor that produces partial results on demand.
executor: GrpcQueryExecutor<tonic::transport::Channel>,
/// Chunk payloads fetched from the executor but not yet handed out.
pending: std::collections::VecDeque<bytes::Bytes>,
/// First schema seen in a partial result, if any.
schema: Option<super::proto::QueryResultSchema>,
/// First query ID seen in a partial result, if any.
query_id: Option<String>,
/// Most recent affected-row count reported by a partial result.
rows_affected: Option<u64>,
/// Set once the executor reports completion or yields nothing more.
done: bool,
}
/// Compact debug view: the pending-chunk count and bookkeeping fields are
/// shown, the executor and schema are omitted.
impl std::fmt::Debug for GrpcChunkStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let pending_chunks = self.pending.len();
        let mut out = f.debug_struct("GrpcChunkStream");
        out.field("pending_chunks", &pending_chunks);
        out.field("query_id", &self.query_id);
        out.field("rows_affected", &self.rows_affected);
        out.field("done", &self.done);
        out.finish_non_exhaustive()
    }
}
impl GrpcChunkStream {
    /// Wraps an executor in a chunk stream with nothing buffered yet.
    pub(crate) fn new(executor: GrpcQueryExecutor<tonic::transport::Channel>) -> Self {
        Self {
            executor,
            pending: std::collections::VecDeque::new(),
            schema: None,
            query_id: None,
            rows_affected: None,
            done: false,
        }
    }

    /// Returns the next chunk payload, or `None` once the query is exhausted.
    ///
    /// Buffered chunks are handed out first. When the buffer is empty the
    /// executor is asked for more; the first schema and query ID seen, and
    /// the latest affected-row count, are remembered along the way.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the executor.
    pub async fn next_chunk(&mut self) -> Result<Option<bytes::Bytes>> {
        // Refill only while there is nothing to hand out and the executor
        // may still have data.
        while self.pending.is_empty() && !self.done {
            let Some(mut partial) = self.executor.next_result().await? else {
                self.done = true;
                break;
            };

            if self.schema.is_none() {
                self.schema = partial.schema.take();
            }
            if self.query_id.is_none() {
                self.query_id = partial.query_id.take();
            }
            if let Some(rows) = partial.rows_affected {
                self.rows_affected = Some(rows);
            }

            self.pending
                .extend(std::iter::from_fn(|| partial.take_chunk()).map(|chunk| chunk.data));

            if partial.is_complete {
                self.done = true;
            }
        }
        Ok(self.pending.pop_front())
    }

    /// Schema captured from the executor, if one has been seen.
    pub fn schema(&self) -> Option<&super::proto::QueryResultSchema> {
        self.schema.as_ref()
    }

    /// Query ID captured from the executor, if one has been seen.
    pub fn query_id(&self) -> Option<&str> {
        self.query_id.as_deref()
    }

    /// Affected-row count captured from the executor, if one has been seen.
    pub fn rows_affected(&self) -> Option<u64> {
        self.rows_affected
    }
}