// hyperdb_api/async_result.rs
// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 OR MIT

//! Async streaming result sets.
//!
//! This module is the async mirror of the sync [`Rowset`](crate::Rowset).
//! It wraps [`hyperdb_api_core::client::AsyncQueryStream`] (TCP) or
//! [`crate::ArrowRowset`] (gRPC) and presents the same row-level API as the
//! sync version — chunked iteration, schema capture, scalar collectors,
//! first-row helpers — all over `async fn`.
//!
//! Arrow-backed rowsets are fully materialized in memory already (gRPC
//! returns a complete buffer in one shot), so the Arrow path is a thin
//! wrapper around the sync [`ArrowRowset`] with no additional awaiting.
16use std::sync::Arc;
17
18use arrow::record_batch::RecordBatch;
19use hyperdb_api_core::client::{AsyncQueryStream, StreamRow};
20use hyperdb_api_core::types::SqlType;
21
22use crate::arrow_result::ArrowRowset;
23use crate::error::Result;
24use crate::result::{ResultColumn, ResultSchema, Row, RowValue};
25
26/// A streaming result set returned from an async query.
27///
28/// See [`Rowset`](crate::Rowset) for the sync equivalent and the full
29/// memory-behavior contract — both use constant-memory chunked iteration.
/// A streaming result set returned from an async query.
///
/// See [`Rowset`](crate::Rowset) for the sync equivalent and the full
/// memory-behavior contract — both use constant-memory chunked iteration.
pub struct AsyncRowset<'conn> {
    /// Transport-specific backing stream (plain TCP, prepared-statement
    /// TCP, or fully-materialized Arrow from gRPC).
    inner: AsyncRowsetInner<'conn>,
    /// Schema built lazily from the transport metadata and cached behind
    /// an `Arc` so every `Row` produced by `next_chunk` can share it with
    /// a cheap refcount clone instead of a deep copy.
    schema_cache: Option<Arc<ResultSchema>>,
    /// Hold the prepared statement for the one-shot `query_params`
    /// path. Its Drop-time close fires only after the rowset releases
    /// its connection lock. See [`Rowset::with_statement_guard`](crate::Rowset)
    /// for the sync equivalent.
    _statement_guard: Option<hyperdb_api_core::client::AsyncPreparedStatement>,
}
39
40impl std::fmt::Debug for AsyncRowset<'_> {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 f.debug_struct("AsyncRowset")
43 .field("has_schema_cache", &self.schema_cache.is_some())
44 .finish_non_exhaustive()
45 }
46}
47
/// Transport-specific backing stream for [`AsyncRowset`].
enum AsyncRowsetInner<'conn> {
    /// TCP streaming result via `AsyncQueryStream`.
    Tcp(AsyncQueryStream<'conn>),
    /// Arrow-based result from gRPC. Already fully materialized in
    /// memory, so draining it never awaits.
    // NOTE(review): the previous comment claimed this variant was kept
    // behind an `Option` so consuming collectors could move it out — it
    // is not; the `ArrowRowset` is held directly.
    Arrow(ArrowRowset),
    /// TCP streaming result from a prepared-statement execute.
    Prepared(hyperdb_api_core::client::AsyncPreparedQueryStream<'conn>),
}
58
59impl<'conn> AsyncRowset<'conn> {
60 /// Constructs a new async rowset from a TCP `AsyncQueryStream`.
61 pub(crate) fn new(stream: AsyncQueryStream<'conn>) -> Self {
62 Self {
63 inner: AsyncRowsetInner::Tcp(stream),
64 schema_cache: None,
65 _statement_guard: None,
66 }
67 }
68
69 /// Constructs a new async rowset from an already-materialized Arrow
70 /// rowset (gRPC transport).
71 pub(crate) fn from_arrow(arrow_rowset: ArrowRowset) -> Self {
72 Self {
73 inner: AsyncRowsetInner::Arrow(arrow_rowset),
74 schema_cache: None,
75 _statement_guard: None,
76 }
77 }
78
79 /// Constructs a new async rowset from a prepared-statement streaming
80 /// result.
81 pub(crate) fn from_prepared(
82 stream: hyperdb_api_core::client::AsyncPreparedQueryStream<'conn>,
83 ) -> Self {
84 Self {
85 inner: AsyncRowsetInner::Prepared(stream),
86 schema_cache: None,
87 _statement_guard: None,
88 }
89 }
90
91 #[expect(
92 clippy::used_underscore_binding,
93 reason = "underscore-prefixed parameter retained for trait-method signature compatibility"
94 )]
95 /// Attaches an `AsyncPreparedStatement` that should be dropped
96 /// **after** this rowset is consumed. Used by the one-shot
97 /// prepare+execute path inside
98 /// [`crate::AsyncConnection::query_params`].
99 pub(crate) fn with_statement_guard(
100 mut self,
101 statement: hyperdb_api_core::client::AsyncPreparedStatement,
102 ) -> Self {
103 self._statement_guard = Some(statement);
104 self
105 }
106
107 /// Returns the schema (column metadata) for the result set.
108 ///
109 /// For TCP this returns `None` until the first chunk has been
110 /// fetched (the `RowDescription` is the first message of the
111 /// stream). For Arrow it is available immediately.
112 #[must_use]
113 pub fn schema(&self) -> Option<ResultSchema> {
114 if let Some(ref cached) = self.schema_cache {
115 return Some((**cached).clone());
116 }
117 self.build_schema()
118 }
119
120 fn build_schema(&self) -> Option<ResultSchema> {
121 match &self.inner {
122 AsyncRowsetInner::Tcp(stream) => stream.schema().map(|cols| {
123 let columns = cols
124 .iter()
125 .enumerate()
126 .map(|(idx, col)| {
127 let sql_type =
128 SqlType::from_oid_and_modifier(col.type_oid().0, col.type_modifier());
129 ResultColumn::new(col.name(), sql_type, idx)
130 })
131 .collect();
132 ResultSchema::from_columns(columns)
133 }),
134 AsyncRowsetInner::Prepared(stream) => {
135 let cols = stream.schema();
136 let columns = cols
137 .iter()
138 .enumerate()
139 .map(|(idx, col)| {
140 let sql_type =
141 SqlType::from_oid_and_modifier(col.type_oid().0, col.type_modifier());
142 ResultColumn::new(col.name(), sql_type, idx)
143 })
144 .collect();
145 Some(ResultSchema::from_columns(columns))
146 }
147 AsyncRowsetInner::Arrow(arrow) => {
148 let schema = arrow.schema();
149 let columns = schema
150 .fields()
151 .iter()
152 .enumerate()
153 .map(|(idx, field)| {
154 ResultColumn::new(
155 field.name(),
156 crate::arrow_result::arrow_type_to_sql_type(field.data_type()),
157 idx,
158 )
159 })
160 .collect();
161 Some(ResultSchema::from_columns(columns))
162 }
163 }
164 }
165
166 fn cached_schema_arc(&mut self) -> Option<Arc<ResultSchema>> {
167 if self.schema_cache.is_none() {
168 if let Some(schema) = self.build_schema() {
169 self.schema_cache = Some(Arc::new(schema));
170 }
171 }
172 self.schema_cache.clone()
173 }
174
175 /// Returns the next chunk of rows, or `None` when the stream is
176 /// exhausted.
177 ///
178 /// # Errors
179 ///
180 /// - Returns [`crate::Error::Client`] if the server sends an `ErrorResponse`
181 /// while streaming the result set.
182 /// - Returns [`crate::Error::Io`] on transport-level I/O failures.
183 /// - Returns [`crate::Error::Other`] if an Arrow IPC chunk cannot be decoded.
184 pub async fn next_chunk(&mut self) -> Result<Option<Vec<Row>>> {
185 enum TransportChunk {
186 Tcp(Vec<StreamRow>),
187 Arrow(Arc<RecordBatch>),
188 }
189
190 let chunk_opt: Option<TransportChunk> = match &mut self.inner {
191 AsyncRowsetInner::Tcp(stream) => stream.next_chunk().await?.map(TransportChunk::Tcp),
192 AsyncRowsetInner::Arrow(arrow) => arrow
193 .next_chunk()?
194 .map(|chunk| TransportChunk::Arrow(Arc::new(chunk.into_batch()))),
195 AsyncRowsetInner::Prepared(stream) => {
196 stream.next_chunk().await?.map(TransportChunk::Tcp)
197 }
198 };
199
200 let Some(chunk) = chunk_opt else {
201 return Ok(None);
202 };
203
204 let schema = self.cached_schema_arc();
205 let rows = match chunk {
206 TransportChunk::Tcp(stream_rows) => stream_rows
207 .into_iter()
208 .map(|row| Row::from_tcp(row, schema.clone()))
209 .collect(),
210 TransportChunk::Arrow(batch) => (0..batch.num_rows())
211 .map(|row_index| Row::from_arrow(Arc::clone(&batch), row_index, schema.clone()))
212 .collect(),
213 };
214 Ok(Some(rows))
215 }
216
217 /// Collects every remaining row into a `Vec`. Consumes the rowset.
218 ///
219 /// # Errors
220 ///
221 /// Returns the first error produced by [`next_chunk`](Self::next_chunk)
222 /// while draining the stream.
223 pub async fn collect_rows(mut self) -> Result<Vec<Row>> {
224 let mut all = Vec::new();
225 while let Some(chunk) = self.next_chunk().await? {
226 all.extend(chunk);
227 }
228 Ok(all)
229 }
230
231 /// Collects the first column of every row, preserving NULL as `None`.
232 ///
233 /// # Errors
234 ///
235 /// Returns the first error produced by [`next_chunk`](Self::next_chunk).
236 /// SQL `NULL` cells yield `Option::None` entries, not errors.
237 pub async fn collect_column<T: RowValue>(mut self) -> Result<Vec<Option<T>>> {
238 let mut values = Vec::new();
239 while let Some(chunk) = self.next_chunk().await? {
240 for row in chunk {
241 values.push(row.get::<T>(0));
242 }
243 }
244 Ok(values)
245 }
246
247 /// Collects the first column of every row, dropping NULLs.
248 ///
249 /// # Errors
250 ///
251 /// Returns the first error produced by
252 /// [`collect_column`](Self::collect_column).
253 pub async fn collect_column_non_null<T: RowValue>(self) -> Result<Vec<T>> {
254 Ok(self
255 .collect_column::<T>()
256 .await?
257 .into_iter()
258 .flatten()
259 .collect())
260 }
261
262 /// Returns the first row of the result set, or `None` if the
263 /// result is empty. Consumes the rowset.
264 ///
265 /// # Errors
266 ///
267 /// Returns the error from [`next_chunk`](Self::next_chunk). An empty
268 /// result yields `Ok(None)`.
269 pub async fn first_row(mut self) -> Result<Option<Row>> {
270 if let Some(chunk) = self.next_chunk().await? {
271 Ok(chunk.into_iter().next())
272 } else {
273 Ok(None)
274 }
275 }
276
277 /// Returns the first row, or an error if the result set is empty.
278 ///
279 /// # Errors
280 ///
281 /// - Returns the error from [`first_row`](Self::first_row).
282 /// - Returns [`crate::Error::Other`] with message `"Query returned no rows"`
283 /// if the result set is empty.
284 pub async fn require_first_row(self) -> Result<Row> {
285 self.first_row()
286 .await?
287 .ok_or_else(|| crate::error::Error::new("Query returned no rows"))
288 }
289
290 /// Returns the first column of the first row as `Option<T>`, or an
291 /// error if the result set is empty.
292 ///
293 /// # Errors
294 ///
295 /// Returns the error from
296 /// [`require_first_row`](Self::require_first_row). SQL `NULL` in the
297 /// single cell yields `Ok(None)`.
298 pub async fn scalar<T: RowValue>(self) -> Result<Option<T>> {
299 Ok(self.require_first_row().await?.get(0))
300 }
301
302 /// Returns the first column of the first row as `T`, or an error
303 /// if the result set is empty *or* the value is NULL.
304 ///
305 /// # Errors
306 ///
307 /// - Returns the error from [`scalar`](Self::scalar).
308 /// - Returns [`crate::Error::Other`] with message `"Scalar query returned NULL"`
309 /// if the single cell is SQL `NULL`.
310 pub async fn require_scalar<T: RowValue>(self) -> Result<T> {
311 self.scalar()
312 .await?
313 .ok_or_else(|| crate::error::Error::new("Scalar query returned NULL"))
314 }
315}