// hyperdb_api_core/client/grpc/result.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! gRPC query result types.
5//!
6//! This module provides types for handling gRPC query results, which are
7//! returned in Apache Arrow IPC format.
8
9use std::collections::VecDeque;
10
11use bytes::{Bytes, BytesMut};
12
13use crate::client::error::{Error, ErrorKind, Result};
14
15use super::proto::{QueryResultSchema, SqlType};
16
17/// Result of a gRPC query execution.
18///
19/// Unlike TCP-based queries that return row-at-a-time results, gRPC queries
20/// return results in Arrow IPC format, which can contain multiple record batches.
21///
22/// # Example
23///
24/// ```no_run
25/// use hyperdb_api_core::client::grpc::{GrpcClient, GrpcConfig};
26///
27/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
28/// # let config = GrpcConfig::new("http://localhost:7484");
29/// # let mut client = GrpcClient::connect(config).await?;
30/// let result = client.execute_query("SELECT * FROM users").await?;
31///
32/// // Get raw Arrow IPC bytes for all chunks
33/// let all_arrow_data = result.arrow_data();
34///
35/// // Or process chunk by chunk
36/// for chunk in result.chunks() {
37///     let arrow_bytes = chunk.arrow_data();
38///     // Process with arrow crate...
39/// }
40/// # Ok(())
41/// # }
42/// ```
#[derive(Debug)]
pub struct GrpcQueryResult {
    /// The query ID assigned by the server, when one was reported
    pub(crate) query_id: Option<String>,
    /// Schema information from the server (`None` until/unless received)
    pub(crate) schema: Option<QueryResultSchema>,
    /// Result chunks (Arrow IPC data), consumed front-to-back
    pub(crate) chunks: VecDeque<GrpcResultChunk>,
    /// Number of rows affected (for DML queries); `None` for SELECTs
    pub(crate) rows_affected: Option<u64>,
    /// Whether the query execution has finished on the server
    pub(crate) is_complete: bool,
}
56
57impl GrpcQueryResult {
58    /// Creates a new empty result.
59    pub(crate) fn new() -> Self {
60        GrpcQueryResult {
61            query_id: None,
62            schema: None,
63            chunks: VecDeque::new(),
64            rows_affected: None,
65            is_complete: false,
66        }
67    }
68
69    /// Returns the query ID assigned by the server, if available.
70    #[must_use]
71    pub fn query_id(&self) -> Option<&str> {
72        self.query_id.as_deref()
73    }
74
75    /// Returns the number of columns in the result.
76    #[must_use]
77    pub fn column_count(&self) -> usize {
78        self.schema.as_ref().map_or(0, |s| s.columns.len())
79    }
80
81    /// Returns the column descriptions from the server.
82    pub fn columns(&self) -> impl Iterator<Item = GrpcColumnInfo<'_>> + '_ {
83        self.schema
84            .as_ref()
85            .map(|s| s.columns.iter())
86            .into_iter()
87            .flatten()
88            .map(|col| GrpcColumnInfo {
89                name: &col.name,
90                sql_type: col.r#type,
91            })
92    }
93
94    /// Returns whether there are more result chunks available.
95    #[must_use]
96    pub fn has_chunks(&self) -> bool {
97        !self.chunks.is_empty()
98    }
99
100    /// Takes the next result chunk, if available.
101    pub fn take_chunk(&mut self) -> Option<GrpcResultChunk> {
102        self.chunks.pop_front()
103    }
104
105    /// Returns an iterator over the result chunks.
106    pub fn chunks(&self) -> impl Iterator<Item = &GrpcResultChunk> {
107        self.chunks.iter()
108    }
109
110    /// Returns all Arrow IPC data concatenated.
111    ///
112    /// For queries with multiple chunks, this concatenates all Arrow record
113    /// batches. Note that each chunk may have its own schema message, so the
114    /// result may contain multiple schema messages.
115    ///
116    /// Single-chunk results are returned without any copy (refcount bump on
117    /// the shared `Bytes`). Multi-chunk results are concatenated into a new
118    /// `Bytes`. Prefer `chunk_bytes()` if you can process chunks incrementally.
119    #[must_use]
120    pub fn arrow_data(&self) -> Bytes {
121        match self.chunks.len() {
122            0 => Bytes::new(),
123            1 => self.chunks[0].data.clone(),
124            _ => {
125                let total_len: usize = self.chunks.iter().map(|c| c.data.len()).sum();
126                let mut buf = BytesMut::with_capacity(total_len);
127                for chunk in &self.chunks {
128                    buf.extend_from_slice(&chunk.data);
129                }
130                buf.freeze()
131            }
132        }
133    }
134
135    /// Consumes the result and returns all Arrow IPC data.
136    ///
137    /// Single-chunk results are returned without any copy. Multi-chunk
138    /// results are concatenated into a new `Bytes`.
139    #[must_use]
140    pub fn into_arrow_data(mut self) -> Bytes {
141        match self.chunks.len() {
142            0 => Bytes::new(),
143            1 => self.chunks.pop_front().map(|c| c.data).unwrap_or_default(),
144            _ => {
145                let total_len: usize = self.chunks.iter().map(|c| c.data.len()).sum();
146                let mut buf = BytesMut::with_capacity(total_len);
147                for chunk in self.chunks {
148                    buf.extend_from_slice(&chunk.data);
149                }
150                buf.freeze()
151            }
152        }
153    }
154
155    /// Returns an iterator over the raw Arrow IPC byte chunks.
156    ///
157    /// Each chunk is a refcount-bumped `Bytes` sharing the original gRPC frame
158    /// allocation — no copies are performed. This is the preferred way to feed
159    /// results into an incremental Arrow IPC decoder.
160    pub fn chunk_bytes(&self) -> impl Iterator<Item = Bytes> + '_ {
161        self.chunks.iter().map(|c| c.data.clone())
162    }
163
164    /// Returns the number of rows affected by a DML query.
165    ///
166    /// For SELECT queries, this is `None`. For INSERT/UPDATE/DELETE queries,
167    /// this returns the number of rows affected.
168    #[must_use]
169    pub fn rows_affected(&self) -> Option<u64> {
170        self.rows_affected
171    }
172
173    /// Returns whether the query execution is complete.
174    #[must_use]
175    pub fn is_complete(&self) -> bool {
176        self.is_complete
177    }
178}
179
180impl Default for GrpcQueryResult {
181    fn default() -> Self {
182        Self::new()
183    }
184}
185
/// A single chunk of gRPC query results.
///
/// Each chunk contains Arrow IPC data (schema + record batch) and metadata
/// about the chunk. The data is held as `Bytes`, so it shares the underlying
/// allocation with the gRPC frame it was decoded from — cloning the chunk or
/// extracting the bytes is a refcount bump, not a copy.
#[derive(Debug)]
pub struct GrpcResultChunk {
    /// The chunk ID (for async/adaptive transfer modes)
    pub(crate) chunk_id: u64,
    /// Arrow IPC data (may include schema + record batch)
    pub(crate) data: Bytes,
    /// Number of rows in this chunk; `None` when the server did not report it
    pub(crate) row_count: Option<usize>,
}
201
202impl GrpcResultChunk {
203    /// Creates a new result chunk.
204    pub(crate) fn new(chunk_id: u64, data: Bytes) -> Self {
205        GrpcResultChunk {
206            chunk_id,
207            data,
208            row_count: None,
209        }
210    }
211
212    /// Returns the chunk ID.
213    pub fn chunk_id(&self) -> u64 {
214        self.chunk_id
215    }
216
217    /// Returns the Arrow IPC data for this chunk.
218    pub fn arrow_data(&self) -> &[u8] {
219        &self.data
220    }
221
222    /// Returns the Arrow IPC data as a shared `Bytes` handle.
223    ///
224    /// This is a refcount bump, not a copy — the returned `Bytes` shares the
225    /// same allocation as the chunk.
226    pub fn arrow_bytes(&self) -> Bytes {
227        self.data.clone()
228    }
229
230    /// Consumes the chunk and returns the Arrow IPC data.
231    pub fn into_arrow_data(self) -> Bytes {
232        self.data
233    }
234
235    /// Returns the number of rows in this chunk, if known.
236    pub fn row_count(&self) -> Option<usize> {
237        self.row_count
238    }
239
240    /// Returns whether this chunk is empty (no data).
241    pub fn is_empty(&self) -> bool {
242        self.data.is_empty()
243    }
244}
245
/// Column information from a gRPC query result.
///
/// Borrows the column name from the result's schema, so instances are tied to
/// the lifetime of the result they were obtained from.
#[derive(Debug)]
pub struct GrpcColumnInfo<'a> {
    /// Column name
    pub name: &'a str,
    /// SQL type information; `None` when the server did not provide it
    pub sql_type: Option<SqlType>,
}
254
255impl GrpcColumnInfo<'_> {
256    /// Returns the column name.
257    #[must_use]
258    pub fn name(&self) -> &str {
259        self.name
260    }
261
262    /// Returns the SQL type tag (e.g., "INTEGER", "TEXT").
263    #[must_use]
264    pub fn type_name(&self) -> Option<&'static str> {
265        use crate::client::grpc::proto::hyper_service::sql_type::TypeTag;
266
267        self.sql_type
268            .as_ref()
269            .and_then(|t| match TypeTag::try_from(t.tag).ok()? {
270                TypeTag::HyperUnspecified => None,
271                TypeTag::HyperBool => Some("BOOLEAN"),
272                TypeTag::HyperBigInt => Some("BIGINT"),
273                TypeTag::HyperSmallInt => Some("SMALLINT"),
274                TypeTag::HyperInt => Some("INTEGER"),
275                TypeTag::HyperNumeric => Some("NUMERIC"),
276                TypeTag::HyperDouble => Some("DOUBLE PRECISION"),
277                TypeTag::HyperFloat => Some("REAL"),
278                TypeTag::HyperOid => Some("OID"),
279                TypeTag::HyperByteA => Some("BYTEA"),
280                TypeTag::HyperText => Some("TEXT"),
281                TypeTag::HyperVarchar => Some("VARCHAR"),
282                TypeTag::HyperChar => Some("CHAR"),
283                TypeTag::HyperJson => Some("JSON"),
284                TypeTag::HyperDate => Some("DATE"),
285                TypeTag::HyperInterval => Some("INTERVAL"),
286                TypeTag::HyperTime => Some("TIME"),
287                TypeTag::HyperTimestamp => Some("TIMESTAMP"),
288                TypeTag::HyperTimestampTz => Some("TIMESTAMPTZ"),
289                TypeTag::HyperGeography => Some("GEOGRAPHY"),
290                TypeTag::HyperArrayOfFloat => Some("FLOAT[]"),
291            })
292    }
293}
294
295/// Converts Hyper SQL types from the gRPC schema to hyper-types.
296///
297/// This is useful for applications that need to work with Hyper's type system
298/// rather than Arrow's type system.
299#[allow(
300    dead_code,
301    reason = "helper retained for callers that want to convert gRPC schema to hyper-types"
302)]
303pub(super) fn sql_type_to_hyper_type(sql_type: &SqlType) -> Result<crate::types::SqlType> {
304    use super::proto::hyper_service::sql_type::{Modifier, TypeTag};
305    use crate::types::SqlType as HyperSqlType;
306
307    let tag = TypeTag::try_from(sql_type.tag).map_err(|_| {
308        Error::new(
309            ErrorKind::Conversion,
310            format!("Unknown SQL type tag: {}", sql_type.tag),
311        )
312    })?;
313
314    // Extract modifier for types that need it
315    let modifier = &sql_type.modifier;
316
317    match tag {
318        TypeTag::HyperUnspecified => Err(Error::new(ErrorKind::Conversion, "Unspecified SQL type")),
319        TypeTag::HyperBool => Ok(HyperSqlType::Bool),
320        TypeTag::HyperSmallInt => Ok(HyperSqlType::SmallInt),
321        TypeTag::HyperInt => Ok(HyperSqlType::Int),
322        TypeTag::HyperBigInt => Ok(HyperSqlType::BigInt),
323        TypeTag::HyperFloat => Ok(HyperSqlType::Float),
324        TypeTag::HyperDouble => Ok(HyperSqlType::Double),
325        TypeTag::HyperNumeric => {
326            // Extract precision/scale from modifier
327            let (precision, scale) = match modifier {
328                Some(Modifier::NumericModifier(m)) => (m.precision, m.scale),
329                _ => (38, 0), // Default precision/scale
330            };
331            Ok(HyperSqlType::Numeric { precision, scale })
332        }
333        TypeTag::HyperText => Ok(HyperSqlType::Text),
334        TypeTag::HyperVarchar => {
335            let max_length = match modifier {
336                Some(Modifier::MaxLength(len)) => Some(*len),
337                _ => None,
338            };
339            Ok(HyperSqlType::Varchar { max_length })
340        }
341        TypeTag::HyperChar => {
342            let length = match modifier {
343                Some(Modifier::MaxLength(len)) => *len,
344                _ => 1,
345            };
346            Ok(HyperSqlType::Char { length })
347        }
348        TypeTag::HyperByteA => Ok(HyperSqlType::ByteA),
349        TypeTag::HyperOid => Ok(HyperSqlType::Oid),
350        TypeTag::HyperJson => Ok(HyperSqlType::Json),
351        TypeTag::HyperDate => Ok(HyperSqlType::Date),
352        TypeTag::HyperTime => Ok(HyperSqlType::Time),
353        TypeTag::HyperTimestamp => Ok(HyperSqlType::Timestamp),
354        TypeTag::HyperTimestampTz => Ok(HyperSqlType::TimestampTz),
355        TypeTag::HyperInterval => Ok(HyperSqlType::Interval),
356        TypeTag::HyperGeography => Ok(HyperSqlType::Geography),
357        TypeTag::HyperArrayOfFloat => {
358            // Array types are not directly supported in crate::types::SqlType
359            Err(Error::new(
360                ErrorKind::Conversion,
361                "Array types not yet supported",
362            ))
363        }
364    }
365}
366
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_grpc_result_empty() {
        // A fresh result has no chunks, no data, and no columns.
        let empty = GrpcQueryResult::new();
        assert!(!empty.has_chunks());
        assert!(empty.arrow_data().is_empty());
        assert_eq!(empty.column_count(), 0);
    }

    #[test]
    fn test_grpc_result_with_chunks() {
        let mut result = GrpcQueryResult::new();
        let first = GrpcResultChunk::new(0, Bytes::from_static(&[1, 2, 3]));
        let second = GrpcResultChunk::new(1, Bytes::from_static(&[4, 5, 6]));
        result.chunks.push_back(first);
        result.chunks.push_back(second);

        // Concatenated view covers both chunks in order.
        assert!(result.has_chunks());
        assert_eq!(result.arrow_data().as_ref(), &[1, 2, 3, 4, 5, 6]);

        // take_chunk pops from the front.
        let head = result.take_chunk().expect("expected a chunk");
        assert_eq!(head.chunk_id(), 0);
        assert_eq!(head.arrow_data(), &[1, 2, 3]);
    }

    #[test]
    fn test_sql_type_mapping() {
        use crate::client::grpc::proto::hyper_service::sql_type::TypeTag;

        let int_type = SqlType {
            tag: TypeTag::HyperInt.into(),
            modifier: None,
        };

        let converted = sql_type_to_hyper_type(&int_type).unwrap();
        assert_eq!(converted, crate::types::SqlType::Int);
    }
}
409}