// hyperdb_api/async_result.rs
// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 OR MIT

//! Async streaming result sets.
//!
//! This module is the async mirror of the sync [`Rowset`](crate::Rowset).
//! It wraps [`hyperdb_api_core::client::AsyncQueryStream`] (TCP) or
//! [`crate::ArrowRowset`] (gRPC) and presents the same row-level API as the
//! sync version — chunked iteration, schema capture, scalar collectors,
//! first-row helpers — all over `async fn`.
//!
//! Arrow-backed rowsets are fully materialized in memory already (gRPC
//! returns a complete buffer in one shot), so the Arrow path is a thin
//! wrapper around the sync [`ArrowRowset`] with no additional awaiting.
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use hyperdb_api_core::client::{AsyncQueryStream, StreamRow};
use hyperdb_api_core::types::SqlType;

use crate::arrow_result::ArrowRowset;
use crate::error::Result;
use crate::result::{ResultColumn, ResultSchema, Row, RowValue};
/// A streaming result set returned from an async query.
///
/// See [`Rowset`](crate::Rowset) for the sync equivalent and the full
/// memory-behavior contract — both use constant-memory chunked iteration.
pub struct AsyncRowset<'conn> {
    /// Transport-specific source of rows (TCP stream, Arrow buffer, or
    /// prepared-statement stream).
    inner: AsyncRowsetInner<'conn>,
    /// Schema memoized after it first becomes available, so every `Row`
    /// handed out can share one `Arc` instead of cloning column metadata.
    schema_cache: Option<Arc<ResultSchema>>,
    /// Hold the prepared statement for the one-shot `query_params`
    /// path. Its Drop-time close fires only after the rowset releases
    /// its connection lock. See [`Rowset::with_statement_guard`](crate::Rowset)
    /// for the sync equivalent.
    _statement_guard: Option<hyperdb_api_core::client::AsyncPreparedStatement>,
}

40impl std::fmt::Debug for AsyncRowset<'_> {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        f.debug_struct("AsyncRowset")
43            .field("has_schema_cache", &self.schema_cache.is_some())
44            .finish_non_exhaustive()
45    }
46}
47
/// Transport-specific backing store for an `AsyncRowset`.
enum AsyncRowsetInner<'conn> {
    /// TCP streaming result via `AsyncQueryStream`.
    Tcp(AsyncQueryStream<'conn>),
    /// Arrow-based result from gRPC, already fully materialized in memory.
    /// NOTE(review): an earlier comment here described an `Option` wrapper
    /// used so consuming collectors could move the value out; the variant
    /// now holds the `ArrowRowset` directly, so that no longer applies.
    Arrow(ArrowRowset),
    /// TCP streaming result from a prepared-statement execute.
    Prepared(hyperdb_api_core::client::AsyncPreparedQueryStream<'conn>),
}

59impl<'conn> AsyncRowset<'conn> {
60    /// Constructs a new async rowset from a TCP `AsyncQueryStream`.
61    pub(crate) fn new(stream: AsyncQueryStream<'conn>) -> Self {
62        Self {
63            inner: AsyncRowsetInner::Tcp(stream),
64            schema_cache: None,
65            _statement_guard: None,
66        }
67    }
68
69    /// Constructs a new async rowset from an already-materialized Arrow
70    /// rowset (gRPC transport).
71    pub(crate) fn from_arrow(arrow_rowset: ArrowRowset) -> Self {
72        Self {
73            inner: AsyncRowsetInner::Arrow(arrow_rowset),
74            schema_cache: None,
75            _statement_guard: None,
76        }
77    }
78
79    /// Constructs a new async rowset from a prepared-statement streaming
80    /// result.
81    pub(crate) fn from_prepared(
82        stream: hyperdb_api_core::client::AsyncPreparedQueryStream<'conn>,
83    ) -> Self {
84        Self {
85            inner: AsyncRowsetInner::Prepared(stream),
86            schema_cache: None,
87            _statement_guard: None,
88        }
89    }
90
91    #[expect(
92        clippy::used_underscore_binding,
93        reason = "underscore-prefixed parameter retained for trait-method signature compatibility"
94    )]
95    /// Attaches an `AsyncPreparedStatement` that should be dropped
96    /// **after** this rowset is consumed. Used by the one-shot
97    /// prepare+execute path inside
98    /// [`crate::AsyncConnection::query_params`].
99    pub(crate) fn with_statement_guard(
100        mut self,
101        statement: hyperdb_api_core::client::AsyncPreparedStatement,
102    ) -> Self {
103        self._statement_guard = Some(statement);
104        self
105    }
106
107    /// Returns the schema (column metadata) for the result set.
108    ///
109    /// For TCP this returns `None` until the first chunk has been
110    /// fetched (the `RowDescription` is the first message of the
111    /// stream). For Arrow it is available immediately.
112    #[must_use]
113    pub fn schema(&self) -> Option<ResultSchema> {
114        if let Some(ref cached) = self.schema_cache {
115            return Some((**cached).clone());
116        }
117        self.build_schema()
118    }
119
120    fn build_schema(&self) -> Option<ResultSchema> {
121        match &self.inner {
122            AsyncRowsetInner::Tcp(stream) => stream.schema().map(|cols| {
123                let columns = cols
124                    .iter()
125                    .enumerate()
126                    .map(|(idx, col)| {
127                        let sql_type =
128                            SqlType::from_oid_and_modifier(col.type_oid().0, col.type_modifier());
129                        ResultColumn::new(col.name(), sql_type, idx)
130                    })
131                    .collect();
132                ResultSchema::from_columns(columns)
133            }),
134            AsyncRowsetInner::Prepared(stream) => {
135                let cols = stream.schema();
136                let columns = cols
137                    .iter()
138                    .enumerate()
139                    .map(|(idx, col)| {
140                        let sql_type =
141                            SqlType::from_oid_and_modifier(col.type_oid().0, col.type_modifier());
142                        ResultColumn::new(col.name(), sql_type, idx)
143                    })
144                    .collect();
145                Some(ResultSchema::from_columns(columns))
146            }
147            AsyncRowsetInner::Arrow(arrow) => {
148                let schema = arrow.schema();
149                let columns = schema
150                    .fields()
151                    .iter()
152                    .enumerate()
153                    .map(|(idx, field)| {
154                        ResultColumn::new(
155                            field.name(),
156                            crate::arrow_result::arrow_type_to_sql_type(field.data_type()),
157                            idx,
158                        )
159                    })
160                    .collect();
161                Some(ResultSchema::from_columns(columns))
162            }
163        }
164    }
165
166    fn cached_schema_arc(&mut self) -> Option<Arc<ResultSchema>> {
167        if self.schema_cache.is_none() {
168            if let Some(schema) = self.build_schema() {
169                self.schema_cache = Some(Arc::new(schema));
170            }
171        }
172        self.schema_cache.clone()
173    }
174
175    /// Returns the next chunk of rows, or `None` when the stream is
176    /// exhausted.
177    ///
178    /// # Errors
179    ///
180    /// - Returns [`crate::Error::Client`] if the server sends an `ErrorResponse`
181    ///   while streaming the result set.
182    /// - Returns [`crate::Error::Io`] on transport-level I/O failures.
183    /// - Returns [`crate::Error::Other`] if an Arrow IPC chunk cannot be decoded.
184    pub async fn next_chunk(&mut self) -> Result<Option<Vec<Row>>> {
185        enum TransportChunk {
186            Tcp(Vec<StreamRow>),
187            Arrow(Arc<RecordBatch>),
188        }
189
190        let chunk_opt: Option<TransportChunk> = match &mut self.inner {
191            AsyncRowsetInner::Tcp(stream) => stream.next_chunk().await?.map(TransportChunk::Tcp),
192            AsyncRowsetInner::Arrow(arrow) => arrow
193                .next_chunk()?
194                .map(|chunk| TransportChunk::Arrow(Arc::new(chunk.into_batch()))),
195            AsyncRowsetInner::Prepared(stream) => {
196                stream.next_chunk().await?.map(TransportChunk::Tcp)
197            }
198        };
199
200        let Some(chunk) = chunk_opt else {
201            return Ok(None);
202        };
203
204        let schema = self.cached_schema_arc();
205        let rows = match chunk {
206            TransportChunk::Tcp(stream_rows) => stream_rows
207                .into_iter()
208                .map(|row| Row::from_tcp(row, schema.clone()))
209                .collect(),
210            TransportChunk::Arrow(batch) => (0..batch.num_rows())
211                .map(|row_index| Row::from_arrow(Arc::clone(&batch), row_index, schema.clone()))
212                .collect(),
213        };
214        Ok(Some(rows))
215    }
216
217    /// Collects every remaining row into a `Vec`. Consumes the rowset.
218    ///
219    /// # Errors
220    ///
221    /// Returns the first error produced by [`next_chunk`](Self::next_chunk)
222    /// while draining the stream.
223    pub async fn collect_rows(mut self) -> Result<Vec<Row>> {
224        let mut all = Vec::new();
225        while let Some(chunk) = self.next_chunk().await? {
226            all.extend(chunk);
227        }
228        Ok(all)
229    }
230
231    /// Collects the first column of every row, preserving NULL as `None`.
232    ///
233    /// # Errors
234    ///
235    /// Returns the first error produced by [`next_chunk`](Self::next_chunk).
236    /// SQL `NULL` cells yield `Option::None` entries, not errors.
237    pub async fn collect_column<T: RowValue>(mut self) -> Result<Vec<Option<T>>> {
238        let mut values = Vec::new();
239        while let Some(chunk) = self.next_chunk().await? {
240            for row in chunk {
241                values.push(row.get::<T>(0));
242            }
243        }
244        Ok(values)
245    }
246
247    /// Collects the first column of every row, dropping NULLs.
248    ///
249    /// # Errors
250    ///
251    /// Returns the first error produced by
252    /// [`collect_column`](Self::collect_column).
253    pub async fn collect_column_non_null<T: RowValue>(self) -> Result<Vec<T>> {
254        Ok(self
255            .collect_column::<T>()
256            .await?
257            .into_iter()
258            .flatten()
259            .collect())
260    }
261
262    /// Returns the first row of the result set, or `None` if the
263    /// result is empty. Consumes the rowset.
264    ///
265    /// # Errors
266    ///
267    /// Returns the error from [`next_chunk`](Self::next_chunk). An empty
268    /// result yields `Ok(None)`.
269    pub async fn first_row(mut self) -> Result<Option<Row>> {
270        if let Some(chunk) = self.next_chunk().await? {
271            Ok(chunk.into_iter().next())
272        } else {
273            Ok(None)
274        }
275    }
276
277    /// Returns the first row, or an error if the result set is empty.
278    ///
279    /// # Errors
280    ///
281    /// - Returns the error from [`first_row`](Self::first_row).
282    /// - Returns [`crate::Error::Other`] with message `"Query returned no rows"`
283    ///   if the result set is empty.
284    pub async fn require_first_row(self) -> Result<Row> {
285        self.first_row()
286            .await?
287            .ok_or_else(|| crate::error::Error::new("Query returned no rows"))
288    }
289
290    /// Returns the first column of the first row as `Option<T>`, or an
291    /// error if the result set is empty.
292    ///
293    /// # Errors
294    ///
295    /// Returns the error from
296    /// [`require_first_row`](Self::require_first_row). SQL `NULL` in the
297    /// single cell yields `Ok(None)`.
298    pub async fn scalar<T: RowValue>(self) -> Result<Option<T>> {
299        Ok(self.require_first_row().await?.get(0))
300    }
301
302    /// Returns the first column of the first row as `T`, or an error
303    /// if the result set is empty *or* the value is NULL.
304    ///
305    /// # Errors
306    ///
307    /// - Returns the error from [`scalar`](Self::scalar).
308    /// - Returns [`crate::Error::Other`] with message `"Scalar query returned NULL"`
309    ///   if the single cell is SQL `NULL`.
310    pub async fn require_scalar<T: RowValue>(self) -> Result<T> {
311        self.scalar()
312            .await?
313            .ok_or_else(|| crate::error::Error::new("Scalar query returned NULL"))
314    }
315}