use std::collections::VecDeque;
use bytes::{Bytes, BytesMut};
use crate::client::error::{Error, ErrorKind, Result};
use super::proto::{QueryResultSchema, SqlType};
#[derive(Debug)]
pub struct GrpcQueryResult {
pub(crate) query_id: Option<String>,
pub(crate) schema: Option<QueryResultSchema>,
pub(crate) chunks: VecDeque<GrpcResultChunk>,
pub(crate) rows_affected: Option<u64>,
pub(crate) is_complete: bool,
}
impl GrpcQueryResult {
pub(crate) fn new() -> Self {
GrpcQueryResult {
query_id: None,
schema: None,
chunks: VecDeque::new(),
rows_affected: None,
is_complete: false,
}
}
#[must_use]
pub fn query_id(&self) -> Option<&str> {
self.query_id.as_deref()
}
#[must_use]
pub fn column_count(&self) -> usize {
self.schema.as_ref().map_or(0, |s| s.columns.len())
}
pub fn columns(&self) -> impl Iterator<Item = GrpcColumnInfo<'_>> + '_ {
self.schema
.as_ref()
.map(|s| s.columns.iter())
.into_iter()
.flatten()
.map(|col| GrpcColumnInfo {
name: &col.name,
sql_type: col.r#type,
})
}
#[must_use]
pub fn has_chunks(&self) -> bool {
!self.chunks.is_empty()
}
pub fn take_chunk(&mut self) -> Option<GrpcResultChunk> {
self.chunks.pop_front()
}
pub fn chunks(&self) -> impl Iterator<Item = &GrpcResultChunk> {
self.chunks.iter()
}
#[must_use]
pub fn arrow_data(&self) -> Bytes {
match self.chunks.len() {
0 => Bytes::new(),
1 => self.chunks[0].data.clone(),
_ => {
let total_len: usize = self.chunks.iter().map(|c| c.data.len()).sum();
let mut buf = BytesMut::with_capacity(total_len);
for chunk in &self.chunks {
buf.extend_from_slice(&chunk.data);
}
buf.freeze()
}
}
}
#[must_use]
pub fn into_arrow_data(mut self) -> Bytes {
match self.chunks.len() {
0 => Bytes::new(),
1 => self.chunks.pop_front().map(|c| c.data).unwrap_or_default(),
_ => {
let total_len: usize = self.chunks.iter().map(|c| c.data.len()).sum();
let mut buf = BytesMut::with_capacity(total_len);
for chunk in self.chunks {
buf.extend_from_slice(&chunk.data);
}
buf.freeze()
}
}
}
pub fn chunk_bytes(&self) -> impl Iterator<Item = Bytes> + '_ {
self.chunks.iter().map(|c| c.data.clone())
}
#[must_use]
pub fn rows_affected(&self) -> Option<u64> {
self.rows_affected
}
#[must_use]
pub fn is_complete(&self) -> bool {
self.is_complete
}
}
impl Default for GrpcQueryResult {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub struct GrpcResultChunk {
pub(crate) chunk_id: u64,
pub(crate) data: Bytes,
pub(crate) row_count: Option<usize>,
}
impl GrpcResultChunk {
pub(crate) fn new(chunk_id: u64, data: Bytes) -> Self {
GrpcResultChunk {
chunk_id,
data,
row_count: None,
}
}
pub fn chunk_id(&self) -> u64 {
self.chunk_id
}
pub fn arrow_data(&self) -> &[u8] {
&self.data
}
pub fn arrow_bytes(&self) -> Bytes {
self.data.clone()
}
pub fn into_arrow_data(self) -> Bytes {
self.data
}
pub fn row_count(&self) -> Option<usize> {
self.row_count
}
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
}
#[derive(Debug)]
pub struct GrpcColumnInfo<'a> {
pub name: &'a str,
pub sql_type: Option<SqlType>,
}
impl GrpcColumnInfo<'_> {
#[must_use]
pub fn name(&self) -> &str {
self.name
}
#[must_use]
pub fn type_name(&self) -> Option<&'static str> {
use crate::client::grpc::proto::hyper_service::sql_type::TypeTag;
self.sql_type
.as_ref()
.and_then(|t| match TypeTag::try_from(t.tag).ok()? {
TypeTag::HyperUnspecified => None,
TypeTag::HyperBool => Some("BOOLEAN"),
TypeTag::HyperBigInt => Some("BIGINT"),
TypeTag::HyperSmallInt => Some("SMALLINT"),
TypeTag::HyperInt => Some("INTEGER"),
TypeTag::HyperNumeric => Some("NUMERIC"),
TypeTag::HyperDouble => Some("DOUBLE PRECISION"),
TypeTag::HyperFloat => Some("REAL"),
TypeTag::HyperOid => Some("OID"),
TypeTag::HyperByteA => Some("BYTEA"),
TypeTag::HyperText => Some("TEXT"),
TypeTag::HyperVarchar => Some("VARCHAR"),
TypeTag::HyperChar => Some("CHAR"),
TypeTag::HyperJson => Some("JSON"),
TypeTag::HyperDate => Some("DATE"),
TypeTag::HyperInterval => Some("INTERVAL"),
TypeTag::HyperTime => Some("TIME"),
TypeTag::HyperTimestamp => Some("TIMESTAMP"),
TypeTag::HyperTimestampTz => Some("TIMESTAMPTZ"),
TypeTag::HyperGeography => Some("GEOGRAPHY"),
TypeTag::HyperArrayOfFloat => Some("FLOAT[]"),
})
}
}
#[allow(
dead_code,
reason = "helper retained for callers that want to convert gRPC schema to hyper-types"
)]
pub(super) fn sql_type_to_hyper_type(sql_type: &SqlType) -> Result<crate::types::SqlType> {
use super::proto::hyper_service::sql_type::{Modifier, TypeTag};
use crate::types::SqlType as HyperSqlType;
let tag = TypeTag::try_from(sql_type.tag).map_err(|_| {
Error::new(
ErrorKind::Conversion,
format!("Unknown SQL type tag: {}", sql_type.tag),
)
})?;
let modifier = &sql_type.modifier;
match tag {
TypeTag::HyperUnspecified => Err(Error::new(ErrorKind::Conversion, "Unspecified SQL type")),
TypeTag::HyperBool => Ok(HyperSqlType::Bool),
TypeTag::HyperSmallInt => Ok(HyperSqlType::SmallInt),
TypeTag::HyperInt => Ok(HyperSqlType::Int),
TypeTag::HyperBigInt => Ok(HyperSqlType::BigInt),
TypeTag::HyperFloat => Ok(HyperSqlType::Float),
TypeTag::HyperDouble => Ok(HyperSqlType::Double),
TypeTag::HyperNumeric => {
let (precision, scale) = match modifier {
Some(Modifier::NumericModifier(m)) => (m.precision, m.scale),
_ => (38, 0), };
Ok(HyperSqlType::Numeric { precision, scale })
}
TypeTag::HyperText => Ok(HyperSqlType::Text),
TypeTag::HyperVarchar => {
let max_length = match modifier {
Some(Modifier::MaxLength(len)) => Some(*len),
_ => None,
};
Ok(HyperSqlType::Varchar { max_length })
}
TypeTag::HyperChar => {
let length = match modifier {
Some(Modifier::MaxLength(len)) => *len,
_ => 1,
};
Ok(HyperSqlType::Char { length })
}
TypeTag::HyperByteA => Ok(HyperSqlType::ByteA),
TypeTag::HyperOid => Ok(HyperSqlType::Oid),
TypeTag::HyperJson => Ok(HyperSqlType::Json),
TypeTag::HyperDate => Ok(HyperSqlType::Date),
TypeTag::HyperTime => Ok(HyperSqlType::Time),
TypeTag::HyperTimestamp => Ok(HyperSqlType::Timestamp),
TypeTag::HyperTimestampTz => Ok(HyperSqlType::TimestampTz),
TypeTag::HyperInterval => Ok(HyperSqlType::Interval),
TypeTag::HyperGeography => Ok(HyperSqlType::Geography),
TypeTag::HyperArrayOfFloat => {
Err(Error::new(
ErrorKind::Conversion,
"Array types not yet supported",
))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_grpc_result_empty() {
let result = GrpcQueryResult::new();
assert!(!result.has_chunks());
assert!(result.arrow_data().is_empty());
assert_eq!(result.column_count(), 0);
}
#[test]
fn test_grpc_result_with_chunks() {
let mut result = GrpcQueryResult::new();
result
.chunks
.push_back(GrpcResultChunk::new(0, Bytes::from_static(&[1, 2, 3])));
result
.chunks
.push_back(GrpcResultChunk::new(1, Bytes::from_static(&[4, 5, 6])));
assert!(result.has_chunks());
assert_eq!(result.arrow_data().as_ref(), &[1, 2, 3, 4, 5, 6]);
let chunk = result.take_chunk().unwrap();
assert_eq!(chunk.chunk_id(), 0);
assert_eq!(chunk.arrow_data(), &[1, 2, 3]);
}
#[test]
fn test_sql_type_mapping() {
use crate::client::grpc::proto::hyper_service::sql_type::TypeTag;
let sql_type = SqlType {
tag: TypeTag::HyperInt.into(),
modifier: None,
};
let hyper_type = sql_type_to_hyper_type(&sql_type).unwrap();
assert_eq!(hyper_type, crate::types::SqlType::Int);
}
}