hyperdb_api_core/client/grpc/
result.rs1use std::collections::VecDeque;
10
11use bytes::{Bytes, BytesMut};
12
13use crate::client::error::{Error, ErrorKind, Result};
14
15use super::proto::{QueryResultSchema, SqlType};
16
17#[derive(Debug)]
44pub struct GrpcQueryResult {
45 pub(crate) query_id: Option<String>,
47 pub(crate) schema: Option<QueryResultSchema>,
49 pub(crate) chunks: VecDeque<GrpcResultChunk>,
51 pub(crate) rows_affected: Option<u64>,
53 pub(crate) is_complete: bool,
55}
56
57impl GrpcQueryResult {
58 pub(crate) fn new() -> Self {
60 GrpcQueryResult {
61 query_id: None,
62 schema: None,
63 chunks: VecDeque::new(),
64 rows_affected: None,
65 is_complete: false,
66 }
67 }
68
69 #[must_use]
71 pub fn query_id(&self) -> Option<&str> {
72 self.query_id.as_deref()
73 }
74
75 #[must_use]
77 pub fn column_count(&self) -> usize {
78 self.schema.as_ref().map_or(0, |s| s.columns.len())
79 }
80
81 pub fn columns(&self) -> impl Iterator<Item = GrpcColumnInfo<'_>> + '_ {
83 self.schema
84 .as_ref()
85 .map(|s| s.columns.iter())
86 .into_iter()
87 .flatten()
88 .map(|col| GrpcColumnInfo {
89 name: &col.name,
90 sql_type: col.r#type,
91 })
92 }
93
94 #[must_use]
96 pub fn has_chunks(&self) -> bool {
97 !self.chunks.is_empty()
98 }
99
100 pub fn take_chunk(&mut self) -> Option<GrpcResultChunk> {
102 self.chunks.pop_front()
103 }
104
105 pub fn chunks(&self) -> impl Iterator<Item = &GrpcResultChunk> {
107 self.chunks.iter()
108 }
109
110 #[must_use]
120 pub fn arrow_data(&self) -> Bytes {
121 match self.chunks.len() {
122 0 => Bytes::new(),
123 1 => self.chunks[0].data.clone(),
124 _ => {
125 let total_len: usize = self.chunks.iter().map(|c| c.data.len()).sum();
126 let mut buf = BytesMut::with_capacity(total_len);
127 for chunk in &self.chunks {
128 buf.extend_from_slice(&chunk.data);
129 }
130 buf.freeze()
131 }
132 }
133 }
134
135 #[must_use]
140 pub fn into_arrow_data(mut self) -> Bytes {
141 match self.chunks.len() {
142 0 => Bytes::new(),
143 1 => self.chunks.pop_front().map(|c| c.data).unwrap_or_default(),
144 _ => {
145 let total_len: usize = self.chunks.iter().map(|c| c.data.len()).sum();
146 let mut buf = BytesMut::with_capacity(total_len);
147 for chunk in self.chunks {
148 buf.extend_from_slice(&chunk.data);
149 }
150 buf.freeze()
151 }
152 }
153 }
154
155 pub fn chunk_bytes(&self) -> impl Iterator<Item = Bytes> + '_ {
161 self.chunks.iter().map(|c| c.data.clone())
162 }
163
164 #[must_use]
169 pub fn rows_affected(&self) -> Option<u64> {
170 self.rows_affected
171 }
172
173 #[must_use]
175 pub fn is_complete(&self) -> bool {
176 self.is_complete
177 }
178}
179
180impl Default for GrpcQueryResult {
181 fn default() -> Self {
182 Self::new()
183 }
184}
185
186#[derive(Debug)]
193pub struct GrpcResultChunk {
194 pub(crate) chunk_id: u64,
196 pub(crate) data: Bytes,
198 pub(crate) row_count: Option<usize>,
200}
201
202impl GrpcResultChunk {
203 pub(crate) fn new(chunk_id: u64, data: Bytes) -> Self {
205 GrpcResultChunk {
206 chunk_id,
207 data,
208 row_count: None,
209 }
210 }
211
212 pub fn chunk_id(&self) -> u64 {
214 self.chunk_id
215 }
216
217 pub fn arrow_data(&self) -> &[u8] {
219 &self.data
220 }
221
222 pub fn arrow_bytes(&self) -> Bytes {
227 self.data.clone()
228 }
229
230 pub fn into_arrow_data(self) -> Bytes {
232 self.data
233 }
234
235 pub fn row_count(&self) -> Option<usize> {
237 self.row_count
238 }
239
240 pub fn is_empty(&self) -> bool {
242 self.data.is_empty()
243 }
244}
245
246#[derive(Debug)]
248pub struct GrpcColumnInfo<'a> {
249 pub name: &'a str,
251 pub sql_type: Option<SqlType>,
253}
254
255impl GrpcColumnInfo<'_> {
256 #[must_use]
258 pub fn name(&self) -> &str {
259 self.name
260 }
261
262 #[must_use]
264 pub fn type_name(&self) -> Option<&'static str> {
265 use crate::client::grpc::proto::hyper_service::sql_type::TypeTag;
266
267 self.sql_type
268 .as_ref()
269 .and_then(|t| match TypeTag::try_from(t.tag).ok()? {
270 TypeTag::HyperUnspecified => None,
271 TypeTag::HyperBool => Some("BOOLEAN"),
272 TypeTag::HyperBigInt => Some("BIGINT"),
273 TypeTag::HyperSmallInt => Some("SMALLINT"),
274 TypeTag::HyperInt => Some("INTEGER"),
275 TypeTag::HyperNumeric => Some("NUMERIC"),
276 TypeTag::HyperDouble => Some("DOUBLE PRECISION"),
277 TypeTag::HyperFloat => Some("REAL"),
278 TypeTag::HyperOid => Some("OID"),
279 TypeTag::HyperByteA => Some("BYTEA"),
280 TypeTag::HyperText => Some("TEXT"),
281 TypeTag::HyperVarchar => Some("VARCHAR"),
282 TypeTag::HyperChar => Some("CHAR"),
283 TypeTag::HyperJson => Some("JSON"),
284 TypeTag::HyperDate => Some("DATE"),
285 TypeTag::HyperInterval => Some("INTERVAL"),
286 TypeTag::HyperTime => Some("TIME"),
287 TypeTag::HyperTimestamp => Some("TIMESTAMP"),
288 TypeTag::HyperTimestampTz => Some("TIMESTAMPTZ"),
289 TypeTag::HyperGeography => Some("GEOGRAPHY"),
290 TypeTag::HyperArrayOfFloat => Some("FLOAT[]"),
291 })
292 }
293}
294
295#[allow(
300 dead_code,
301 reason = "helper retained for callers that want to convert gRPC schema to hyper-types"
302)]
303pub(super) fn sql_type_to_hyper_type(sql_type: &SqlType) -> Result<crate::types::SqlType> {
304 use super::proto::hyper_service::sql_type::{Modifier, TypeTag};
305 use crate::types::SqlType as HyperSqlType;
306
307 let tag = TypeTag::try_from(sql_type.tag).map_err(|_| {
308 Error::new(
309 ErrorKind::Conversion,
310 format!("Unknown SQL type tag: {}", sql_type.tag),
311 )
312 })?;
313
314 let modifier = &sql_type.modifier;
316
317 match tag {
318 TypeTag::HyperUnspecified => Err(Error::new(ErrorKind::Conversion, "Unspecified SQL type")),
319 TypeTag::HyperBool => Ok(HyperSqlType::Bool),
320 TypeTag::HyperSmallInt => Ok(HyperSqlType::SmallInt),
321 TypeTag::HyperInt => Ok(HyperSqlType::Int),
322 TypeTag::HyperBigInt => Ok(HyperSqlType::BigInt),
323 TypeTag::HyperFloat => Ok(HyperSqlType::Float),
324 TypeTag::HyperDouble => Ok(HyperSqlType::Double),
325 TypeTag::HyperNumeric => {
326 let (precision, scale) = match modifier {
328 Some(Modifier::NumericModifier(m)) => (m.precision, m.scale),
329 _ => (38, 0), };
331 Ok(HyperSqlType::Numeric { precision, scale })
332 }
333 TypeTag::HyperText => Ok(HyperSqlType::Text),
334 TypeTag::HyperVarchar => {
335 let max_length = match modifier {
336 Some(Modifier::MaxLength(len)) => Some(*len),
337 _ => None,
338 };
339 Ok(HyperSqlType::Varchar { max_length })
340 }
341 TypeTag::HyperChar => {
342 let length = match modifier {
343 Some(Modifier::MaxLength(len)) => *len,
344 _ => 1,
345 };
346 Ok(HyperSqlType::Char { length })
347 }
348 TypeTag::HyperByteA => Ok(HyperSqlType::ByteA),
349 TypeTag::HyperOid => Ok(HyperSqlType::Oid),
350 TypeTag::HyperJson => Ok(HyperSqlType::Json),
351 TypeTag::HyperDate => Ok(HyperSqlType::Date),
352 TypeTag::HyperTime => Ok(HyperSqlType::Time),
353 TypeTag::HyperTimestamp => Ok(HyperSqlType::Timestamp),
354 TypeTag::HyperTimestampTz => Ok(HyperSqlType::TimestampTz),
355 TypeTag::HyperInterval => Ok(HyperSqlType::Interval),
356 TypeTag::HyperGeography => Ok(HyperSqlType::Geography),
357 TypeTag::HyperArrayOfFloat => {
358 Err(Error::new(
360 ErrorKind::Conversion,
361 "Array types not yet supported",
362 ))
363 }
364 }
365}
366
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created result has no chunks, no Arrow bytes and no columns.
    #[test]
    fn test_grpc_result_empty() {
        let empty = GrpcQueryResult::new();

        assert!(!empty.has_chunks());
        assert!(empty.arrow_data().is_empty());
        assert_eq!(empty.column_count(), 0);
    }

    /// Two queued chunks are concatenated in queue order by `arrow_data`, and
    /// `take_chunk` hands back the first one.
    #[test]
    fn test_grpc_result_with_chunks() {
        const FIRST: &[u8] = &[1, 2, 3];
        const SECOND: &[u8] = &[4, 5, 6];

        let mut result = GrpcQueryResult::new();
        result.chunks.extend([
            GrpcResultChunk::new(0, Bytes::from_static(FIRST)),
            GrpcResultChunk::new(1, Bytes::from_static(SECOND)),
        ]);

        assert!(result.has_chunks());
        assert_eq!(result.arrow_data().as_ref(), &[1, 2, 3, 4, 5, 6]);

        let first = result.take_chunk().unwrap();
        assert_eq!(first.chunk_id(), 0);
        assert_eq!(first.arrow_data(), &[1, 2, 3]);
    }

    /// An integer type tag without a modifier converts to `SqlType::Int`.
    #[test]
    fn test_sql_type_mapping() {
        use crate::client::grpc::proto::hyper_service::sql_type::TypeTag;

        let int_descriptor = SqlType {
            tag: TypeTag::HyperInt.into(),
            modifier: None,
        };

        let converted = sql_type_to_hyper_type(&int_descriptor).unwrap();
        assert_eq!(converted, crate::types::SqlType::Int);
    }
}