hyperdb_api/result.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Query result handling with type-safe value access.
5//!
6//! This module provides types for working with query results:
7//! - [`Rowset`] — Streaming result set with memory-efficient chunked iteration
8//! - [`RowIterator`] — C++-like iterator for simple row-by-row processing
9//! - [`ResultSchema`] — Column metadata (names and types)
10//!
11//! # Streaming Design
12//!
13//! Query results are streamed from the server in chunks of up to
14//! [`DEFAULT_BINARY_CHUNK_SIZE`] rows (64K). Only one chunk is held in memory
15//! at a time, so memory usage is `O(chunk_size)` regardless of total result
16//! size — safe for billion-row results.
17//!
18//! # Iteration Patterns
19//!
20//! Two patterns are available, both streaming with constant memory:
21//!
22//! ## Pattern 1: Chunked (`next_chunk()`) — batch processing
23//!
24//! Best for high-throughput scenarios. Error checking happens once per chunk
25//! (~64K rows), and you get direct `Vec<Row>` iteration with good cache
26//! locality. Natural for batch operations, vectorized processing, or
27//! parallelizing across chunks.
28//!
29//! ```no_run
//! # use hyperdb_api::{Connection, Result};
31//! # fn example(conn: &Connection) -> Result<()> {
32//! let mut result = conn.execute_query("SELECT * FROM table")?;
33//! while let Some(chunk) = result.next_chunk()? {
34//! for row in &chunk {
35//! let id: Option<i32> = row.get(0);
36//! let value: Option<f64> = row.get(1);
37//! }
38//! }
39//! # Ok(())
40//! # }
41//! ```
42//!
43//! ## Pattern 2: Iterator (`rows()`) — simple row-by-row
44//!
45//! Best for simple iteration where you process one row at a time. Each item
46//! is `Result<Row>` since chunk fetches can fail, so error checking happens
47//! per-row. The extra iterator wrapper adds slight overhead compared to
48//! `next_chunk()`.
49//!
50//! ```no_run
51//! # use hyperdb_api::{Connection, Result};
52//! # fn example(conn: &Connection) -> Result<()> {
53//! let result = conn.execute_query("SELECT * FROM table")?;
54//! for row in result.rows() {
55//! let row = row?; // Handle potential errors
56//! let id: Option<i32> = row.get(0);
57//! let value: Option<f64> = row.get(1);
58//! }
59//! # Ok(())
60//! # }
61//! ```
62//!
63//! **When to use which:**
64//! - `rows()` — simple iteration, one row at a time, small overhead acceptable
65//! - `next_chunk()` — maximum performance, large result sets, batch operations
66//!
67//! # Type Coercion
68//!
69//! The generic `row.get::<T>()` method supports automatic widening coercion:
70//!
71//! | Request Type | Coerces From |
72//! |---|---|
73//! | `i32` | `i16` |
74//! | `i64` | `i32`, `i16` |
75//! | `f64` | `f32` |
76//!
77//! Direct accessors (`row.get_i32()`, `row.get_f64()`) skip coercion for
78//! slightly better performance when the exact type is known.
79
80use std::sync::Arc;
81
82use arrow::array::Array;
83use arrow::record_batch::RecordBatch;
84use hyperdb_api_core::client::QueryStream;
85use hyperdb_api_core::client::StreamRow;
86use hyperdb_api_core::types::SqlType;
87
88use crate::arrow_result::{ArrowRowset, FromArrowValue};
89use crate::error::Result;
90
/// Default number of rows per streamed chunk (64K rows).
pub(crate) const DEFAULT_BINARY_CHUNK_SIZE: usize = 64 * 1024;
93
94// =============================================================================
95// Row - Unified row type for both TCP and gRPC
96// =============================================================================
97
/// A row from a query result, providing typed value access.
///
/// This type abstracts over the underlying transport (TCP or gRPC),
/// providing a consistent API for accessing column values regardless
/// of how the data was retrieved.
///
/// # Example
///
/// ```no_run
/// # use hyperdb_api::Result;
/// # fn example(result: hyperdb_api::Rowset) -> Result<()> {
/// for row in result.rows() {
///     let row = row?;
///     let id: Option<i32> = row.get(0);
///     let name: Option<String> = row.get(1);
///     // Or use direct accessors
///     let value = row.get_f64(2);
/// }
/// # Ok(())
/// # }
/// ```
pub struct Row {
    /// Transport-specific backing storage (TCP `StreamRow` or an Arrow
    /// batch plus row index). All accessors dispatch on this value.
    inner: RowInner,
    /// Shared schema reference for the parent rowset. Every row
    /// produced by [`Rowset::next_chunk`] carries this (cloned cheaply
    /// from an `Arc`) so that metadata-dependent decoders like
    /// [`Self::get_numeric`] can look up `SqlType` per column without
    /// the caller plumbing scale through manually. `None` only in the
    /// unusual case a row is constructed outside `next_chunk` (no such
    /// path exists in-tree today; the field is `Option` so future
    /// schemas-unavailable paths remain compilable).
    schema: Option<Arc<ResultSchema>>,
}
131
132impl std::fmt::Debug for Row {
133 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134 f.debug_struct("Row")
135 .field("has_schema", &self.schema.is_some())
136 .finish_non_exhaustive()
137 }
138}
139
/// Internal per-transport backing for a [`Row`]. Not public: all
/// consumer-visible API goes through `Row`'s methods, which dispatch
/// on this enum internally.
enum RowInner {
    /// Row from TCP transport (`StreamRow`).
    Tcp(StreamRow),
    /// Row from gRPC transport (Arrow-backed).
    ///
    /// The batch is shared via `Arc`, so a row stores only a handle to
    /// the batch plus its own position inside it.
    Arrow {
        /// The record batch containing this row's data.
        batch: Arc<RecordBatch>,
        /// Index of this row within the batch.
        row_index: usize,
    },
}
154
155impl Row {
156 /// Construct a TCP-backed row with an attached schema reference.
157 #[inline]
158 pub(crate) fn from_tcp(row: StreamRow, schema: Option<Arc<ResultSchema>>) -> Self {
159 Row {
160 inner: RowInner::Tcp(row),
161 schema,
162 }
163 }
164
165 /// Construct an Arrow-backed row with an attached schema reference.
166 #[inline]
167 pub(crate) fn from_arrow(
168 batch: Arc<RecordBatch>,
169 row_index: usize,
170 schema: Option<Arc<ResultSchema>>,
171 ) -> Self {
172 Row {
173 inner: RowInner::Arrow { batch, row_index },
174 schema,
175 }
176 }
177
178 /// Returns the schema this row belongs to, if attached.
179 ///
180 /// Every row produced by [`Rowset::next_chunk`] has a schema
181 /// attached — so this returns `Some` for any row obtained through
182 /// the public API.
183 #[inline]
184 pub fn schema(&self) -> Option<&ResultSchema> {
185 self.schema.as_deref()
186 }
187
188 /// Returns the `SqlType` of the column at the given index, if the
189 /// schema is attached and the index is in bounds.
190 ///
191 /// Useful for metadata-dependent decoders like [`Self::get_numeric`]
192 /// that need per-column precision and scale. Most callers reach for
193 /// [`Self::get`] / [`Self::try_get`] instead, which handle this
194 /// lookup internally via the [`RowValue`] trait.
195 #[inline]
196 pub fn sql_type(&self, idx: usize) -> Option<SqlType> {
197 let schema = self.schema.as_deref()?;
198 if idx < schema.column_count() {
199 Some(schema.column(idx).sql_type())
200 } else {
201 None
202 }
203 }
204
205 /// Gets a typed value at the given column index.
206 ///
207 /// # Example
208 ///
209 /// ```no_run
210 /// # use hyperdb_api::Row;
211 /// # fn example(row: &Row) {
212 /// let id: Option<i32> = row.get(0);
213 /// let name: Option<String> = row.get(1);
214 /// # }
215 /// ```
216 #[inline]
217 pub fn get<T: RowValue>(&self, idx: usize) -> Option<T> {
218 T::from_row(self, idx)
219 }
220
221 /// Gets a typed value at the given column index, returning a `Result`
222 /// with a descriptive error on failure.
223 ///
224 /// Use this in [`FromRow`] implementations for better error messages
225 /// than bare `row.get(idx).ok_or(...)`.
226 ///
227 /// # Example
228 ///
229 /// Most callers should reach for [`crate::FromRow`] +
230 /// [`crate::RowAccessor`] for typed mapping. `try_get` is the
231 /// underlying positional building block; useful when you need
232 /// indexed access from a hand-rolled loop.
233 ///
234 /// ```no_run
235 /// # use hyperdb_api::{Row, Result};
236 /// # fn read(row: &Row) -> Result<(i32, String)> {
237 /// let id: i32 = row.try_get(0, "id")?;
238 /// let name: String = row.try_get(1, "name")?;
239 /// # Ok((id, name))
240 /// # }
241 /// ```
242 ///
243 /// # Errors
244 ///
245 /// - Returns [`crate::Error::Conversion`] if `idx` is out of bounds for the row's
246 /// column count.
247 /// - Returns [`crate::Error::Conversion`] if the cell is SQL `NULL` or its value
248 /// cannot be decoded as `T`.
249 pub fn try_get<T: RowValue>(&self, idx: usize, column_name: &str) -> crate::error::Result<T> {
250 if idx >= self.column_count() {
251 return Err(crate::error::Error::conversion(format!(
252 "Column index {} ({:?}) out of bounds — row has {} columns",
253 idx,
254 column_name,
255 self.column_count(),
256 )));
257 }
258 self.get::<T>(idx).ok_or_else(|| {
259 crate::error::Error::conversion(format!(
260 "Column {idx} ({column_name:?}) is NULL or has incompatible type",
261 ))
262 })
263 }
264
265 /// Looks up a column by name and returns its value as `T`.
266 ///
267 /// Convenient for hand-coded paths that aren't using
268 /// [`FromRow`]. The lookup is a linear scan over
269 /// [`ResultSchema::column_index`]; for hot paths (many rows × many
270 /// fields), prefer
271 /// [`fetch_one_as`](crate::Connection::fetch_one_as) /
272 /// [`fetch_all_as`](crate::Connection::fetch_all_as), which build
273 /// a cached column-name → index lookup once per query and hand
274 /// every `FromRow` impl a [`RowAccessor`](crate::RowAccessor) that
275 /// reuses it.
276 ///
277 /// # Errors
278 ///
279 /// - [`crate::Error::Column`] with [`crate::ColumnErrorKind::Missing`]
280 /// if no column with `name` exists in the row's schema (or the
281 /// row has no schema attached).
282 /// - [`crate::Error::Conversion`] if the cell is `NULL` or cannot
283 /// be decoded as `T`. (Inherited from [`Self::try_get`].)
284 pub fn get_by_name<T: RowValue>(&self, name: &str) -> crate::error::Result<T> {
285 let idx = self
286 .schema()
287 .and_then(|s| s.column_index(name))
288 .ok_or_else(|| {
289 crate::error::Error::column(name, crate::error::ColumnErrorKind::Missing)
290 })?;
291 self.try_get(idx, name)
292 }
293
294 /// Returns an Arrow column reference, or `None` if the index is out of bounds.
295 ///
296 /// This is a safe wrapper around `batch.column(idx)` that avoids panicking.
297 #[inline]
298 fn arrow_column(batch: &RecordBatch, idx: usize) -> Option<&Arc<dyn Array>> {
299 if idx < batch.num_columns() {
300 Some(batch.column(idx))
301 } else {
302 None
303 }
304 }
305
306 /// Gets an i16 value at the given column index.
307 #[inline]
308 pub fn get_i16(&self, idx: usize) -> Option<i16> {
309 match &self.inner {
310 RowInner::Tcp(row) => row.get_i16(idx),
311 RowInner::Arrow { batch, row_index } => {
312 i16::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
313 }
314 }
315 }
316
317 /// Gets an i32 value at the given column index.
318 #[inline]
319 pub fn get_i32(&self, idx: usize) -> Option<i32> {
320 match &self.inner {
321 RowInner::Tcp(row) => row.get_i32(idx),
322 RowInner::Arrow { batch, row_index } => {
323 i32::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
324 }
325 }
326 }
327
328 /// Gets an i64 value at the given column index.
329 #[inline]
330 pub fn get_i64(&self, idx: usize) -> Option<i64> {
331 match &self.inner {
332 RowInner::Tcp(row) => row.get_i64(idx),
333 RowInner::Arrow { batch, row_index } => {
334 i64::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
335 }
336 }
337 }
338
339 /// Gets an f32 value at the given column index.
340 #[inline]
341 pub fn get_f32(&self, idx: usize) -> Option<f32> {
342 match &self.inner {
343 RowInner::Tcp(row) => row.get_f32(idx),
344 RowInner::Arrow { batch, row_index } => {
345 f32::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
346 }
347 }
348 }
349
350 /// Gets an f64 value at the given column index.
351 #[inline]
352 pub fn get_f64(&self, idx: usize) -> Option<f64> {
353 match &self.inner {
354 RowInner::Tcp(row) => row.get_f64(idx),
355 RowInner::Arrow { batch, row_index } => {
356 f64::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
357 }
358 }
359 }
360
361 /// Gets a bool value at the given column index.
362 #[inline]
363 pub fn get_bool(&self, idx: usize) -> Option<bool> {
364 match &self.inner {
365 RowInner::Tcp(row) => row.get_bool(idx),
366 RowInner::Arrow { batch, row_index } => {
367 bool::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
368 }
369 }
370 }
371
372 /// Gets a String value at the given column index.
373 #[inline]
374 pub fn get_string(&self, idx: usize) -> Option<String> {
375 match &self.inner {
376 RowInner::Tcp(row) => row.get_string(idx),
377 RowInner::Arrow { batch, row_index } => {
378 String::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
379 }
380 }
381 }
382
383 /// Checks if the value at the given column is null.
384 #[inline]
385 pub fn is_null(&self, idx: usize) -> bool {
386 match &self.inner {
387 RowInner::Tcp(row) => row.is_null(idx),
388 RowInner::Arrow { batch, row_index } => match Self::arrow_column(batch, idx) {
389 Some(col) => col.is_null(*row_index),
390 None => true,
391 },
392 }
393 }
394
395 /// Returns the number of columns in this row.
396 #[inline]
397 pub fn column_count(&self) -> usize {
398 match &self.inner {
399 RowInner::Tcp(row) => row.column_count(),
400 RowInner::Arrow { batch, .. } => batch.num_columns(),
401 }
402 }
403
404 /// Gets raw bytes at the given column index.
405 ///
406 /// For TCP rows, returns the raw binary data. For Arrow rows, this method
407 /// is not available and returns None.
408 #[inline]
409 pub fn get_bytes(&self, idx: usize) -> Option<Vec<u8>> {
410 match &self.inner {
411 RowInner::Tcp(row) => row.get_bytes(idx).map(<[u8]>::to_vec),
412 RowInner::Arrow { batch, row_index } => {
413 Vec::<u8>::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
414 }
415 }
416 }
417
418 /// Gets a Date value at the given column index.
419 #[inline]
420 pub fn get_date(&self, idx: usize) -> Option<hyperdb_api_core::types::Date> {
421 match &self.inner {
422 RowInner::Tcp(row) => row.get(idx),
423 RowInner::Arrow { batch, row_index } => {
424 // Arrow Date32 is days since Unix epoch (1970-01-01)
425 // Hyper Date is days since Hyper epoch (2000-01-01)
426 use arrow::array::Date32Array;
427 let col = Self::arrow_column(batch, idx)?;
428 let arr = col.as_any().downcast_ref::<Date32Array>()?;
429 if arr.is_null(*row_index) {
430 return None;
431 }
432 let unix_days = arr.value(*row_index);
433 // Convert from Unix epoch to Hyper epoch (diff is 10957 days)
434 let hyper_days = unix_days - 10957;
435 Some(hyperdb_api_core::types::Date::from_days(hyper_days))
436 }
437 }
438 }
439
440 /// Gets a Time value at the given column index.
441 #[inline]
442 pub fn get_time(&self, idx: usize) -> Option<hyperdb_api_core::types::Time> {
443 match &self.inner {
444 RowInner::Tcp(row) => row.get(idx),
445 RowInner::Arrow { batch, row_index } => {
446 // Arrow Time64 is microseconds since midnight
447 use arrow::array::Time64MicrosecondArray;
448 let col = Self::arrow_column(batch, idx)?;
449 let arr = col.as_any().downcast_ref::<Time64MicrosecondArray>()?;
450 if arr.is_null(*row_index) {
451 return None;
452 }
453 let micros = u64::try_from(arr.value(*row_index)).ok()?;
454 Some(hyperdb_api_core::types::Time::from_microseconds(micros))
455 }
456 }
457 }
458
459 /// Gets a Timestamp value at the given column index.
460 #[inline]
461 pub fn get_timestamp(&self, idx: usize) -> Option<hyperdb_api_core::types::Timestamp> {
462 match &self.inner {
463 RowInner::Tcp(row) => row.get(idx),
464 RowInner::Arrow { batch, row_index } => {
465 // Arrow Timestamp is microseconds since Unix epoch
466 // Hyper Timestamp is microseconds since Hyper epoch (2000-01-01)
467 use arrow::array::TimestampMicrosecondArray;
468 let col = Self::arrow_column(batch, idx)?;
469 let arr = col.as_any().downcast_ref::<TimestampMicrosecondArray>()?;
470 if arr.is_null(*row_index) {
471 return None;
472 }
473 let unix_micros = arr.value(*row_index);
474 // Convert from Unix epoch to Hyper epoch
475 // 2000-01-01 is 946684800 seconds after 1970-01-01
476 let hyper_micros = unix_micros - 946_684_800_000_000;
477 Some(hyperdb_api_core::types::Timestamp::from_microseconds(
478 hyper_micros,
479 ))
480 }
481 }
482 }
483
484 /// Gets an `OffsetTimestamp` (TIMESTAMP WITH TIME ZONE) value at the given column index.
485 #[inline]
486 pub fn get_offset_timestamp(
487 &self,
488 idx: usize,
489 ) -> Option<hyperdb_api_core::types::OffsetTimestamp> {
490 match &self.inner {
491 RowInner::Tcp(row) => row.get(idx),
492 RowInner::Arrow { batch, row_index } => {
493 // Arrow TimestampTz is microseconds since Unix epoch with timezone
494 use arrow::array::TimestampMicrosecondArray;
495 let col = Self::arrow_column(batch, idx)?;
496 let arr = col.as_any().downcast_ref::<TimestampMicrosecondArray>()?;
497 if arr.is_null(*row_index) {
498 return None;
499 }
500 let unix_micros = arr.value(*row_index);
501 let hyper_micros = unix_micros - 946_684_800_000_000;
502 let ts = hyperdb_api_core::types::Timestamp::from_microseconds(hyper_micros);
503 Some(hyperdb_api_core::types::OffsetTimestamp::new(ts, 0))
504 }
505 }
506 }
507
508 /// Gets an Interval value at the given column index.
509 #[inline]
510 pub fn get_interval(&self, idx: usize) -> Option<hyperdb_api_core::types::Interval> {
511 match &self.inner {
512 RowInner::Tcp(row) => row.get(idx),
513 RowInner::Arrow { batch, row_index } => {
514 // Arrow MonthDayNano interval → Hyper Interval
515 use arrow::array::IntervalMonthDayNanoArray;
516 let col = Self::arrow_column(batch, idx)?;
517 let arr = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>()?;
518 if arr.is_null(*row_index) {
519 return None;
520 }
521 let v = arr.value(*row_index);
522 let micros = v.nanoseconds / 1000;
523 Some(hyperdb_api_core::types::Interval::new(
524 v.months, v.days, micros,
525 ))
526 }
527 }
528 }
529
530 /// Gets a `NUMERIC` value at the given column index.
531 ///
532 /// This is the metadata-aware variant of [`Self::get_bytes`] +
533 /// [`hyperdb_api_core::types::Numeric::from_binary_with_scale`]: it looks up
534 /// the column's `SqlType::Numeric { scale, .. }` from the attached
535 /// schema and decodes the wire bytes with that scale, handling
536 /// both of Hyper's NUMERIC wire forms transparently:
537 ///
538 /// - **8 bytes** (i64) when the column's declared precision ≤ 18
539 /// (Hyper's `Type::Numeric`). This is what aggregates like
540 /// `AVG(INTEGER)` return as `Numeric(16, 6)`.
541 /// - **16 bytes** (i128) when declared precision > 18
542 /// (Hyper's `Type::BigNumeric`).
543 ///
544 /// Returns `None` if any of the following are true: the value is
545 /// NULL, the schema isn't attached (which never happens for rows
546 /// obtained through [`Rowset::next_chunk`]), the column at `idx`
547 /// isn't `NUMERIC`, or the bytes can't be decoded.
548 ///
549 /// For non-TCP (Arrow/gRPC) rows, this path falls back to reading
550 /// the Arrow-native `Decimal128` / `Decimal256` columns; the scale
551 /// lives in the Arrow type descriptor in that case.
552 pub fn get_numeric(&self, idx: usize) -> Option<hyperdb_api_core::types::Numeric> {
553 match &self.inner {
554 RowInner::Tcp(_) => {
555 // TCP: decode raw bytes with scale from the schema.
556 //
557 // `SqlType::Numeric::scale` is `u32` and Hyper's own
558 // `NUMERIC(p, s)` caps at `p ≤ 38` (per
559 // `hyper/rts/type/Type.hpp`), so any legitimate scale
560 // fits easily in `u8`. But `scale as u8` silently
561 // truncates the high bits for values > 255, and a
562 // malformed server response or a bug in typemod
563 // parsing could deliver such a value — at which point
564 // we'd produce a `Numeric` with the wrong (truncated)
565 // scale and no error signal. `u8::try_from` returns
566 // `Err` for out-of-range, `?` propagates `None`, and
567 // the caller gets a clean "no value" instead of
568 // silent corruption. Symmetric with the Arrow
569 // negative-scale guard a few lines below.
570 let scale: u8 = match self.sql_type(idx)? {
571 SqlType::Numeric { scale, .. } => u8::try_from(scale).ok()?,
572 _ => return None,
573 };
574 let bytes = self.get_bytes(idx)?;
575 hyperdb_api_core::types::Numeric::from_binary_with_scale(&bytes, scale).ok()
576 }
577 RowInner::Arrow { batch, row_index } => {
578 use arrow::array::{Decimal128Array, Decimal256Array};
579 use arrow::datatypes::DataType as ArrowType;
580 let col = Self::arrow_column(batch, idx)?;
581 // Arrow stores decimal precision/scale in the type
582 // descriptor itself, so there's no separate schema
583 // lookup needed on this path.
584 //
585 // Note: Arrow's decimal scale is `i8` and can legally
586 // be negative (negative scale = "value is multiplied
587 // by 10^abs(scale)", e.g. scale=-2 on raw=5 renders
588 // as 500). Hyper's `Numeric` uses `u8` scale and has
589 // no representation for the negative-scale
590 // multiplier. Rather than silently dropping the
591 // multiplier (which would make raw=5 display as 5
592 // instead of 500), we surface it as "no value" via
593 // `try_into` + `?`. Negative-scale decimals don't
594 // originate from Hyper's own gRPC encoder — but
595 // `Row` can be fed from externally-loaded Arrow
596 // files, so defensive handling costs nothing and
597 // prevents a silent-corruption failure mode.
598 match col.data_type() {
599 ArrowType::Decimal128(_precision, scale) => {
600 let scale_u8: u8 = (*scale).try_into().ok()?;
601 let arr = col.as_any().downcast_ref::<Decimal128Array>()?;
602 if arr.is_null(*row_index) {
603 return None;
604 }
605 let raw = arr.value(*row_index); // i128
606 Some(hyperdb_api_core::types::Numeric::new(raw, scale_u8))
607 }
608 ArrowType::Decimal256(_precision, scale) => {
609 // i256 from Arrow; Hyper NUMERIC caps at i128
610 // (precision ≤ 38). Narrow to i128; this is
611 // lossless for any value Hyper would actually
612 // produce. Values outside that range are a
613 // server-side contract violation.
614 let scale_u8: u8 = (*scale).try_into().ok()?;
615 let arr = col.as_any().downcast_ref::<Decimal256Array>()?;
616 if arr.is_null(*row_index) {
617 return None;
618 }
619 let raw = arr.value(*row_index);
620 let as_i128: i128 = raw.to_i128()?;
621 Some(hyperdb_api_core::types::Numeric::new(as_i128, scale_u8))
622 }
623 _ => None,
624 }
625 }
626 }
627 }
628}
629
/// Trait for types that can be extracted from a Row.
///
/// [`Row::get`] and [`Row::try_get`] dispatch through this trait, so
/// implementing it for a type makes that type usable with the generic
/// accessors.
pub trait RowValue: Sized {
    /// Extract a value from a Row at the given column index.
    ///
    /// Returns `None` when no value of this type can be produced for
    /// the cell (the implementations in this file do so for NULL cells
    /// and for cells their underlying accessor cannot decode).
    fn from_row(row: &Row, idx: usize) -> Option<Self>;
}
635
636impl RowValue for i16 {
637 #[inline]
638 fn from_row(row: &Row, idx: usize) -> Option<Self> {
639 row.get_i16(idx)
640 }
641}
642
643impl RowValue for i32 {
644 #[inline]
645 fn from_row(row: &Row, idx: usize) -> Option<Self> {
646 row.get_i32(idx).or_else(|| row.get_i16(idx).map(i32::from))
647 }
648}
649
650impl RowValue for i64 {
651 #[inline]
652 fn from_row(row: &Row, idx: usize) -> Option<Self> {
653 row.get_i64(idx)
654 .or_else(|| row.get_i32(idx).map(i64::from))
655 .or_else(|| row.get_i16(idx).map(i64::from))
656 }
657}
658
659impl RowValue for f32 {
660 #[inline]
661 fn from_row(row: &Row, idx: usize) -> Option<Self> {
662 row.get_f32(idx)
663 }
664}
665
666impl RowValue for f64 {
667 #[inline]
668 fn from_row(row: &Row, idx: usize) -> Option<Self> {
669 row.get_f64(idx).or_else(|| row.get_f32(idx).map(f64::from))
670 }
671}
672
673impl RowValue for bool {
674 #[inline]
675 fn from_row(row: &Row, idx: usize) -> Option<Self> {
676 row.get_bool(idx)
677 }
678}
679
680impl RowValue for String {
681 #[inline]
682 fn from_row(row: &Row, idx: usize) -> Option<Self> {
683 row.get_string(idx)
684 }
685}
686
687impl RowValue for Vec<u8> {
688 #[inline]
689 fn from_row(row: &Row, idx: usize) -> Option<Self> {
690 row.get_bytes(idx)
691 }
692}
693
694impl RowValue for hyperdb_api_core::types::Date {
695 #[inline]
696 fn from_row(row: &Row, idx: usize) -> Option<Self> {
697 row.get_date(idx)
698 }
699}
700
701impl RowValue for hyperdb_api_core::types::Time {
702 #[inline]
703 fn from_row(row: &Row, idx: usize) -> Option<Self> {
704 row.get_time(idx)
705 }
706}
707
708impl RowValue for hyperdb_api_core::types::Timestamp {
709 #[inline]
710 fn from_row(row: &Row, idx: usize) -> Option<Self> {
711 row.get_timestamp(idx)
712 }
713}
714
715impl RowValue for hyperdb_api_core::types::OffsetTimestamp {
716 #[inline]
717 fn from_row(row: &Row, idx: usize) -> Option<Self> {
718 row.get_offset_timestamp(idx)
719 }
720}
721
722impl RowValue for hyperdb_api_core::types::Interval {
723 #[inline]
724 fn from_row(row: &Row, idx: usize) -> Option<Self> {
725 row.get_interval(idx)
726 }
727}
728
729impl RowValue for hyperdb_api_core::types::Numeric {
730 /// Unlike every other `RowValue` impl, `Numeric` decode requires
731 /// per-column metadata (scale + wire-form width) that lives on the
732 /// row's attached `ResultSchema`. [`Row::get_numeric`] does the
733 /// lookup; this impl delegates there so generic `row.get::<Numeric>()`
734 /// / `row.try_get::<Numeric>(idx, "name")` call sites work the same
735 /// as every other type.
736 #[inline]
737 fn from_row(row: &Row, idx: usize) -> Option<Self> {
738 row.get_numeric(idx)
739 }
740}
741
742// =============================================================================
743// FromRow - Struct mapping trait
744// =============================================================================
745
/// Trait for types that can be constructed from a database row.
///
/// Used by [`Connection::fetch_one_as`](crate::Connection::fetch_one_as)
/// and [`Connection::fetch_all_as`](crate::Connection::fetch_all_as)
/// to map query results into typed structs. Implementations receive
/// a [`RowAccessor`](crate::RowAccessor), which provides name-based
/// access via a column-name → index lookup built once per query.
///
/// # Recommended: derive
///
/// In most cases the `#[derive(FromRow)]` macro handles the mapping
/// for you — match struct field names to column names automatically,
/// with `#[hyperdb(rename = "...")]` for cases where they differ:
///
/// ```ignore
/// use hyperdb_api::FromRow;
/// use hyperdb_api_derive::FromRow; // proc-macro derive
///
/// #[derive(FromRow)]
/// struct User {
///     id: i32,
///     name: String,
///     #[hyperdb(rename = "email_address")]
///     email: Option<String>,
/// }
/// ```
///
/// # Hand-written impl
///
/// For custom mapping logic (computed fields, multi-column composition,
/// etc.) implement the trait directly:
///
/// ```no_run
/// use hyperdb_api::{FromRow, RowAccessor, Result};
///
/// struct User { id: i32, name: String, active: bool }
///
/// impl FromRow for User {
///     fn from_row(row: RowAccessor<'_>) -> Result<Self> {
///         Ok(User {
///             id: row.get("id")?,
///             name: row.get("name")?,
///             active: row.get("active")?,
///         })
///     }
/// }
/// ```
///
/// For ad-hoc tuple destructuring of small results, use
/// [`Row::get`](crate::Row::get) directly — there are no blanket
/// tuple `FromRow` impls. Define a struct with `#[derive(FromRow)]`
/// for typed access in `fetch_*_as`.
pub trait FromRow: Sized {
    /// Constructs an instance from a database row.
    ///
    /// The accessor is passed by value; it offers name-based column
    /// access as shown in the trait-level example.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`](crate::Error) — typically
    /// [`crate::Error::Column`] — when a required column is missing,
    /// SQL `NULL`, or cannot be decoded as the expected type.
    /// Implementations decide the exact failure shape.
    fn from_row(row: crate::RowAccessor<'_>) -> crate::error::Result<Self>;
}
809
810// =============================================================================
811// ResultSchema and ResultColumn
812// =============================================================================
813
/// Metadata about a column in a result schema.
///
/// Holds the column's name, SQL type, and 0-based position. Fields are
/// private; read them through the accessor methods.
#[derive(Debug, Clone)]
pub struct ResultColumn {
    /// The column name.
    name: String,
    /// The SQL type of the column.
    sql_type: SqlType,
    /// The column index (0-based).
    index: usize,
}
824
825impl ResultColumn {
826 /// Creates a new result column.
827 pub fn new(name: impl Into<String>, sql_type: SqlType, index: usize) -> Self {
828 ResultColumn {
829 name: name.into(),
830 sql_type,
831 index,
832 }
833 }
834
835 /// Returns the column name.
836 #[must_use]
837 pub fn name(&self) -> &str {
838 &self.name
839 }
840
841 /// Returns the SQL type of the column.
842 #[must_use]
843 pub fn sql_type(&self) -> SqlType {
844 self.sql_type
845 }
846
847 /// Returns the column index (0-based).
848 #[must_use]
849 pub fn index(&self) -> usize {
850 self.index
851 }
852}
853
/// Schema information for a query result.
///
/// Provides metadata about the columns returned by a query, including
/// column names and types.
#[derive(Debug, Clone, Default)]
pub struct ResultSchema {
    /// Column descriptors in result order; the position in this vector
    /// is what the index-based lookups use.
    columns: Vec<ResultColumn>,
}
862
863impl ResultSchema {
864 /// Creates a new empty result schema.
865 #[must_use]
866 pub fn new() -> Self {
867 ResultSchema {
868 columns: Vec::new(),
869 }
870 }
871
872 /// Creates a result schema from column definitions.
873 #[must_use]
874 pub fn from_columns(columns: Vec<ResultColumn>) -> Self {
875 ResultSchema { columns }
876 }
877
878 /// Adds a column to the schema.
879 pub fn add_column(&mut self, name: impl Into<String>, sql_type: SqlType) {
880 let index = self.columns.len();
881 self.columns.push(ResultColumn::new(name, sql_type, index));
882 }
883
884 /// Returns the number of columns.
885 #[must_use]
886 pub fn column_count(&self) -> usize {
887 self.columns.len()
888 }
889
890 /// Returns all columns.
891 #[must_use]
892 pub fn columns(&self) -> &[ResultColumn] {
893 &self.columns
894 }
895
896 /// Returns the column at the given index.
897 ///
898 /// # Panics
899 ///
900 /// Panics if the index is out of bounds.
901 #[must_use]
902 pub fn column(&self, index: usize) -> &ResultColumn {
903 &self.columns[index]
904 }
905
906 /// Returns the column with the given name, if it exists.
907 #[must_use]
908 pub fn column_by_name(&self, name: &str) -> Option<&ResultColumn> {
909 self.columns.iter().find(|c| c.name == name)
910 }
911
912 /// Returns the index of the column with the given name, if it exists.
913 #[must_use]
914 pub fn column_index(&self, name: &str) -> Option<usize> {
915 self.columns.iter().position(|c| c.name == name)
916 }
917}
918
919// =============================================================================
920// Rowset (Streaming)
921// =============================================================================
922
/// A streaming result set from a SQL query.
///
/// `Rowset` provides memory-efficient streaming access to query results.
/// Results are fetched on-demand in chunks, keeping memory usage constant
/// regardless of result set size. This makes it safe for any result size,
/// from a single row to billions of rows.
///
/// # Example
///
/// ```no_run
/// # use hyperdb_api::{Connection, Result};
/// # fn example(conn: &Connection) -> Result<()> {
/// let mut result = conn.execute_query("SELECT * FROM big_table")?;
/// while let Some(chunk) = result.next_chunk()? {
///     for row in &chunk {
///         // Generic typed access (like C++ row.get<T>())
///         let id: Option<i32> = row.get(0);
///         let value: Option<f64> = row.get(1);
///
///         // Or direct accessors for performance
///         let id = row.get_i32(0);
///         let value = row.get_f64(1);
///     }
/// }
/// # Ok(())
/// # }
/// ```
///
/// # Memory Behavior
///
/// - Only one chunk is held in memory at a time
/// - Default chunk size is 64K rows (~few MB depending on row width)
/// - Memory usage is `O(chunk_size)`, not `O(total_rows)`
/// - Safe for billion-row results
pub struct Rowset<'conn> {
    /// Transport-specific source of rows (TCP stream, prepared-statement
    /// stream, or Arrow data).
    ///
    /// NOTE: this field is declared before `_statement_guard` on
    /// purpose; Rust drops fields in declaration order, which is what
    /// releases the stream before the statement handle (see below).
    inner: RowsetInner<'conn>,
    /// Cached schema for this rowset, built lazily the first time
    /// [`Self::next_chunk`] produces a non-empty chunk (TCP path — at
    /// which point the `RowDescription` message has been observed) or
    /// on first Arrow chunk (gRPC path). Stored as `Arc` so each row
    /// produced by `next_chunk` gets a cheap ref-count clone — that's
    /// how metadata-dependent decoders like [`Row::get_numeric`] reach
    /// the column's `SqlType` without the caller plumbing scale
    /// through manually.
    schema_cache: Option<Arc<ResultSchema>>,
    /// For one-shot prepared statements (the internal
    /// [`crate::Connection::query_params`] path), hold the statement
    /// handle here so its `Drop`-time `close_statement` fires *after*
    /// the rowset releases its connection lock. Dropping the statement
    /// before the rowset would deadlock because the inner stream owns
    /// the connection's `MutexGuard`.
    _statement_guard: Option<hyperdb_api_core::client::OwnedPreparedStatement>,
}
976
977impl std::fmt::Debug for Rowset<'_> {
978 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
979 f.debug_struct("Rowset")
980 .field("has_schema_cache", &self.schema_cache.is_some())
981 .finish_non_exhaustive()
982 }
983}
984
985/// Internal enum to hold either TCP stream or Arrow data.
986enum RowsetInner<'conn> {
987 /// TCP streaming result (uses `QueryStream`).
988 Tcp(QueryStream<'conn>),
989 /// Arrow-based result from gRPC (all data loaded).
990 Arrow(ArrowRowset),
991 /// TCP streaming result from a prepared-statement execute.
992 Prepared(hyperdb_api_core::client::PreparedQueryStream<'conn>),
993}
994
995impl<'conn> Rowset<'conn> {
996 /// Creates a new Rowset from a `QueryStream` (TCP).
997 pub(crate) fn new(stream: QueryStream<'conn>) -> Self {
998 Rowset {
999 inner: RowsetInner::Tcp(stream),
1000 schema_cache: None,
1001 _statement_guard: None,
1002 }
1003 }
1004
1005 /// Creates a new Rowset from Arrow IPC data (gRPC).
1006 pub(crate) fn from_arrow(arrow_rowset: ArrowRowset) -> Self {
1007 Rowset {
1008 inner: RowsetInner::Arrow(arrow_rowset),
1009 schema_cache: None,
1010 _statement_guard: None,
1011 }
1012 }
1013
1014 /// Creates a new Rowset from a prepared-statement streaming result.
1015 pub(crate) fn from_prepared(
1016 stream: hyperdb_api_core::client::PreparedQueryStream<'conn>,
1017 ) -> Self {
1018 Rowset {
1019 inner: RowsetInner::Prepared(stream),
1020 schema_cache: None,
1021 _statement_guard: None,
1022 }
1023 }
1024
1025 #[expect(
1026 clippy::used_underscore_binding,
1027 reason = "underscore-prefixed parameter retained for trait-method signature compatibility"
1028 )]
1029 /// Attaches a `OwnedPreparedStatement` that should be dropped
1030 /// **after** this rowset is consumed. Used by the one-shot
1031 /// prepare+execute path inside
1032 /// [`crate::Connection::query_params`] so the statement's
1033 /// Drop-time close doesn't deadlock on the rowset's still-held
1034 /// connection lock.
1035 pub(crate) fn with_statement_guard(
1036 mut self,
1037 statement: hyperdb_api_core::client::OwnedPreparedStatement,
1038 ) -> Self {
1039 self._statement_guard = Some(statement);
1040 self
1041 }
1042
1043 /// Returns the schema (column metadata) for the result set.
1044 ///
1045 /// For TCP connections, the schema is captured from the `RowDescription` message
1046 /// after the first chunk is read. For gRPC connections, the schema is available
1047 /// immediately from the Arrow data.
1048 ///
1049 /// Returns `None` if no data has been read yet (TCP only).
1050 ///
1051 /// # Example
1052 ///
1053 /// ```no_run
1054 /// # use hyperdb_api::{Connection, Result};
1055 /// # fn example(conn: &Connection) -> Result<()> {
1056 /// let mut result = conn.execute_query("SELECT id, name FROM users")?;
1057 /// // Read first chunk to capture schema (TCP) or get it immediately (gRPC)
1058 /// let _ = result.next_chunk()?;
1059 /// if let Some(schema) = result.schema() {
1060 /// for col in schema.columns() {
1061 /// println!("Column: {} ({})", col.name(), col.sql_type());
1062 /// }
1063 /// }
1064 /// # Ok(())
1065 /// # }
1066 /// ```
1067 #[must_use]
1068 pub fn schema(&self) -> Option<ResultSchema> {
1069 // Fast path: cache already populated by a previous call or by
1070 // `next_chunk`. Clone from the Arc so external callers get an
1071 // owned value independent of internal lifetimes.
1072 if let Some(ref cached) = self.schema_cache {
1073 return Some((**cached).clone());
1074 }
1075 // Slow path: schema hasn't been materialized yet. Build it from
1076 // the transport without populating the cache — `schema()` takes
1077 // `&self`, so mutation isn't possible here. `next_chunk` does
1078 // the caching pass for the row-construction hot path; if a
1079 // caller really only wants the schema and never touches rows,
1080 // they pay one build per call but this is rarely the pattern.
1081 self.build_schema()
1082 }
1083
1084 /// Compute the current schema without populating the cache.
1085 ///
1086 /// Pulls column metadata from the underlying transport and
1087 /// constructs a fresh `ResultSchema`. TCP builds `SqlType` via
1088 /// [`SqlType::from_oid_and_modifier`] so
1089 /// `NUMERIC(precision, scale)` and `VARCHAR(n)` recover their
1090 /// declared parameters from the `RowDescription` `atttypmod`
1091 /// field — dropping the modifier (which bare
1092 /// [`SqlType::from_oid`] does) silently turns every `NUMERIC`
1093 /// into `(precision: 0, scale: 0)` and corrupts decimal decodes
1094 /// downstream. Arrow comes pre-typed via
1095 /// `arrow_type_to_sql_type`.
1096 fn build_schema(&self) -> Option<ResultSchema> {
1097 match &self.inner {
1098 RowsetInner::Tcp(stream) => stream.schema().map(|cols| {
1099 let columns = cols
1100 .iter()
1101 .enumerate()
1102 .map(|(idx, col)| {
1103 let sql_type =
1104 SqlType::from_oid_and_modifier(col.type_oid().0, col.type_modifier());
1105 ResultColumn::new(col.name(), sql_type, idx)
1106 })
1107 .collect();
1108 ResultSchema::from_columns(columns)
1109 }),
1110 RowsetInner::Arrow(arrow) => {
1111 let schema = arrow.schema();
1112 let columns = schema
1113 .fields()
1114 .iter()
1115 .enumerate()
1116 .map(|(idx, field)| {
1117 ResultColumn::new(
1118 field.name(),
1119 crate::arrow_result::arrow_type_to_sql_type(field.data_type()),
1120 idx,
1121 )
1122 })
1123 .collect();
1124 Some(ResultSchema::from_columns(columns))
1125 }
1126 // Prepared statements: schema was captured at prepare time,
1127 // so it is always available immediately.
1128 RowsetInner::Prepared(stream) => {
1129 let cols = stream.schema();
1130 let columns = cols
1131 .iter()
1132 .enumerate()
1133 .map(|(idx, col)| {
1134 let sql_type =
1135 SqlType::from_oid_and_modifier(col.type_oid().0, col.type_modifier());
1136 ResultColumn::new(col.name(), sql_type, idx)
1137 })
1138 .collect();
1139 Some(ResultSchema::from_columns(columns))
1140 }
1141 }
1142 }
1143
1144 /// Populate `schema_cache` if not yet set, then return an `Arc`
1145 /// clone of the cached schema for row construction. Called by
1146 /// `next_chunk` so every row produced gets a cheap schema
1147 /// reference without re-building the `ResultSchema` per chunk.
1148 fn cached_schema_arc(&mut self) -> Option<Arc<ResultSchema>> {
1149 if self.schema_cache.is_none() {
1150 if let Some(schema) = self.build_schema() {
1151 self.schema_cache = Some(Arc::new(schema));
1152 }
1153 }
1154 self.schema_cache.clone()
1155 }
1156
1157 /// Returns the next chunk of rows from the result set.
1158 ///
1159 /// Each chunk contains up to `chunk_size` rows (default 64K).
1160 /// Returns `Ok(None)` when all rows have been consumed.
1161 ///
1162 /// # Example
1163 ///
1164 /// ```no_run
1165 /// # use hyperdb_api::{Rowset, Result};
1166 /// # fn example(mut result: Rowset) -> Result<()> {
1167 /// while let Some(chunk) = result.next_chunk()? {
1168 /// for row in &chunk {
1169 /// let id: Option<i32> = row.get(0); // Generic typed access
1170 /// let value = row.get_f64(1); // Direct accessor
1171 /// }
1172 /// }
1173 /// # Ok(())
1174 /// # }
1175 /// ```
1176 ///
1177 /// # Errors
1178 ///
1179 /// - Returns [`crate::Error::Server`] if the server sends an `ErrorResponse`
1180 /// while streaming the result set.
1181 /// - Returns [`crate::Error::Io`] on transport-level I/O failures.
1182 /// - Returns [`crate::Error::Conversion`] if an Arrow IPC chunk cannot be decoded.
1183 pub fn next_chunk(&mut self) -> Result<Option<Vec<Row>>> {
1184 // Pull the next raw chunk from the underlying transport first;
1185 // on TCP, this is what makes the `RowDescription` bytes arrive
1186 // so we can cache the schema in the step below. We collect a
1187 // `TransportChunk` instead of a `Vec<Row>` directly so the
1188 // schema can be attached after we've populated the cache.
1189 enum TransportChunk {
1190 Tcp(Vec<StreamRow>),
1191 Arrow(Arc<RecordBatch>),
1192 }
1193
1194 let chunk_opt: Option<TransportChunk> = match &mut self.inner {
1195 RowsetInner::Tcp(stream) => stream.next_chunk()?.map(TransportChunk::Tcp),
1196 RowsetInner::Arrow(arrow) => arrow
1197 .next_chunk()?
1198 .map(|chunk| TransportChunk::Arrow(Arc::new(chunk.into_batch()))),
1199 RowsetInner::Prepared(stream) => stream.next_chunk()?.map(TransportChunk::Tcp),
1200 };
1201
1202 let Some(chunk) = chunk_opt else {
1203 return Ok(None);
1204 };
1205
1206 // Populate the schema cache if not already set, then clone the
1207 // Arc into each Row so `Row::get::<Numeric>` and friends can
1208 // look up per-column precision / scale without any caller
1209 // having to thread the schema through manually.
1210 let schema = self.cached_schema_arc();
1211 let rows = match chunk {
1212 TransportChunk::Tcp(stream_rows) => stream_rows
1213 .into_iter()
1214 .map(|row| Row::from_tcp(row, schema.clone()))
1215 .collect(),
1216 TransportChunk::Arrow(batch) => (0..batch.num_rows())
1217 .map(|row_index| Row::from_arrow(Arc::clone(&batch), row_index, schema.clone()))
1218 .collect(),
1219 };
1220 Ok(Some(rows))
1221 }
1222
1223 /// Returns an iterator over all rows in the result set.
1224 ///
1225 /// This provides a C++-like iteration experience while maintaining
1226 /// Rust's explicit error handling. Chunks are fetched internally
1227 /// as needed, keeping memory usage constant.
1228 ///
1229 /// # Example
1230 ///
1231 /// ```no_run
1232 /// # use hyperdb_api::{Connection, Result};
1233 /// # fn example(conn: &Connection) -> Result<()> {
1234 /// // Simple iteration (like C++)
1235 /// let result = conn.execute_query("SELECT * FROM users")?;
1236 /// for row in result.rows() {
1237 /// let row = row?; // Handle potential network errors
1238 /// let id: Option<i32> = row.get(0);
1239 /// let name: Option<String> = row.get(1);
1240 /// println!("User: {:?} - {:?}", id, name);
1241 /// }
1242 /// # Ok(())
1243 /// # }
1244 /// ```
1245 ///
1246 /// # Error Handling
1247 ///
1248 /// Unlike C++ which uses exceptions, Rust requires explicit error handling.
1249 /// Each item in the iterator is a `Result<LightweightRow>` to handle
1250 /// potential network or protocol errors during streaming.
1251 ///
1252 /// # Comparison with `next_chunk()`
1253 ///
1254 /// | Aspect | `rows()` | `next_chunk()` |
1255 /// |--------|----------|----------------|
1256 /// | Syntax | Simpler, C++-like | More verbose |
1257 /// | Error handling | Per-row with `?` | Per-chunk |
1258 /// | Batch ops | Use `.collect()` | Natural |
1259 /// | Best for | Simple iteration | Batch processing |
1260 #[must_use]
1261 pub fn rows(self) -> RowIterator<'conn> {
1262 RowIterator {
1263 rowset: self,
1264 current_iter: Vec::new().into_iter(),
1265 }
1266 }
1267
1268 /// Collects all rows into a Vec.
1269 ///
1270 /// This is a convenience method that handles error collection more elegantly
1271 /// than the standard `collect::<Result<Vec<_>, _>>()` pattern.
1272 ///
1273 /// # Example
1274 ///
1275 /// ```no_run
1276 /// # use hyperdb_api::{Connection, Result};
1277 /// # fn example(conn: &Connection) -> Result<()> {
1278 /// let result = conn.execute_query("SELECT id, name FROM users")?;
1279 /// let rows = result.collect_rows()?; // Much cleaner than collect::<Result<Vec<_>, _>>()
1280 ///
1281 /// for row in rows {
1282 /// let id: Option<i32> = row.get(0);
1283 /// let name: Option<String> = row.get(1);
1284 /// println!("User: {:?} - {:?}", id, name);
1285 /// }
1286 /// # Ok(())
1287 /// # }
1288 /// ```
1289 ///
1290 /// # Errors
1291 ///
1292 /// Returns the first error produced by [`next_chunk`](Self::next_chunk)
1293 /// while draining the stream (transport I/O failure or server-side
1294 /// error).
1295 pub fn collect_rows(self) -> crate::error::Result<Vec<Row>> {
1296 self.rows().collect::<crate::error::Result<Vec<_>>>()
1297 }
1298
1299 /// Collects the first column of each row into a Vec.
1300 ///
1301 /// This is useful for single-column queries or when you only need one column.
1302 ///
1303 /// # Example
1304 ///
1305 /// ```no_run
1306 /// # use hyperdb_api::{Connection, Result};
1307 /// # fn example(conn: &Connection) -> Result<()> {
1308 /// let result = conn.execute_query("SELECT name FROM users")?;
1309 /// let names: Vec<Option<String>> = result.collect_column()?;
1310 ///
1311 /// for name in names {
1312 /// if let Some(name) = name {
1313 /// println!("User: {}", name);
1314 /// }
1315 /// }
1316 /// # Ok(())
1317 /// # }
1318 /// ```
1319 ///
1320 /// # Errors
1321 ///
1322 /// Returns the first streaming error from
1323 /// [`next_chunk`](Self::next_chunk). SQL `NULL` cells yield
1324 /// `Option::None` entries, not errors.
1325 pub fn collect_column<T: crate::result::RowValue>(
1326 self,
1327 ) -> crate::error::Result<Vec<Option<T>>> {
1328 self.rows()
1329 .map(|row| row.map(|r| r.get::<T>(0)))
1330 .collect::<crate::error::Result<Vec<_>>>()
1331 }
1332
1333 /// Collects the first column, filtering out NULL values.
1334 ///
1335 /// This is useful when you know the column doesn't contain NULLs or want to ignore them.
1336 ///
1337 /// # Example
1338 ///
1339 /// ```no_run
1340 /// # use hyperdb_api::{Connection, Result};
1341 /// # fn example(conn: &Connection) -> Result<()> {
1342 /// let result = conn.execute_query("SELECT name FROM users WHERE name IS NOT NULL")?;
1343 /// let names: Vec<String> = result.collect_column_non_null()?;
1344 ///
1345 /// for name in names {
1346 /// println!("User: {}", name); // No need to handle Option
1347 /// }
1348 /// # Ok(())
1349 /// # }
1350 /// ```
1351 ///
1352 /// # Errors
1353 ///
1354 /// Returns the first streaming error from
1355 /// [`collect_column`](Self::collect_column).
1356 pub fn collect_column_non_null<T: crate::result::RowValue>(
1357 self,
1358 ) -> crate::error::Result<Vec<T>> {
1359 Ok(self.collect_column::<T>()?.into_iter().flatten().collect())
1360 }
1361
1362 /// Gets the first row of the result set.
1363 ///
1364 /// This is useful for queries that are expected to return exactly one row,
1365 /// such as aggregate queries or lookups by unique key.
1366 ///
1367 /// # Example
1368 ///
1369 /// ```no_run
1370 /// # use hyperdb_api::{Connection, Result};
1371 /// # fn example(conn: &Connection) -> Result<()> {
1372 /// let result = conn.execute_query("SELECT COUNT(*) FROM users")?;
1373 /// if let Some(row) = result.first_row()? {
1374 /// let count: Option<i64> = row.get(0);
1375 /// println!("User count: {:?}", count);
1376 /// }
1377 /// # Ok(())
1378 /// # }
1379 /// ```
1380 ///
1381 /// # Errors
1382 ///
1383 /// Returns the error from [`next_chunk`](Self::next_chunk). An empty
1384 /// result set yields `Ok(None)`, not an error.
1385 pub fn first_row(mut self) -> crate::error::Result<Option<Row>> {
1386 if let Some(chunk) = self.next_chunk()? {
1387 Ok(chunk.into_iter().next())
1388 } else {
1389 Ok(None)
1390 }
1391 }
1392
1393 /// Gets the first row or returns an error if no rows were found.
1394 ///
1395 /// This is useful when you expect exactly one row and want to fail if that's not the case.
1396 ///
1397 /// # Example
1398 ///
1399 /// ```no_run
1400 /// # use hyperdb_api::{Connection, Result};
1401 /// # fn example(conn: &Connection) -> Result<()> {
1402 /// let result = conn.execute_query("SELECT id, name FROM users WHERE id = 1")?;
1403 /// let row = result.require_first_row()?; // Fails if no row found
1404 /// let id: Option<i32> = row.get(0);
1405 /// let name: Option<String> = row.get(1);
1406 /// println!("Found user: {:?} - {:?}", id, name);
1407 /// # Ok(())
1408 /// # }
1409 /// ```
1410 ///
1411 /// # Errors
1412 ///
1413 /// - Returns the error from [`first_row`](Self::first_row).
1414 /// - Returns [`crate::Error::Conversion`] with message `"Query returned no rows"`
1415 /// if the result set is empty.
1416 pub fn require_first_row(self) -> crate::error::Result<Row> {
1417 self.first_row()?
1418 .ok_or_else(|| crate::error::Error::conversion("Query returned no rows"))
1419 }
1420
1421 /// Gets a scalar value from the first row, first column.
1422 ///
1423 /// This is a convenience method for scalar queries like `SELECT COUNT(*)` or `SELECT MAX(id)`.
1424 ///
1425 /// # Example
1426 ///
1427 /// ```no_run
1428 /// # use hyperdb_api::{Connection, Result};
1429 /// # fn example(conn: &Connection) -> Result<()> {
1430 /// let result = conn.execute_query("SELECT COUNT(*) FROM users")?;
1431 /// let count: Option<i64> = result.scalar()?; // Much cleaner than manual row handling
1432 /// println!("User count: {:?}", count);
1433 /// # Ok(())
1434 /// # }
1435 /// ```
1436 ///
1437 /// # Errors
1438 ///
1439 /// Returns the error from [`require_first_row`](Self::require_first_row):
1440 /// streaming error or empty result. SQL `NULL` in the single cell
1441 /// yields `Ok(None)`.
1442 pub fn scalar<T: crate::result::RowValue>(self) -> crate::error::Result<Option<T>> {
1443 Ok(self.require_first_row()?.get(0))
1444 }
1445
1446 /// Gets a scalar value from the first row, first column, or returns an error if NULL.
1447 ///
1448 /// This is useful when you expect a non-NULL scalar result.
1449 ///
1450 /// # Example
1451 ///
1452 /// ```no_run
1453 /// # use hyperdb_api::{Connection, Result};
1454 /// # fn example(conn: &Connection) -> Result<()> {
1455 /// let result = conn.execute_query("SELECT COUNT(*) FROM users")?;
1456 /// let count: i64 = result.require_scalar()?; // Fails if NULL
1457 /// println!("User count: {}", count);
1458 /// # Ok(())
1459 /// # }
1460 /// ```
1461 ///
1462 /// # Errors
1463 ///
1464 /// - Returns the error from [`scalar`](Self::scalar).
1465 /// - Returns [`crate::Error::Conversion`] with message `"Scalar query returned NULL"`
1466 /// if the single cell is SQL `NULL`.
1467 pub fn require_scalar<T: crate::result::RowValue>(self) -> crate::error::Result<T> {
1468 self.scalar()?
1469 .ok_or_else(|| crate::error::Error::conversion("Scalar query returned NULL"))
1470 }
1471}
1472
1473// =============================================================================
1474// RowIterator - C++-like iteration over query results
1475// =============================================================================
1476
1477/// An iterator over rows in a query result set.
1478///
1479/// `RowIterator` provides a C++-like iteration experience, hiding the
1480/// chunked fetching internally. Each call to `next()` returns the next
1481/// row, automatically fetching new chunks as needed.
1482///
1483/// # Memory Behavior
1484///
1485/// Memory usage remains constant regardless of result set size:
1486/// - Internally fetches 64K rows at a time
1487/// - Previous chunks are dropped when exhausted
1488/// - Safe for billion-row results
1489///
1490/// # Example
1491///
1492/// ```no_run
1493/// # use hyperdb_api::{Connection, Result};
1494/// # fn example(conn: &Connection) -> Result<()> {
1495/// let result = conn.execute_query("SELECT id, name FROM users")?;
1496/// for row in result.rows() {
1497/// let row = row?;
1498/// let id = row.get_i32(0).unwrap_or(-1);
1499/// let name = row.get::<String>(1).unwrap_or_default();
1500/// println!("{}: {}", id, name);
1501/// }
1502/// # Ok(())
1503/// # }
1504/// ```
1505///
1506/// # Error Handling
1507///
1508/// Each iteration yields a `Result<Row>`. Errors can occur
1509/// when fetching new chunks from the server (network issues, protocol
1510/// errors, etc.). Use `?` or match to handle them:
1511///
1512/// ```no_run
1513/// # use hyperdb_api::{Rowset, Result};
1514/// # fn example(mut result: Rowset) -> Result<()> {
1515/// // Using ? in a function that returns Result
1516/// for row in result.rows() {
1517/// let row = row?;
1518/// // process row...
1519/// }
1520/// # Ok(())
1521/// # }
1522/// # fn example2(mut result: Rowset) -> Result<()> {
1523/// // Using try_for_each
1524/// result.rows().try_for_each(|row| -> Result<()> {
1525/// let row = row?;
1526/// // process row...
1527/// Ok(())
1528/// })?;
1529/// # Ok(())
1530/// # }
1531/// ```
1532#[derive(Debug)]
1533pub struct RowIterator<'conn> {
1534 rowset: Rowset<'conn>,
1535 current_iter: std::vec::IntoIter<Row>,
1536}
1537
1538impl Iterator for RowIterator<'_> {
1539 type Item = Result<Row>;
1540
1541 fn next(&mut self) -> Option<Self::Item> {
1542 // Try to get next row from current chunk
1543 if let Some(row) = self.current_iter.next() {
1544 return Some(Ok(row));
1545 }
1546
1547 // Current chunk exhausted, fetch next chunk
1548 match self.rowset.next_chunk() {
1549 Ok(Some(chunk)) => {
1550 self.current_iter = chunk.into_iter();
1551 // Return first row of new chunk
1552 self.current_iter.next().map(Ok)
1553 }
1554 Ok(None) => None, // No more rows
1555 Err(e) => Some(Err(e)), // Error fetching chunk
1556 }
1557 }
1558}
1559
1560/// Iterator over rows of a result set with `FromRow` deserialization.
1561///
1562/// Returned by [`Connection::stream_as`]. Lazily fetches chunks from the
1563/// server and maps each row to `T` via [`FromRow::from_row`]. Memory usage is
1564/// bounded by the chunk size: at most one chunk of rows is held in memory at a
1565/// time.
1566///
1567/// The column-name → index lookup table is built exactly once (on the first
1568/// non-empty chunk) and reused for all rows, making per-row mapping O(1) in
1569/// column count.
1570///
1571/// [`Connection::stream_as`]: crate::Connection::stream_as
1572/// [`FromRow::from_row`]: crate::FromRow::from_row
1573///
1574/// # Example
1575///
1576/// ```no_run
1577/// # use hyperdb_api::{Connection, CreateMode, FromRow, RowAccessor, Result};
1578/// # struct User { id: i32, name: String }
1579/// # impl FromRow for User {
1580/// # fn from_row(row: RowAccessor<'_>) -> Result<Self> {
1581/// # Ok(User { id: row.get("id")?, name: row.get("name")? })
1582/// # }
1583/// # }
1584/// # fn example(conn: &Connection) -> Result<()> {
1585/// for row in conn.stream_as::<User>("SELECT id, name FROM users")? {
1586/// let user = row?;
1587/// println!("{}: {}", user.id, user.name);
1588/// }
1589/// # Ok(())
1590/// # }
1591/// ```
1592///
1593/// # Errors
1594///
1595/// Each yielded item is a `Result<T>`:
1596/// - `Ok(T)` if the row was successfully mapped via `FromRow`.
1597/// - `Err(e)` if mapping failed (e.g., missing column, type mismatch, NULL in
1598/// a non-optional field).
1599///
1600/// Transport-level errors (chunk-fetching failures) are also surfaced as
1601/// `Err` items.
1602pub(crate) struct TypedRowIterator<'conn, T> {
1603 rowset: Rowset<'conn>,
1604 current_iter: std::vec::IntoIter<Row>,
1605 indices: Option<std::collections::HashMap<String, usize>>,
1606 _marker: std::marker::PhantomData<fn() -> T>,
1607}
1608
1609impl<T> std::fmt::Debug for TypedRowIterator<'_, T> {
1610 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1611 f.debug_struct("TypedRowIterator")
1612 .field("rowset", &self.rowset)
1613 .field("current_iter_len", &self.current_iter.len())
1614 .field("indices_built", &self.indices.is_some())
1615 .finish_non_exhaustive()
1616 }
1617}
1618
1619impl<'conn, T> TypedRowIterator<'conn, T> {
1620 /// Constructs a new `TypedRowIterator` over the given rowset.
1621 /// Crate-internal: callers go through `Connection::stream_as`.
1622 pub(crate) fn new(rowset: Rowset<'conn>) -> Self {
1623 Self {
1624 rowset,
1625 current_iter: Vec::new().into_iter(),
1626 indices: None,
1627 _marker: std::marker::PhantomData,
1628 }
1629 }
1630}
1631
1632impl<T: crate::FromRow> Iterator for TypedRowIterator<'_, T> {
1633 type Item = Result<T>;
1634
1635 fn next(&mut self) -> Option<Self::Item> {
1636 loop {
1637 // Try to get next row from current chunk. `get_or_insert_with` is
1638 // a no-op here (the map was already built when this chunk was
1639 // fetched, below) but lets us borrow it without an Option unwrap.
1640 if let Some(row) = self.current_iter.next() {
1641 let indices = self.indices.get_or_insert_with(Default::default);
1642 return Some(T::from_row(crate::RowAccessor::new_owned(&row, indices)));
1643 }
1644
1645 // Current chunk exhausted, fetch next chunk
1646 match self.rowset.next_chunk() {
1647 Ok(Some(chunk)) => {
1648 // Build the index map once, on the first chunk. The schema
1649 // is populated by `next_chunk` as a side effect. If the
1650 // schema is somehow unavailable, fall back to an empty map
1651 // so column lookups surface a `Missing` error per row —
1652 // matching `fetch_all_as`'s `unwrap_or_default()` rather
1653 // than silently truncating the stream.
1654 if self.indices.is_none() {
1655 let map = self
1656 .rowset
1657 .schema()
1658 .map(|schema| crate::RowAccessor::build_owned_indices(&schema))
1659 .unwrap_or_default();
1660 self.indices = Some(map);
1661 }
1662 self.current_iter = chunk.into_iter();
1663 // loop around to drain the freshly fetched chunk
1664 }
1665 Ok(None) => return None, // No more rows
1666 Err(e) => return Some(Err(e)), // Error fetching chunk
1667 }
1668 }
1669 }
1670}
1671
1672// =============================================================================
1673// Unit tests that don't need a live hyperd backend.
1674//
1675// Anything requiring a real Hyper process lives in `hyperdb-api/tests/*.rs` where
1676// `TestConnection` spins up a `HyperProcess` per test. These tests exercise
1677// pure in-process logic — specifically the Arrow-path branches of
1678// `Row::get_numeric`, where we can construct a synthetic `RecordBatch` with a
1679// specific `DataType::Decimal128(p, s)` descriptor and probe `Row`'s
1680// handling of it without hyperd in the loop.
1681// =============================================================================
1682
#[cfg(test)]
mod arrow_path_tests {
    use super::*;
    use arrow::array::Decimal128Array;
    use arrow::datatypes::{DataType as ArrowType, Field, Schema};

    /// Builds a one-row `RecordBatch` holding a single nullable
    /// Decimal128 column named `v`, with unscaled value `raw` and the
    /// given precision and scale.
    fn decimal128_batch(raw: i128, precision: u8, scale: i8) -> Arc<RecordBatch> {
        let values = Decimal128Array::from(vec![Some(raw)])
            .with_precision_and_scale(precision, scale)
            .expect("valid Arrow Decimal128");
        let schema = Schema::new(vec![Field::new(
            "v",
            ArrowType::Decimal128(precision, scale),
            true,
        )]);
        let batch =
            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(values)]).expect("batch");
        Arc::new(batch)
    }

    /// Wraps the only row of a [`decimal128_batch`] in a `Row` that has
    /// no schema attached.
    fn decimal128_row(raw: i128, precision: u8, scale: i8) -> Row {
        Row::from_arrow(decimal128_batch(raw, precision, scale), 0, None)
    }

    /// Common case: an Arrow Decimal128 with a positive scale decodes to
    /// the matching `Numeric`, both through `get_numeric` and through the
    /// generic `row.get::<Numeric>()` path. Counterpart of the
    /// negative-scale test below.
    #[test]
    fn get_numeric_reads_arrow_decimal128_with_positive_scale() {
        // NUMERIC(10, 2), unscaled value 123 → 1.23
        let row = decimal128_row(123, 10, 2);

        let decoded = row.get_numeric(0).expect("Some for positive-scale decimal");
        assert_eq!(decoded.unscaled_value(), 123);
        assert_eq!(decoded.scale(), 2);
        let distance = (decoded.to_f64() - 1.23).abs();
        assert!(distance < 1e-9);

        // The generic `RowValue` path must agree with `get_numeric`.
        let generic: hyperdb_api_core::types::Numeric =
            row.get(0).expect("RowValue path agrees with get_numeric");
        assert_eq!(generic, decoded);
    }

    /// Arrow's `DataType::Decimal128(u8, i8)` permits a negative scale,
    /// meaning "raw × 10^abs(scale)" (scale=-2 with raw=5 renders as
    /// 500). Hyper's `Numeric` stores the scale as a `u8` and has no way
    /// to express that multiplier.
    ///
    /// The earlier `.max(0) as u8` code clamped the scale to 0 and left
    /// `raw` untouched, yielding a value of the wrong magnitude (`5`
    /// instead of `500` in the example above). Negative scales are now
    /// rejected via `try_into` + `?`, which the caller sees as `None` —
    /// strictly safer than a silently wrong value.
    #[test]
    fn get_numeric_rejects_arrow_decimal128_with_negative_scale() {
        // NUMERIC(10, -2): legal in Arrow, not representable by Hyper's
        // Numeric, so decoding must yield None instead of dropping the
        // negative-scale multiplier.
        let row = decimal128_row(5, 10, -2);

        assert!(
            row.get_numeric(0).is_none(),
            "negative Arrow scale must not produce a silently-wrong-magnitude Numeric",
        );

        // The `RowValue` blanket path must reject it as well.
        let generic: Option<hyperdb_api_core::types::Numeric> = row.get(0);
        assert!(generic.is_none());
    }

    /// Boundary case: scale = 0 fits in a `u8` and must keep working.
    /// Protects against a check that is tightened too far and rejects
    /// zero together with the negatives.
    #[test]
    fn get_numeric_accepts_arrow_decimal128_with_zero_scale() {
        let row = decimal128_row(42, 10, 0);

        let decoded = row.get_numeric(0).expect("scale 0 is fine");
        assert_eq!(decoded.unscaled_value(), 42);
        assert_eq!(decoded.scale(), 0);
    }
}