hyperdb_api/arrow_result.rs

// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 OR MIT

//! Arrow IPC result parsing for unified query results.
//!
//! This module provides utilities for parsing Arrow IPC data and exposing it
//! through row-based iteration, enabling a consistent API across TCP and gRPC
//! transports.
//!
//! # Zero-copy decoding
//!
//! The parsing entry points (`ArrowRowset::from_bytes`, `from_buffer`,
//! `from_chunks`, and `parse_arrow_ipc`) feed Arrow's `StreamDecoder` directly
//! from a shared buffer. Record-batch columnar buffers share the input
//! allocation, so fixed-width primitive columns are genuinely zero-copy from
//! the HTTP/2 frame (for gRPC) or from the COPY response buffer (for TCP)
//! all the way to the `RecordBatch`.
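//!
//! # Example
//!
//! A minimal usage sketch (illustrative only; obtaining the IPC `Bytes` from a
//! transport is not shown):
//!
//! ```ignore
//! let mut rowset = ArrowRowset::from_bytes(ipc_bytes)?;
//! println!("columns: {:?}", rowset.column_names());
//! while let Some(chunk) = rowset.next_chunk()? {
//!     for row in &chunk {
//!         let id: Option<i64> = row.get(0);
//!         let name = row.get_string(1);
//!         println!("{id:?} {name:?}");
//!     }
//! }
//! ```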
#![expect(
    dead_code,
    reason = "experimental zero-copy Arrow path; helpers retained for upcoming gRPC/TCP wiring"
)]

use std::collections::VecDeque;
use std::sync::Arc;

use arrow::array::{
    Array, BinaryArray, BooleanArray, Date32Array, Float32Array, Float64Array, Int16Array,
    Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, StringArray,
    TimestampMicrosecondArray,
};
use arrow::buffer::Buffer;
use arrow::datatypes::{DataType, Schema};
use arrow::ipc::reader::StreamDecoder;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;

use crate::error::{Error, Result};
use hyperdb_api_core::types::SqlType;

/// A row from an Arrow record batch, providing typed value access.
#[derive(Debug)]
pub struct ArrowRow<'a> {
    batch: &'a RecordBatch,
    row_index: usize,
}

impl<'a> ArrowRow<'a> {
    /// Creates a new `ArrowRow` referencing a specific row in a batch.
    pub(crate) fn new(batch: &'a RecordBatch, row_index: usize) -> Self {
        ArrowRow { batch, row_index }
    }

    /// Returns the number of columns in this row.
    #[must_use]
    pub fn column_count(&self) -> usize {
        self.batch.num_columns()
    }

    /// Gets a value at the given column index, with type conversion.
    #[must_use]
    pub fn get<T: FromArrowValue>(&self, col: usize) -> Option<T> {
        if col >= self.batch.num_columns() {
            return None;
        }
        T::from_arrow_column(self.batch.column(col), self.row_index)
    }

    /// Gets an i16 value at the given column index.
    #[must_use]
    pub fn get_i16(&self, col: usize) -> Option<i16> {
        self.get::<i16>(col)
    }

    /// Gets an i32 value at the given column index.
    #[must_use]
    pub fn get_i32(&self, col: usize) -> Option<i32> {
        self.get::<i32>(col)
    }

    /// Gets an i64 value at the given column index.
    #[must_use]
    pub fn get_i64(&self, col: usize) -> Option<i64> {
        self.get::<i64>(col)
    }

    /// Gets an f32 value at the given column index.
    #[must_use]
    pub fn get_f32(&self, col: usize) -> Option<f32> {
        self.get::<f32>(col)
    }

    /// Gets an f64 value at the given column index.
    #[must_use]
    pub fn get_f64(&self, col: usize) -> Option<f64> {
        self.get::<f64>(col)
    }

    /// Gets a bool value at the given column index.
    #[must_use]
    pub fn get_bool(&self, col: usize) -> Option<bool> {
        self.get::<bool>(col)
    }

    /// Gets a String value at the given column index.
    #[must_use]
    pub fn get_string(&self, col: usize) -> Option<String> {
        self.get::<String>(col)
    }

    /// Gets bytes at the given column index.
    #[must_use]
    pub fn get_bytes(&self, col: usize) -> Option<Vec<u8>> {
        self.get::<Vec<u8>>(col)
    }

    /// Checks if the value at the given column is null.
    ///
    /// Out-of-range column indexes are reported as null.
    #[must_use]
    pub fn is_null(&self, col: usize) -> bool {
        if col >= self.batch.num_columns() {
            return true;
        }
        self.batch.column(col).is_null(self.row_index)
    }
}

/// Trait for types that can be extracted from Arrow columns.
pub trait FromArrowValue: Sized {
    /// Extract a value from an Arrow array at the given row index.
    fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self>;
}

impl FromArrowValue for i16 {
    fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
        if array.is_null(row) {
            return None;
        }
        if let Some(arr) = array.as_any().downcast_ref::<Int16Array>() {
            Some(arr.value(row))
        } else if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
            i16::try_from(arr.value(row)).ok()
        } else {
            array
                .as_any()
                .downcast_ref::<Int64Array>()
                .and_then(|arr| i16::try_from(arr.value(row)).ok())
        }
    }
}

impl FromArrowValue for i32 {
    fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
        if array.is_null(row) {
            return None;
        }
        if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
            Some(arr.value(row))
        } else if let Some(arr) = array.as_any().downcast_ref::<Int16Array>() {
            Some(i32::from(arr.value(row)))
        } else {
            array
                .as_any()
                .downcast_ref::<Int64Array>()
                .and_then(|arr| i32::try_from(arr.value(row)).ok())
        }
    }
}

impl FromArrowValue for i64 {
    fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
        if array.is_null(row) {
            return None;
        }
        if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
            Some(arr.value(row))
        } else if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
            Some(i64::from(arr.value(row)))
        } else if let Some(arr) = array.as_any().downcast_ref::<Int16Array>() {
            Some(i64::from(arr.value(row)))
        } else if let Some(arr) = array.as_any().downcast_ref::<Date32Array>() {
            Some(i64::from(arr.value(row)))
        } else {
            array
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .map(|arr| arr.value(row))
        }
    }
}

impl FromArrowValue for f32 {
    fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
        if array.is_null(row) {
            return None;
        }
        if let Some(arr) = array.as_any().downcast_ref::<Float32Array>() {
            Some(arr.value(row))
        } else {
            array.as_any().downcast_ref::<Float64Array>().map(|arr| {
                // Narrowing f64 → f32 is an inherent precision loss across the
                // Float64 column path. Callers that need full precision should
                // use the f64 accessor; this path preserves the historical
                // best-effort behavior.
                #[expect(
                    clippy::cast_possible_truncation,
                    reason = "f64 → f32 narrowing is caller-accepted precision loss for this column-coercion path"
                )]
                let narrowed = arr.value(row) as f32;
                narrowed
            })
        }
    }
}

impl FromArrowValue for f64 {
    fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
        if array.is_null(row) {
            return None;
        }
        if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
            Some(arr.value(row))
        } else {
            array
                .as_any()
                .downcast_ref::<Float32Array>()
                .map(|arr| f64::from(arr.value(row)))
        }
    }
}

impl FromArrowValue for bool {
    fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
        if array.is_null(row) {
            return None;
        }
        array
            .as_any()
            .downcast_ref::<BooleanArray>()
            .map(|arr| arr.value(row))
    }
}

impl FromArrowValue for String {
    fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
        if array.is_null(row) {
            return None;
        }
        if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
            Some(arr.value(row).to_string())
        } else {
            array
                .as_any()
                .downcast_ref::<LargeStringArray>()
                .map(|arr| arr.value(row).to_string())
        }
    }
}

impl FromArrowValue for Vec<u8> {
    fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
        if array.is_null(row) {
            return None;
        }
        if let Some(arr) = array.as_any().downcast_ref::<BinaryArray>() {
            Some(arr.value(row).to_vec())
        } else {
            array
                .as_any()
                .downcast_ref::<LargeBinaryArray>()
                .map(|arr| arr.value(row).to_vec())
        }
    }
}

/// A chunk of rows from Arrow data, analogous to TCP's row chunks.
#[derive(Debug)]
pub struct ArrowChunk {
    batch: RecordBatch,
}

impl ArrowChunk {
    /// Creates a new `ArrowChunk` from a `RecordBatch`.
    pub(crate) fn new(batch: RecordBatch) -> Self {
        ArrowChunk { batch }
    }

    /// Returns the number of rows in this chunk.
    #[must_use]
    pub fn len(&self) -> usize {
        self.batch.num_rows()
    }

    /// Returns true if this chunk has no rows.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.batch.num_rows() == 0
    }

    /// Returns the number of columns.
    #[must_use]
    pub fn column_count(&self) -> usize {
        self.batch.num_columns()
    }

    /// Gets the row at the given index.
    #[must_use]
    pub fn row(&self, index: usize) -> Option<ArrowRow<'_>> {
        if index < self.batch.num_rows() {
            Some(ArrowRow::new(&self.batch, index))
        } else {
            None
        }
    }

    /// Returns the first row, if any.
    #[must_use]
    pub fn first(&self) -> Option<ArrowRow<'_>> {
        self.row(0)
    }

    /// Returns an iterator over the rows.
    #[must_use]
    pub fn iter(&self) -> ArrowChunkIter<'_> {
        ArrowChunkIter {
            chunk: self,
            index: 0,
        }
    }

    /// Consumes the chunk and returns the underlying `RecordBatch`.
    #[must_use]
    pub fn into_batch(self) -> RecordBatch {
        self.batch
    }
}

impl<'a> IntoIterator for &'a ArrowChunk {
    type Item = ArrowRow<'a>;
    type IntoIter = ArrowChunkIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}

/// Iterator over rows in an `ArrowChunk`.
#[derive(Debug)]
pub struct ArrowChunkIter<'a> {
    chunk: &'a ArrowChunk,
    index: usize,
}

impl<'a> Iterator for ArrowChunkIter<'a> {
    type Item = ArrowRow<'a>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.index < self.chunk.len() {
            let row = ArrowRow::new(&self.chunk.batch, self.index);
            self.index += 1;
            Some(row)
        } else {
            None
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.chunk.len() - self.index;
        (remaining, Some(remaining))
    }
}

impl ExactSizeIterator for ArrowChunkIter<'_> {}

/// Source of Arrow IPC byte chunks for streaming decode.
///
/// Implement this for any blocking chunk producer. The canonical use is
/// wrapping a gRPC chunk stream so `ArrowRowset` can decode record batches
/// lazily, keeping peak memory bounded by roughly one chunk regardless of
/// total result size.
///
/// Returning `Ok(None)` signals end-of-stream.
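///
/// # Example
///
/// A minimal implementation over pre-collected chunks, mirroring the test
/// helper in this module (real sources would wrap a blocking gRPC or TCP
/// stream):
///
/// ```ignore
/// struct VecSource(std::collections::VecDeque<bytes::Bytes>);
///
/// impl ChunkSource for VecSource {
///     fn next_chunk(&mut self) -> Result<Option<bytes::Bytes>> {
///         // `pop_front` keeps returning `None` once drained, which
///         // satisfies the repeat-`Ok(None)` contract on `next_chunk`.
///         Ok(self.0.pop_front())
///     }
/// }
/// ```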
pub trait ChunkSource: Send {
    /// Returns the next Arrow IPC byte chunk, or `Ok(None)` once the source
    /// is exhausted. Subsequent calls after `Ok(None)` should keep
    /// returning `Ok(None)`.
    ///
    /// # Errors
    ///
    /// Implementations return whatever transport error the underlying
    /// source produces (typically [`Error::Client`] from a gRPC stream or
    /// [`Error::Io`] on network failures).
    fn next_chunk(&mut self) -> Result<Option<Bytes>>;
}

/// Parsed Arrow result set for streaming row access.
///
/// Constructed either from fully materialized bytes
/// ([`from_bytes`](Self::from_bytes), [`from_buffer`](Self::from_buffer),
/// [`from_chunks`](Self::from_chunks), [`from_ipc_slice`](Self::from_ipc_slice))
/// or from a lazy chunk source
/// ([`from_stream`](Self::from_stream)).
///
/// The lazy constructor pulls and decodes chunks on demand from
/// [`next_chunk`](Self::next_chunk), so for very large result sets (GB-class
/// gRPC query results) peak client memory is bounded by roughly one chunk
/// plus whatever batches the caller is holding, rather than growing to the
/// full result size.
pub struct ArrowRowset {
    inner: ArrowRowsetInner,
    schema: Arc<Schema>,
}

impl std::fmt::Debug for ArrowRowset {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowRowset")
            .field("schema", &self.schema)
            .finish_non_exhaustive()
    }
}

enum ArrowRowsetInner {
    /// All batches are already decoded and held in `batches`; `current` is
    /// the next index to return from `next_chunk`.
    Buffered {
        batches: Vec<RecordBatch>,
        current: usize,
    },
    /// Lazy: pull bytes from `source` and decode record batches on demand.
    ///
    /// Keeps a single `StreamDecoder` across source chunks so its schema
    /// state carries over — hyperd typically sends the schema only in the
    /// first chunk and follows up with batch-only continuation chunks.
    /// When the decoder hits an EOS marker or a second schema message, we
    /// swap in a fresh decoder (but keep the already-learned schema) so we
    /// also tolerate the "multiple concatenated IPC streams" shape.
    ///
    /// `leftover` carries unconsumed bytes from the previous chunk so
    /// messages split across chunk boundaries are reassembled.
    Streaming {
        source: Box<dyn ChunkSource>,
        decoder: StreamDecoder,
        pending: VecDeque<RecordBatch>,
        leftover: Option<Buffer>,
        exhausted: bool,
    },
}

impl ArrowRowset {
    /// Empty rowset (no schema, no batches).
    fn empty() -> Self {
        ArrowRowset {
            inner: ArrowRowsetInner::Buffered {
                batches: Vec::new(),
                current: 0,
            },
            schema: Arc::new(Schema::empty()),
        }
    }

    /// Parse Arrow IPC bytes from a shared `Bytes` handle (zero-copy).
    ///
    /// Tolerant of two shapes:
    /// - a single continuous Arrow IPC stream (what libpq COPY TO STDOUT
    ///   with `arrowstream` format produces), or
    /// - one or more self-contained streams concatenated end-to-end (what
    ///   hyperd's gRPC `execute_query_to_arrow` produces when the server
    ///   split the result across multiple `BinaryPart` messages —
    ///   `into_arrow_data` glued them together).
    ///
    /// Arrow record batches reference the same allocation as the input
    /// `Bytes`, so fixed-width columns do not incur any memcpy. Prefer this
    /// over [`from_ipc_slice`](Self::from_ipc_slice) whenever you already
    /// have a `Bytes` (which is the native return type of the gRPC path).
    ///
    /// # Errors
    ///
    /// Returns [`Error::Other`] wrapping an Arrow IPC decode error if
    /// `bytes` is not a valid Arrow IPC stream (or concatenation thereof).
    pub fn from_bytes(bytes: Bytes) -> Result<Self> {
        if bytes.is_empty() {
            return Ok(Self::empty());
        }
        Self::from_buffer(Buffer::from(bytes))
    }

    /// Parse Arrow IPC bytes from an arrow `Buffer` (zero-copy).
    ///
    /// See [`from_bytes`](Self::from_bytes) for how this tolerates both
    /// continuous and concatenated-stream inputs.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Other`] wrapping an Arrow IPC decode error if
    /// `buf` is not a valid Arrow IPC stream.
    pub fn from_buffer(buf: Buffer) -> Result<Self> {
        if buf.is_empty() {
            return Ok(Self::empty());
        }
        let (schema, batches) = decode_possibly_concatenated_streams(buf)?;
        Ok(ArrowRowset {
            inner: ArrowRowsetInner::Buffered {
                batches,
                current: 0,
            },
            schema,
        })
    }

    /// Parse Arrow IPC bytes from multiple independent chunks (zero-copy).
    ///
    /// Each chunk is treated as its own self-contained Arrow IPC stream
    /// (schema + batches + optional EOS). This matches hyperd's gRPC
    /// output, where every `BinaryPart` message carries a fresh schema.
    /// For a single continuous stream, use [`from_bytes`](Self::from_bytes).
    ///
    /// # Errors
    ///
    /// Returns [`Error::Other`] wrapping an Arrow IPC decode error if any
    /// chunk cannot be parsed as a self-contained IPC stream.
    pub fn from_chunks<I>(chunks: I) -> Result<Self>
    where
        I: IntoIterator<Item = Bytes>,
    {
        let mut batches = Vec::new();
        let mut schema = Arc::new(Schema::empty());
        for chunk in chunks {
            if chunk.is_empty() {
                continue;
            }
            let (chunk_schema, chunk_batches) = decode_chunk(chunk)?;
            if schema.fields().is_empty() {
                schema = chunk_schema;
            }
            batches.extend(chunk_batches);
        }
        Ok(ArrowRowset {
            inner: ArrowRowsetInner::Buffered {
                batches,
                current: 0,
            },
            schema,
        })
    }

    /// Build a streaming rowset that pulls chunks from `source` on demand.
    ///
    /// Unlike the `from_*` constructors, this does **not** pre-decode the
    /// whole IPC stream up front. Each call to
    /// [`next_chunk`](Self::next_chunk) pulls just enough bytes from
    /// `source` to produce one Arrow `RecordBatch`. Peak memory is bounded
    /// by one source chunk (at most the transport's configured maximum
    /// message size, e.g. tonic's `max_decoding_message_size`) plus any
    /// batches the caller is still holding — regardless of total result
    /// size.
    ///
    /// Source chunks are pulled eagerly until the schema is available, so
    /// [`schema`](Self::schema) returns the real schema before the first
    /// `next_chunk` call. If the stream is empty, an empty rowset with
    /// `Schema::empty()` is returned.
    ///
    /// # Errors
    ///
    /// - Returns the transport error from `source.next_chunk()` when
    ///   priming the decoder with the first chunk.
    /// - Returns [`Error::Other`] wrapping an Arrow IPC decode error if
    ///   that first chunk is not a valid Arrow IPC stream prefix.
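    ///
    /// # Example
    ///
    /// A sketch of the intended call pattern (the `ChunkSource`
    /// implementation, `source`, is assumed; e.g. a wrapper over a blocking
    /// gRPC chunk stream):
    ///
    /// ```ignore
    /// let mut rowset = ArrowRowset::from_stream(Box::new(source))?;
    /// while let Some(chunk) = rowset.next_chunk()? {
    ///     // Roughly one source chunk is resident at a time while draining.
    ///     for row in &chunk {
    ///         /* process row */
    ///     }
    /// }
    /// ```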
    pub fn from_stream(source: Box<dyn ChunkSource>) -> Result<Self> {
        let mut rowset = ArrowRowset {
            inner: ArrowRowsetInner::Streaming {
                source,
                decoder: StreamDecoder::new(),
                pending: VecDeque::new(),
                leftover: None,
                exhausted: false,
            },
            schema: Arc::new(Schema::empty()),
        };
        // Eagerly consume source chunks until the schema is available so
        // `schema()` returns the real schema before `next_chunk()` is
        // called. Any decoded batches go straight into `pending`.
        rowset.prime_stream()?;
        Ok(rowset)
    }

    /// Drive the streaming decoder until we have the schema (or the
    /// source is exhausted). Decoded batches land in `pending`, any
    /// leftover bytes get stashed on the rowset for the first
    /// `next_chunk` call to consume.
    fn prime_stream(&mut self) -> Result<()> {
        let new_schema = {
            let ArrowRowsetInner::Streaming {
                source,
                decoder,
                pending,
                leftover,
                exhausted,
            } = &mut self.inner
            else {
                return Ok(());
            };
            while decoder.schema().is_none() && !*exhausted {
                let mut buf = match leftover.take() {
                    Some(b) => b,
                    None => match source.next_chunk()? {
                        Some(bytes) if !bytes.is_empty() => Buffer::from(bytes),
                        Some(_) => continue,
                        None => {
                            *exhausted = true;
                            break;
                        }
                    },
                };
                drive_streaming_decoder(decoder, &mut buf, pending)?;
                if !buf.is_empty() {
                    *leftover = Some(buf);
                }
            }
            decoder.schema()
        };
        if let Some(s) = new_schema {
            self.schema = s;
        }
        Ok(())
    }

    /// Parse Arrow IPC bytes from a borrowed slice.
    ///
    /// This copies `data` into an arrow `Buffer` before decoding. Prefer
    /// [`from_bytes`](Self::from_bytes) when you already own a `Bytes`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Other`] wrapping an Arrow IPC decode error if
    /// `data` is not a valid Arrow IPC stream.
    pub fn from_ipc_slice(data: &[u8]) -> Result<Self> {
        if data.is_empty() {
            return Ok(Self::empty());
        }
        // `Buffer::from` over a `Vec<u8>` takes ownership without copying,
        // but we start from a borrowed slice, so we must copy once here.
        Self::from_buffer(Buffer::from(data.to_vec()))
    }

    /// Returns the schema of the result set.
    #[must_use]
    pub fn schema(&self) -> &Arc<Schema> {
        &self.schema
    }

    /// Returns the number of columns.
    #[must_use]
    pub fn column_count(&self) -> usize {
        self.schema.fields().len()
    }

    /// Returns column names.
    #[must_use]
    pub fn column_names(&self) -> Vec<String> {
        self.schema
            .fields()
            .iter()
            .map(|f| f.name().clone())
            .collect()
    }

    /// Returns the column name at the given index.
    #[must_use]
    pub fn column_name(&self, index: usize) -> Option<&str> {
        self.schema.fields().get(index).map(|f| f.name().as_str())
    }

    /// Gets the next chunk of rows.
    ///
    /// For buffered rowsets this walks a pre-decoded `Vec<RecordBatch>`.
    /// For streaming rowsets it pulls and decodes source chunks on demand
    /// until at least one record batch is ready (or the source is
    /// exhausted).
    ///
    /// # Errors
    ///
    /// For streaming rowsets:
    /// - Returns the transport error from `source.next_chunk()`.
    /// - Returns [`Error::Other`] wrapping an Arrow IPC decode error if a
    ///   chunk contains malformed stream bytes.
    ///
    /// Buffered rowsets never error — they walk a pre-decoded vector.
    pub fn next_chunk(&mut self) -> Result<Option<ArrowChunk>> {
        match &mut self.inner {
            ArrowRowsetInner::Buffered { batches, current } => {
                if *current >= batches.len() {
                    return Ok(None);
                }
                let batch = batches[*current].clone();
                *current += 1;
                Ok(Some(ArrowChunk::new(batch)))
            }
            ArrowRowsetInner::Streaming {
                source,
                decoder,
                pending,
                leftover,
                exhausted,
            } => loop {
                if let Some(batch) = pending.pop_front() {
                    return Ok(Some(ArrowChunk::new(batch)));
                }
                if *exhausted {
                    return Ok(None);
                }
                let mut buf = match leftover.take() {
                    Some(b) => b,
                    None => match source.next_chunk()? {
                        Some(bytes) if !bytes.is_empty() => Buffer::from(bytes),
                        Some(_) => continue,
                        None => {
                            *exhausted = true;
                            continue;
                        }
                    },
                };
                drive_streaming_decoder(decoder, &mut buf, pending)?;
                if !buf.is_empty() {
                    *leftover = Some(buf);
                }
            },
        }
    }

    /// Returns the total number of rows across all batches.
    ///
    /// For streaming rowsets this counts only batches that have been decoded
    /// but not yet returned by [`next_chunk`](Self::next_chunk); the total is
    /// not known until the source has been fully drained.
    #[must_use]
    pub fn total_rows(&self) -> usize {
        match &self.inner {
            ArrowRowsetInner::Buffered { batches, .. } => batches
                .iter()
                .map(arrow::array::RecordBatch::num_rows)
                .sum(),
            ArrowRowsetInner::Streaming { pending, .. } => pending
                .iter()
                .map(arrow::array::RecordBatch::num_rows)
                .sum(),
        }
    }

    /// Returns true if there are no rows available **right now**.
    ///
    /// For streaming rowsets this is true only once the source is exhausted
    /// and no decoded batches remain pending; before the source is fully
    /// drained it reports false even if the pending queue is momentarily
    /// empty.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        match &self.inner {
            ArrowRowsetInner::Buffered { batches, .. } => {
                batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0)
            }
            ArrowRowsetInner::Streaming {
                pending, exhausted, ..
            } => *exhausted && pending.is_empty(),
        }
    }
}

/// Convert Arrow `DataType` to SQL type name string.
fn arrow_type_to_sql_name(dt: &DataType) -> String {
    match dt {
        DataType::Boolean => "BOOLEAN".to_string(),
        DataType::Int8 | DataType::Int16 | DataType::UInt8 => "SMALLINT".to_string(),
        DataType::Int32 | DataType::UInt16 => "INTEGER".to_string(),
        DataType::Int64 | DataType::UInt32 | DataType::UInt64 => "BIGINT".to_string(),
        DataType::Float16 | DataType::Float32 => "REAL".to_string(),
        DataType::Float64 => "DOUBLE PRECISION".to_string(),
        DataType::Utf8 | DataType::LargeUtf8 => "TEXT".to_string(),
        DataType::Binary | DataType::LargeBinary => "BYTEA".to_string(),
        DataType::Date32 | DataType::Date64 => "DATE".to_string(),
        DataType::Time32(_) | DataType::Time64(_) => "TIME".to_string(),
        DataType::Timestamp(_, None) => "TIMESTAMP".to_string(),
        DataType::Timestamp(_, Some(_)) => "TIMESTAMPTZ".to_string(),
        DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => format!("NUMERIC({p}, {s})"),
        DataType::Interval(_) => "INTERVAL".to_string(),
        DataType::List(_) => "ARRAY".to_string(),
        DataType::Struct(_) => "RECORD".to_string(),
        _ => "UNKNOWN".to_string(),
    }
}

/// Narrows an Arrow `Decimal*` scale (`i8`) to the `u32` scale carried by
/// `SqlType::Numeric`. Negative scales are not representable in `SqlType` and
/// indicate a schema mismatch; we clamp to `0` rather than panic so that a
/// malformed Arrow schema does not take down an entire query.
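///
/// For example, an Arrow scale of `-2` (values counted in hundreds) clamps
/// to `0` here.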
fn decimal_scale_to_u32(scale: i8) -> u32 {
    u32::try_from(scale).unwrap_or(0)
}

/// Convert Arrow `DataType` to `SqlType`, mirroring the value-range choices
/// of `arrow_type_to_sql_name`.
pub(crate) fn arrow_type_to_sql_type(dt: &DataType) -> SqlType {
    match dt {
        DataType::Boolean => SqlType::Bool,
        DataType::Int8 | DataType::Int16 | DataType::UInt8 => SqlType::SmallInt,
        DataType::Int32 | DataType::UInt16 => SqlType::Int,
        DataType::Int64 | DataType::UInt32 | DataType::UInt64 => SqlType::BigInt,
        DataType::Float16 | DataType::Float32 => SqlType::Float,
        DataType::Float64 => SqlType::Double,
        DataType::Utf8 | DataType::LargeUtf8 => SqlType::Text,
        DataType::Binary | DataType::LargeBinary => SqlType::ByteA,
        DataType::Date32 | DataType::Date64 => SqlType::Date,
        DataType::Time32(_) | DataType::Time64(_) => SqlType::Time,
        DataType::Timestamp(_, None) => SqlType::Timestamp,
        DataType::Timestamp(_, Some(_)) => SqlType::TimestampTz,
        DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => SqlType::Numeric {
            precision: u32::from(*p),
            scale: decimal_scale_to_u32(*s),
        },
        DataType::Interval(_) => SqlType::Interval,
        _ => SqlType::Text, // Fallback to text for unknown types
    }
}

/// Parses Arrow IPC stream bytes into a vector of `RecordBatch`es (zero-copy).
///
/// The returned batches share their underlying allocation with the input
/// `Bytes`, so fixed-width columns do not incur any memcpy.
///
/// # Errors
///
/// Returns [`Error::Other`] wrapping an Arrow IPC decode error if `bytes`
/// is not a valid Arrow IPC stream (or concatenation thereof).
pub fn parse_arrow_ipc(bytes: Bytes) -> Result<Vec<RecordBatch>> {
    if bytes.is_empty() {
        return Ok(Vec::new());
    }
    let (_, batches) = decode_possibly_concatenated_streams(Buffer::from(bytes))?;
    Ok(batches)
}

/// Decode a single self-contained Arrow IPC stream (schema + batches +
/// optional EOS) and return the schema plus the record batches. Used per
/// chunk on the streaming / gRPC path. Tolerates trailing EOS bytes and
/// trailing extra streams by delegating to
/// `decode_possibly_concatenated_streams`.
fn decode_chunk(bytes: Bytes) -> Result<(Arc<Schema>, Vec<RecordBatch>)> {
    decode_possibly_concatenated_streams(Buffer::from(bytes))
}

/// Arrow IPC end-of-stream marker: continuation tag `0xFFFFFFFF` followed
/// by a zero length. When `StreamDecoder` consumes this marker it moves
/// to its `Finished` state and subsequent calls fail with
/// "Unexpected EOS"; detecting the marker ourselves lets us spin up a
/// fresh decoder to start the next stream.
const ARROW_IPC_EOS: [u8; 8] = [0xFF, 0xFF, 0xFF, 0xFF, 0, 0, 0, 0];

/// Run `decoder` over `buf`, pushing every decoded record batch onto
/// `pending`. Advances `buf` as bytes are consumed; stops when `buf` is
/// empty, the decoder signals it needs more data, or we hit a stream
/// boundary that requires a fresh decoder.
///
/// Stream boundaries we recognize:
/// - `ARROW_IPC_EOS` marker at the start of `buf` — consume the 8 bytes
///   and swap `*decoder` for a fresh one so the next stream can start.
/// - "Not expecting a schema when messages are read" error from the
///   decoder — hyperd sometimes concatenates streams without an EOS in
///   between. Roll `buf` back to before the decode call and swap the
///   decoder; the caller will re-enter this function to decode the new
///   stream.
fn drive_streaming_decoder(
    decoder: &mut StreamDecoder,
    buf: &mut Buffer,
    pending: &mut VecDeque<RecordBatch>,
) -> Result<()> {
    loop {
        if buf.is_empty() {
            return Ok(());
        }
        // EOS marker at the head → consume it. If there are more bytes
        // after it, reset the decoder to start a fresh stream. Preserve
        // the decoder (and its schema) if this is a trailing EOS with no
        // more data, since the caller may pull more bytes later that
        // continue using the same schema.
        if buf.len() >= ARROW_IPC_EOS.len() && buf[..ARROW_IPC_EOS.len()] == ARROW_IPC_EOS {
            let new_len = buf.len() - ARROW_IPC_EOS.len();
            *buf = buf.slice_with_length(ARROW_IPC_EOS.len(), new_len);
            if !buf.is_empty() {
                *decoder = StreamDecoder::new();
            }
            continue;
        }
        // When we already have a schema and the next message is another
        // schema (hyperd emits repeated schemas at chunk boundaries),
        // peek the message size and skip past it rather than resetting
        // the decoder — resetting would discard the schema and make the
        // decoder reject the next RecordBatch with "Missing schema".
        if decoder.schema().is_some() && peek_is_schema_message(buf) {
            match peek_message_total_size(buf) {
                Some(total) if buf.len() >= total => {
                    *buf = buf.slice_with_length(total, buf.len() - total);
                    continue;
                }
                _ => {}
            }
        }
        let buf_before = buf.clone();
        match decoder.decode(buf) {
            Ok(Some(batch)) => pending.push_back(batch),
            Ok(None) => return Ok(()),
            Err(e) => {
                let msg = e.to_string();
                if msg.contains("Not expecting a schema when messages are read") {
                    // Roll back and swap the decoder; the next iteration
                    // will start a fresh stream.
                    *buf = buf_before;
                    *decoder = StreamDecoder::new();
                    continue;
                }
                if msg.contains("Unexpected EOS") {
                    // Decoder already in Finished state; roll back and
                    // swap. Keep any pending bytes (probably a new
                    // stream start) for the next pass.
                    *buf = buf_before;
                    *decoder = StreamDecoder::new();
                    continue;
                }
                return Err(Error::new(format!("Failed to parse Arrow IPC data: {e}")));
            }
        }
    }
}

/// Peeks at the first message of `buf` and returns `true` if it is an
/// Arrow IPC Schema message (so we can skip repeated schemas emitted at
/// chunk boundaries). Only inspects the flatbuffer header type.
fn peek_is_schema_message(buf: &Buffer) -> bool {
    // Header layout: 4-byte continuation marker + 4-byte message length +
    // `length` bytes of flatbuffer message. Inside the flatbuffer, the
    // message `header_type` is an enum where Schema = 1 (see the Arrow
    // IPC spec).
    let Some((_frame_size, body)) = peek_message_body(buf) else {
        return false;
    };
    // `root_as_message` verifies and parses the flatbuffer so we can read
    // the one enum we care about. If parsing fails, return `false` and
    // let the decoder surface the error on its own.
    match arrow::ipc::root_as_message(body) {
        Ok(msg) => msg.header_type() == arrow::ipc::MessageHeader::Schema,
        Err(_) => false,
    }
}

/// Returns `Some((frame_size, flatbuffer_bytes))` if `buf` starts with a
/// full Arrow IPC framed message whose flatbuffer body is present;
/// `frame_size` counts the optional continuation marker, the 4-byte length
/// prefix, and the flatbuffer itself.
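///
/// Frame layout (length is little-endian, per the Arrow IPC streaming
/// format):
///
/// ```text
/// [FF FF FF FF]    optional 4-byte continuation marker
/// [length: u32]    flatbuffer size in bytes
/// [length bytes]   flatbuffer `Message` (Schema, RecordBatch, ...)
/// ```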
fn peek_message_body(buf: &Buffer) -> Option<(usize, &[u8])> {
    let bytes: &[u8] = buf;
    // Optional continuation marker.
    let (length_offset, remaining) = if bytes.len() >= 4 && bytes[0..4] == [0xFF; 4] {
        (4, &bytes[4..])
    } else {
        (0, bytes)
    };
    if remaining.len() < 4 {
        return None;
    }
    let length =
        u32::from_le_bytes([remaining[0], remaining[1], remaining[2], remaining[3]]) as usize;
    let body_start = length_offset + 4;
    if buf.len() < body_start + length {
        return None;
    }
    Some((body_start + length, &bytes[body_start..body_start + length]))
}

/// Returns the total framed byte size of the first Arrow IPC message in
/// `buf`: the continuation marker (if present) + the 4-byte length prefix
/// + the flatbuffer. Message body bytes that follow the flatbuffer are
/// **not** included, so this is only correct for messages with an empty
/// body, which holds for the schema messages it is used on.
fn peek_message_total_size(buf: &Buffer) -> Option<usize> {
    let (total, _body) = peek_message_body(buf)?;
    Some(total)
}

/// Decode an arrow `Buffer` that may be one continuous Arrow IPC stream,
/// multiple self-contained streams concatenated end to end, or multiple
/// continuation streams that share a schema from the first one. Handles
/// all three shapes via `drive_streaming_decoder`, which carries decoder
/// state across stream boundaries.
fn decode_possibly_concatenated_streams(
    mut buf: Buffer,
) -> Result<(Arc<Schema>, Vec<RecordBatch>)> {
    let mut decoder = StreamDecoder::new();
    let mut pending = VecDeque::new();
    while !buf.is_empty() {
        let before_len = buf.len();
        drive_streaming_decoder(&mut decoder, &mut buf, &mut pending)?;
        if buf.len() == before_len {
            // drive_streaming_decoder consumed nothing; we're either
            // done (if buf is empty) or stuck (malformed input).
            if !buf.is_empty() {
                return Err(Error::new(
                    "Failed to parse Arrow IPC data: decoder made no progress",
                ));
            }
            break;
        }
    }
    let schema = decoder
        .schema()
        .unwrap_or_else(|| Arc::new(Schema::empty()));
    Ok((schema, pending.into_iter().collect()))
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::Field;

    #[test]
    fn test_arrow_rowset_empty() {
        let rowset = ArrowRowset::from_bytes(Bytes::new()).unwrap();
        assert!(rowset.is_empty());
        assert_eq!(rowset.column_count(), 0);

        let rowset = ArrowRowset::from_ipc_slice(&[]).unwrap();
        assert!(rowset.is_empty());
    }

    #[test]
    fn test_arrow_chunk_iteration() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let id_array = Int32Array::from(vec![1, 2, 3]);
        let name_array = StringArray::from(vec![Some("Alice"), Some("Bob"), None]);

        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(id_array), Arc::new(name_array)]).unwrap();

        let chunk = ArrowChunk::new(batch);
        assert_eq!(chunk.len(), 3);
        assert_eq!(chunk.column_count(), 2);

        let mut iter = chunk.iter();

        let row0 = iter.next().unwrap();
        assert_eq!(row0.get::<i32>(0), Some(1));
        assert_eq!(row0.get::<String>(1), Some("Alice".to_string()));

        let row1 = iter.next().unwrap();
        assert_eq!(row1.get::<i32>(0), Some(2));
        assert_eq!(row1.get::<String>(1), Some("Bob".to_string()));

        let row2 = iter.next().unwrap();
        assert_eq!(row2.get::<i32>(0), Some(3));
        assert_eq!(row2.get::<String>(1), None);
        assert!(row2.is_null(1));

        assert!(iter.next().is_none());
    }

    /// A `ChunkSource` backed by a pre-populated `VecDeque<Bytes>`, used to
    /// exercise the streaming path in tests.
    struct VecChunkSource {
        chunks: VecDeque<Bytes>,
    }

    impl VecChunkSource {
        fn new(chunks: Vec<Bytes>) -> Self {
            VecChunkSource {
                chunks: chunks.into(),
            }
        }
    }

    impl ChunkSource for VecChunkSource {
        fn next_chunk(&mut self) -> Result<Option<Bytes>> {
            Ok(self.chunks.pop_front())
        }
    }

    /// Builds `num_streams` self-contained Arrow IPC streams, each with
    /// the same schema and `rows_per_stream` rows, and returns them as a
    /// `Vec<Bytes>`. Mirrors the shape of real gRPC `BinaryPart` chunks:
    /// every chunk is its own complete IPC stream.
    fn serialize_independent_streams(num_streams: usize, rows_per_stream: i32) -> Vec<Bytes> {
        use arrow::ipc::writer::StreamWriter;
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let mut out = Vec::with_capacity(num_streams);
        for s in 0..num_streams {
            let start = i32::try_from(s).expect("test uses small stream counts") * rows_per_stream;
            let id_array = Int32Array::from((start..start + rows_per_stream).collect::<Vec<_>>());
            let name_array = StringArray::from(
                (start..start + rows_per_stream)
                    .map(|i| Some(format!("n{i}")))
                    .collect::<Vec<_>>(),
            );
            let batch = RecordBatch::try_new(
                Arc::clone(&schema),
                vec![Arc::new(id_array), Arc::new(name_array)],
            )
            .unwrap();

            let mut buf: Vec<u8> = Vec::new();
            {
                let mut writer = StreamWriter::try_new(&mut buf, &schema).unwrap();
                writer.write(&batch).unwrap();
                writer.finish().unwrap();
            }
            out.push(Bytes::from(buf));
        }
        out
    }

    #[test]
    fn test_streaming_rowset_single_chunk() {
        let chunks = serialize_independent_streams(1, 100);
        let source = Box::new(VecChunkSource::new(chunks));
        let mut rowset = ArrowRowset::from_stream(source).unwrap();

        // Schema is primed eagerly so it's available before next_chunk.
        assert_eq!(rowset.column_count(), 2);
        assert_eq!(rowset.column_name(0), Some("id"));

        let chunk = rowset.next_chunk().unwrap().expect("one chunk");
        assert_eq!(chunk.len(), 100);
        assert!(rowset.next_chunk().unwrap().is_none());
    }

    #[test]
    fn test_streaming_rowset_multiple_streams() {
        // Four independent IPC streams of 500 rows each — the shape of
        // multi-chunk gRPC results. Total row count is verified.
        let chunks = serialize_independent_streams(4, 500);
        assert_eq!(chunks.len(), 4);
        let source = Box::new(VecChunkSource::new(chunks));
        let mut rowset = ArrowRowset::from_stream(source).unwrap();

        let mut total_rows = 0;
        while let Some(chunk) = rowset.next_chunk().unwrap() {
            total_rows += chunk.len();
        }
        assert_eq!(total_rows, 2000);
    }

    #[test]
    fn test_streaming_rowset_empty_source() {
        let source = Box::new(VecChunkSource::new(vec![]));
        let mut rowset = ArrowRowset::from_stream(source).unwrap();
        assert!(rowset.next_chunk().unwrap().is_none());
        assert!(rowset.is_empty());
    }

    #[test]
    fn test_from_bytes_concatenated_streams() {
        // Two self-contained IPC streams concatenated end to end — the
        // shape `GrpcQueryResult::into_arrow_data` produces when multiple
        // chunks are glued together.
        let streams = serialize_independent_streams(2, 300);
        let mut concat = bytes::BytesMut::new();
        for s in &streams {
            concat.extend_from_slice(s);
        }
        let mut rowset = ArrowRowset::from_bytes(concat.freeze()).unwrap();
        let mut total_rows = 0usize;
        while let Some(chunk) = rowset.next_chunk().unwrap() {
            total_rows += chunk.len();
        }
        assert_eq!(total_rows, 600);
    }
}