Skip to main content

hyperdb_api/
result.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Query result handling with type-safe value access.
5//!
6//! This module provides types for working with query results:
7//! - [`Rowset`] — Streaming result set with memory-efficient chunked iteration
8//! - [`RowIterator`] — C++-like iterator for simple row-by-row processing
9//! - [`ResultSchema`] — Column metadata (names and types)
10//!
11//! # Streaming Design
12//!
13//! Query results are streamed from the server in chunks of up to
14//! [`DEFAULT_BINARY_CHUNK_SIZE`] rows (64K). Only one chunk is held in memory
15//! at a time, so memory usage is `O(chunk_size)` regardless of total result
16//! size — safe for billion-row results.
17//!
18//! # Iteration Patterns
19//!
20//! Two patterns are available, both streaming with constant memory:
21//!
22//! ## Pattern 1: Chunked (`next_chunk()`) — batch processing
23//!
24//! Best for high-throughput scenarios. Error checking happens once per chunk
25//! (~64K rows), and you get direct `Vec<Row>` iteration with good cache
26//! locality. Natural for batch operations, vectorized processing, or
27//! parallelizing across chunks.
28//!
29//! ```no_run
30//! # use hyperdb_api::{Connection, CreateMode, Result};
31//! # fn example(conn: &Connection) -> Result<()> {
32//! let mut result = conn.execute_query("SELECT * FROM table")?;
33//! while let Some(chunk) = result.next_chunk()? {
34//!     for row in &chunk {
35//!         let id: Option<i32> = row.get(0);
36//!         let value: Option<f64> = row.get(1);
37//!     }
38//! }
39//! # Ok(())
40//! # }
41//! ```
42//!
43//! ## Pattern 2: Iterator (`rows()`) — simple row-by-row
44//!
45//! Best for simple iteration where you process one row at a time. Each item
46//! is `Result<Row>` since chunk fetches can fail, so error checking happens
47//! per-row. The extra iterator wrapper adds slight overhead compared to
48//! `next_chunk()`.
49//!
50//! ```no_run
51//! # use hyperdb_api::{Connection, Result};
52//! # fn example(conn: &Connection) -> Result<()> {
53//! let result = conn.execute_query("SELECT * FROM table")?;
54//! for row in result.rows() {
55//!     let row = row?;  // Handle potential errors
56//!     let id: Option<i32> = row.get(0);
57//!     let value: Option<f64> = row.get(1);
58//! }
59//! # Ok(())
60//! # }
61//! ```
62//!
63//! **When to use which:**
64//! - `rows()` — simple iteration, one row at a time, small overhead acceptable
65//! - `next_chunk()` — maximum performance, large result sets, batch operations
66//!
67//! # Type Coercion
68//!
69//! The generic `row.get::<T>()` method supports automatic widening coercion:
70//!
71//! | Request Type | Coerces From |
72//! |---|---|
73//! | `i32` | `i16` |
74//! | `i64` | `i32`, `i16` |
75//! | `f64` | `f32` |
76//!
77//! Direct accessors (`row.get_i32()`, `row.get_f64()`) skip coercion for
78//! slightly better performance when the exact type is known.
79
80use std::sync::Arc;
81
82use arrow::array::Array;
83use arrow::record_batch::RecordBatch;
84use hyperdb_api_core::client::QueryStream;
85use hyperdb_api_core::client::StreamRow;
86use hyperdb_api_core::types::SqlType;
87
88use crate::arrow_result::{ArrowRowset, FromArrowValue};
89use crate::error::Result;
90
91/// Default chunk size for streaming queries (64K rows).
92pub(crate) const DEFAULT_BINARY_CHUNK_SIZE: usize = 65536;
93
94// =============================================================================
95// Row - Unified row type for both TCP and gRPC
96// =============================================================================
97
98/// A row from a query result, providing typed value access.
99///
100/// This type abstracts over the underlying transport (TCP or gRPC),
101/// providing a consistent API for accessing column values regardless
102/// of how the data was retrieved.
103///
104/// # Example
105///
106/// ```no_run
107/// # use hyperdb_api::Result;
108/// # fn example(result: hyperdb_api::Rowset) -> Result<()> {
109/// for row in result.rows() {
110///     let row = row?;
111///     let id: Option<i32> = row.get(0);
112///     let name: Option<String> = row.get(1);
113///     // Or use direct accessors
114///     let value = row.get_f64(2);
115/// }
116/// # Ok(())
117/// # }
118/// ```
119pub struct Row {
120    inner: RowInner,
121    /// Shared schema reference for the parent rowset. Every row
122    /// produced by [`Rowset::next_chunk`] carries this (cloned cheaply
123    /// from an `Arc`) so that metadata-dependent decoders like
124    /// [`Self::get_numeric`] can look up `SqlType` per column without
125    /// the caller plumbing scale through manually. `None` only in the
126    /// unusual case a row is constructed outside `next_chunk` (no such
127    /// path exists in-tree today; the field is `Option` so future
128    /// schemas-unavailable paths remain compilable).
129    schema: Option<Arc<ResultSchema>>,
130}
131
132impl std::fmt::Debug for Row {
133    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134        f.debug_struct("Row")
135            .field("has_schema", &self.schema.is_some())
136            .finish_non_exhaustive()
137    }
138}
139
140/// Internal per-transport backing for a [`Row`]. Not public: all
141/// consumer-visible API goes through `Row`'s methods, which dispatch
142/// on this enum internally.
143enum RowInner {
144    /// Row from TCP transport (`StreamRow`).
145    Tcp(StreamRow),
146    /// Row from gRPC transport (Arrow-backed).
147    Arrow {
148        /// The record batch containing this row's data.
149        batch: Arc<RecordBatch>,
150        /// Index of this row within the batch.
151        row_index: usize,
152    },
153}
154
155impl Row {
156    /// Construct a TCP-backed row with an attached schema reference.
157    #[inline]
158    pub(crate) fn from_tcp(row: StreamRow, schema: Option<Arc<ResultSchema>>) -> Self {
159        Row {
160            inner: RowInner::Tcp(row),
161            schema,
162        }
163    }
164
165    /// Construct an Arrow-backed row with an attached schema reference.
166    #[inline]
167    pub(crate) fn from_arrow(
168        batch: Arc<RecordBatch>,
169        row_index: usize,
170        schema: Option<Arc<ResultSchema>>,
171    ) -> Self {
172        Row {
173            inner: RowInner::Arrow { batch, row_index },
174            schema,
175        }
176    }
177
178    /// Returns the schema this row belongs to, if attached.
179    ///
180    /// Every row produced by [`Rowset::next_chunk`] has a schema
181    /// attached — so this returns `Some` for any row obtained through
182    /// the public API.
183    #[inline]
184    pub fn schema(&self) -> Option<&ResultSchema> {
185        self.schema.as_deref()
186    }
187
188    /// Returns the `SqlType` of the column at the given index, if the
189    /// schema is attached and the index is in bounds.
190    ///
191    /// Useful for metadata-dependent decoders like [`Self::get_numeric`]
192    /// that need per-column precision and scale. Most callers reach for
193    /// [`Self::get`] / [`Self::try_get`] instead, which handle this
194    /// lookup internally via the [`RowValue`] trait.
195    #[inline]
196    pub fn sql_type(&self, idx: usize) -> Option<SqlType> {
197        let schema = self.schema.as_deref()?;
198        if idx < schema.column_count() {
199            Some(schema.column(idx).sql_type())
200        } else {
201            None
202        }
203    }
204
205    /// Gets a typed value at the given column index.
206    ///
207    /// # Example
208    ///
209    /// ```no_run
210    /// # use hyperdb_api::Row;
211    /// # fn example(row: &Row) {
212    /// let id: Option<i32> = row.get(0);
213    /// let name: Option<String> = row.get(1);
214    /// # }
215    /// ```
216    #[inline]
217    pub fn get<T: RowValue>(&self, idx: usize) -> Option<T> {
218        T::from_row(self, idx)
219    }
220
221    /// Gets a typed value at the given column index, returning a `Result`
222    /// with a descriptive error on failure.
223    ///
224    /// Use this in [`FromRow`] implementations for better error messages
225    /// than bare `row.get(idx).ok_or(...)`.
226    ///
227    /// # Example
228    ///
229    /// Most callers should reach for [`crate::FromRow`] +
230    /// [`crate::RowAccessor`] for typed mapping. `try_get` is the
231    /// underlying positional building block; useful when you need
232    /// indexed access from a hand-rolled loop.
233    ///
234    /// ```no_run
235    /// # use hyperdb_api::{Row, Result};
236    /// # fn read(row: &Row) -> Result<(i32, String)> {
237    /// let id: i32 = row.try_get(0, "id")?;
238    /// let name: String = row.try_get(1, "name")?;
239    /// # Ok((id, name))
240    /// # }
241    /// ```
242    ///
243    /// # Errors
244    ///
245    /// - Returns [`crate::Error::Conversion`] if `idx` is out of bounds for the row's
246    ///   column count.
247    /// - Returns [`crate::Error::Conversion`] if the cell is SQL `NULL` or its value
248    ///   cannot be decoded as `T`.
249    pub fn try_get<T: RowValue>(&self, idx: usize, column_name: &str) -> crate::error::Result<T> {
250        if idx >= self.column_count() {
251            return Err(crate::error::Error::conversion(format!(
252                "Column index {} ({:?}) out of bounds — row has {} columns",
253                idx,
254                column_name,
255                self.column_count(),
256            )));
257        }
258        self.get::<T>(idx).ok_or_else(|| {
259            crate::error::Error::conversion(format!(
260                "Column {idx} ({column_name:?}) is NULL or has incompatible type",
261            ))
262        })
263    }
264
265    /// Looks up a column by name and returns its value as `T`.
266    ///
267    /// Convenient for hand-coded paths that aren't using
268    /// [`FromRow`]. The lookup is a linear scan over
269    /// [`ResultSchema::column_index`]; for hot paths (many rows × many
270    /// fields), prefer
271    /// [`fetch_one_as`](crate::Connection::fetch_one_as) /
272    /// [`fetch_all_as`](crate::Connection::fetch_all_as), which build
273    /// a cached column-name → index lookup once per query and hand
274    /// every `FromRow` impl a [`RowAccessor`](crate::RowAccessor) that
275    /// reuses it.
276    ///
277    /// # Errors
278    ///
279    /// - [`crate::Error::Column`] with [`crate::ColumnErrorKind::Missing`]
280    ///   if no column with `name` exists in the row's schema (or the
281    ///   row has no schema attached).
282    /// - [`crate::Error::Conversion`] if the cell is `NULL` or cannot
283    ///   be decoded as `T`. (Inherited from [`Self::try_get`].)
284    pub fn get_by_name<T: RowValue>(&self, name: &str) -> crate::error::Result<T> {
285        let idx = self
286            .schema()
287            .and_then(|s| s.column_index(name))
288            .ok_or_else(|| {
289                crate::error::Error::column(name, crate::error::ColumnErrorKind::Missing)
290            })?;
291        self.try_get(idx, name)
292    }
293
294    /// Returns an Arrow column reference, or `None` if the index is out of bounds.
295    ///
296    /// This is a safe wrapper around `batch.column(idx)` that avoids panicking.
297    #[inline]
298    fn arrow_column(batch: &RecordBatch, idx: usize) -> Option<&Arc<dyn Array>> {
299        if idx < batch.num_columns() {
300            Some(batch.column(idx))
301        } else {
302            None
303        }
304    }
305
306    /// Gets an i16 value at the given column index.
307    #[inline]
308    pub fn get_i16(&self, idx: usize) -> Option<i16> {
309        match &self.inner {
310            RowInner::Tcp(row) => row.get_i16(idx),
311            RowInner::Arrow { batch, row_index } => {
312                i16::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
313            }
314        }
315    }
316
317    /// Gets an i32 value at the given column index.
318    #[inline]
319    pub fn get_i32(&self, idx: usize) -> Option<i32> {
320        match &self.inner {
321            RowInner::Tcp(row) => row.get_i32(idx),
322            RowInner::Arrow { batch, row_index } => {
323                i32::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
324            }
325        }
326    }
327
328    /// Gets an i64 value at the given column index.
329    #[inline]
330    pub fn get_i64(&self, idx: usize) -> Option<i64> {
331        match &self.inner {
332            RowInner::Tcp(row) => row.get_i64(idx),
333            RowInner::Arrow { batch, row_index } => {
334                i64::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
335            }
336        }
337    }
338
339    /// Gets an f32 value at the given column index.
340    #[inline]
341    pub fn get_f32(&self, idx: usize) -> Option<f32> {
342        match &self.inner {
343            RowInner::Tcp(row) => row.get_f32(idx),
344            RowInner::Arrow { batch, row_index } => {
345                f32::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
346            }
347        }
348    }
349
350    /// Gets an f64 value at the given column index.
351    #[inline]
352    pub fn get_f64(&self, idx: usize) -> Option<f64> {
353        match &self.inner {
354            RowInner::Tcp(row) => row.get_f64(idx),
355            RowInner::Arrow { batch, row_index } => {
356                f64::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
357            }
358        }
359    }
360
361    /// Gets a bool value at the given column index.
362    #[inline]
363    pub fn get_bool(&self, idx: usize) -> Option<bool> {
364        match &self.inner {
365            RowInner::Tcp(row) => row.get_bool(idx),
366            RowInner::Arrow { batch, row_index } => {
367                bool::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
368            }
369        }
370    }
371
372    /// Gets a String value at the given column index.
373    #[inline]
374    pub fn get_string(&self, idx: usize) -> Option<String> {
375        match &self.inner {
376            RowInner::Tcp(row) => row.get_string(idx),
377            RowInner::Arrow { batch, row_index } => {
378                String::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
379            }
380        }
381    }
382
383    /// Checks if the value at the given column is null.
384    #[inline]
385    pub fn is_null(&self, idx: usize) -> bool {
386        match &self.inner {
387            RowInner::Tcp(row) => row.is_null(idx),
388            RowInner::Arrow { batch, row_index } => match Self::arrow_column(batch, idx) {
389                Some(col) => col.is_null(*row_index),
390                None => true,
391            },
392        }
393    }
394
395    /// Returns the number of columns in this row.
396    #[inline]
397    pub fn column_count(&self) -> usize {
398        match &self.inner {
399            RowInner::Tcp(row) => row.column_count(),
400            RowInner::Arrow { batch, .. } => batch.num_columns(),
401        }
402    }
403
404    /// Gets raw bytes at the given column index.
405    ///
406    /// For TCP rows, returns the raw binary data. For Arrow rows, this method
407    /// is not available and returns None.
408    #[inline]
409    pub fn get_bytes(&self, idx: usize) -> Option<Vec<u8>> {
410        match &self.inner {
411            RowInner::Tcp(row) => row.get_bytes(idx).map(<[u8]>::to_vec),
412            RowInner::Arrow { batch, row_index } => {
413                Vec::<u8>::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
414            }
415        }
416    }
417
418    /// Gets a Date value at the given column index.
419    #[inline]
420    pub fn get_date(&self, idx: usize) -> Option<hyperdb_api_core::types::Date> {
421        match &self.inner {
422            RowInner::Tcp(row) => row.get(idx),
423            RowInner::Arrow { batch, row_index } => {
424                // Arrow Date32 is days since Unix epoch (1970-01-01)
425                // Hyper Date is days since Hyper epoch (2000-01-01)
426                use arrow::array::Date32Array;
427                let col = Self::arrow_column(batch, idx)?;
428                let arr = col.as_any().downcast_ref::<Date32Array>()?;
429                if arr.is_null(*row_index) {
430                    return None;
431                }
432                let unix_days = arr.value(*row_index);
433                // Convert from Unix epoch to Hyper epoch (diff is 10957 days)
434                let hyper_days = unix_days - 10957;
435                Some(hyperdb_api_core::types::Date::from_days(hyper_days))
436            }
437        }
438    }
439
440    /// Gets a Time value at the given column index.
441    #[inline]
442    pub fn get_time(&self, idx: usize) -> Option<hyperdb_api_core::types::Time> {
443        match &self.inner {
444            RowInner::Tcp(row) => row.get(idx),
445            RowInner::Arrow { batch, row_index } => {
446                // Arrow Time64 is microseconds since midnight
447                use arrow::array::Time64MicrosecondArray;
448                let col = Self::arrow_column(batch, idx)?;
449                let arr = col.as_any().downcast_ref::<Time64MicrosecondArray>()?;
450                if arr.is_null(*row_index) {
451                    return None;
452                }
453                let micros = u64::try_from(arr.value(*row_index)).ok()?;
454                Some(hyperdb_api_core::types::Time::from_microseconds(micros))
455            }
456        }
457    }
458
459    /// Gets a Timestamp value at the given column index.
460    #[inline]
461    pub fn get_timestamp(&self, idx: usize) -> Option<hyperdb_api_core::types::Timestamp> {
462        match &self.inner {
463            RowInner::Tcp(row) => row.get(idx),
464            RowInner::Arrow { batch, row_index } => {
465                // Arrow Timestamp is microseconds since Unix epoch
466                // Hyper Timestamp is microseconds since Hyper epoch (2000-01-01)
467                use arrow::array::TimestampMicrosecondArray;
468                let col = Self::arrow_column(batch, idx)?;
469                let arr = col.as_any().downcast_ref::<TimestampMicrosecondArray>()?;
470                if arr.is_null(*row_index) {
471                    return None;
472                }
473                let unix_micros = arr.value(*row_index);
474                // Convert from Unix epoch to Hyper epoch
475                // 2000-01-01 is 946684800 seconds after 1970-01-01
476                let hyper_micros = unix_micros - 946_684_800_000_000;
477                Some(hyperdb_api_core::types::Timestamp::from_microseconds(
478                    hyper_micros,
479                ))
480            }
481        }
482    }
483
484    /// Gets an `OffsetTimestamp` (TIMESTAMP WITH TIME ZONE) value at the given column index.
485    #[inline]
486    pub fn get_offset_timestamp(
487        &self,
488        idx: usize,
489    ) -> Option<hyperdb_api_core::types::OffsetTimestamp> {
490        match &self.inner {
491            RowInner::Tcp(row) => row.get(idx),
492            RowInner::Arrow { batch, row_index } => {
493                // Arrow TimestampTz is microseconds since Unix epoch with timezone
494                use arrow::array::TimestampMicrosecondArray;
495                let col = Self::arrow_column(batch, idx)?;
496                let arr = col.as_any().downcast_ref::<TimestampMicrosecondArray>()?;
497                if arr.is_null(*row_index) {
498                    return None;
499                }
500                let unix_micros = arr.value(*row_index);
501                let hyper_micros = unix_micros - 946_684_800_000_000;
502                let ts = hyperdb_api_core::types::Timestamp::from_microseconds(hyper_micros);
503                Some(hyperdb_api_core::types::OffsetTimestamp::new(ts, 0))
504            }
505        }
506    }
507
508    /// Gets an Interval value at the given column index.
509    #[inline]
510    pub fn get_interval(&self, idx: usize) -> Option<hyperdb_api_core::types::Interval> {
511        match &self.inner {
512            RowInner::Tcp(row) => row.get(idx),
513            RowInner::Arrow { batch, row_index } => {
514                // Arrow MonthDayNano interval → Hyper Interval
515                use arrow::array::IntervalMonthDayNanoArray;
516                let col = Self::arrow_column(batch, idx)?;
517                let arr = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>()?;
518                if arr.is_null(*row_index) {
519                    return None;
520                }
521                let v = arr.value(*row_index);
522                let micros = v.nanoseconds / 1000;
523                Some(hyperdb_api_core::types::Interval::new(
524                    v.months, v.days, micros,
525                ))
526            }
527        }
528    }
529
530    /// Gets a `NUMERIC` value at the given column index.
531    ///
532    /// This is the metadata-aware variant of [`Self::get_bytes`] +
533    /// [`hyperdb_api_core::types::Numeric::from_binary_with_scale`]: it looks up
534    /// the column's `SqlType::Numeric { scale, .. }` from the attached
535    /// schema and decodes the wire bytes with that scale, handling
536    /// both of Hyper's NUMERIC wire forms transparently:
537    ///
538    /// - **8 bytes** (i64) when the column's declared precision ≤ 18
539    ///   (Hyper's `Type::Numeric`). This is what aggregates like
540    ///   `AVG(INTEGER)` return as `Numeric(16, 6)`.
541    /// - **16 bytes** (i128) when declared precision > 18
542    ///   (Hyper's `Type::BigNumeric`).
543    ///
544    /// Returns `None` if any of the following are true: the value is
545    /// NULL, the schema isn't attached (which never happens for rows
546    /// obtained through [`Rowset::next_chunk`]), the column at `idx`
547    /// isn't `NUMERIC`, or the bytes can't be decoded.
548    ///
549    /// For non-TCP (Arrow/gRPC) rows, this path falls back to reading
550    /// the Arrow-native `Decimal128` / `Decimal256` columns; the scale
551    /// lives in the Arrow type descriptor in that case.
552    pub fn get_numeric(&self, idx: usize) -> Option<hyperdb_api_core::types::Numeric> {
553        match &self.inner {
554            RowInner::Tcp(_) => {
555                // TCP: decode raw bytes with scale from the schema.
556                //
557                // `SqlType::Numeric::scale` is `u32` and Hyper's own
558                // `NUMERIC(p, s)` caps at `p ≤ 38` (per
559                // `hyper/rts/type/Type.hpp`), so any legitimate scale
560                // fits easily in `u8`. But `scale as u8` silently
561                // truncates the high bits for values > 255, and a
562                // malformed server response or a bug in typemod
563                // parsing could deliver such a value — at which point
564                // we'd produce a `Numeric` with the wrong (truncated)
565                // scale and no error signal. `u8::try_from` returns
566                // `Err` for out-of-range, `?` propagates `None`, and
567                // the caller gets a clean "no value" instead of
568                // silent corruption. Symmetric with the Arrow
569                // negative-scale guard a few lines below.
570                let scale: u8 = match self.sql_type(idx)? {
571                    SqlType::Numeric { scale, .. } => u8::try_from(scale).ok()?,
572                    _ => return None,
573                };
574                let bytes = self.get_bytes(idx)?;
575                hyperdb_api_core::types::Numeric::from_binary_with_scale(&bytes, scale).ok()
576            }
577            RowInner::Arrow { batch, row_index } => {
578                use arrow::array::{Decimal128Array, Decimal256Array};
579                use arrow::datatypes::DataType as ArrowType;
580                let col = Self::arrow_column(batch, idx)?;
581                // Arrow stores decimal precision/scale in the type
582                // descriptor itself, so there's no separate schema
583                // lookup needed on this path.
584                //
585                // Note: Arrow's decimal scale is `i8` and can legally
586                // be negative (negative scale = "value is multiplied
587                // by 10^abs(scale)", e.g. scale=-2 on raw=5 renders
588                // as 500). Hyper's `Numeric` uses `u8` scale and has
589                // no representation for the negative-scale
590                // multiplier. Rather than silently dropping the
591                // multiplier (which would make raw=5 display as 5
592                // instead of 500), we surface it as "no value" via
593                // `try_into` + `?`. Negative-scale decimals don't
594                // originate from Hyper's own gRPC encoder — but
595                // `Row` can be fed from externally-loaded Arrow
596                // files, so defensive handling costs nothing and
597                // prevents a silent-corruption failure mode.
598                match col.data_type() {
599                    ArrowType::Decimal128(_precision, scale) => {
600                        let scale_u8: u8 = (*scale).try_into().ok()?;
601                        let arr = col.as_any().downcast_ref::<Decimal128Array>()?;
602                        if arr.is_null(*row_index) {
603                            return None;
604                        }
605                        let raw = arr.value(*row_index); // i128
606                        Some(hyperdb_api_core::types::Numeric::new(raw, scale_u8))
607                    }
608                    ArrowType::Decimal256(_precision, scale) => {
609                        // i256 from Arrow; Hyper NUMERIC caps at i128
610                        // (precision ≤ 38). Narrow to i128; this is
611                        // lossless for any value Hyper would actually
612                        // produce. Values outside that range are a
613                        // server-side contract violation.
614                        let scale_u8: u8 = (*scale).try_into().ok()?;
615                        let arr = col.as_any().downcast_ref::<Decimal256Array>()?;
616                        if arr.is_null(*row_index) {
617                            return None;
618                        }
619                        let raw = arr.value(*row_index);
620                        let as_i128: i128 = raw.to_i128()?;
621                        Some(hyperdb_api_core::types::Numeric::new(as_i128, scale_u8))
622                    }
623                    _ => None,
624                }
625            }
626        }
627    }
628}
629
630/// Trait for types that can be extracted from a Row.
631pub trait RowValue: Sized {
632    /// Extract a value from a Row at the given column index.
633    fn from_row(row: &Row, idx: usize) -> Option<Self>;
634}
635
636impl RowValue for i16 {
637    #[inline]
638    fn from_row(row: &Row, idx: usize) -> Option<Self> {
639        row.get_i16(idx)
640    }
641}
642
643impl RowValue for i32 {
644    #[inline]
645    fn from_row(row: &Row, idx: usize) -> Option<Self> {
646        row.get_i32(idx).or_else(|| row.get_i16(idx).map(i32::from))
647    }
648}
649
650impl RowValue for i64 {
651    #[inline]
652    fn from_row(row: &Row, idx: usize) -> Option<Self> {
653        row.get_i64(idx)
654            .or_else(|| row.get_i32(idx).map(i64::from))
655            .or_else(|| row.get_i16(idx).map(i64::from))
656    }
657}
658
659impl RowValue for f32 {
660    #[inline]
661    fn from_row(row: &Row, idx: usize) -> Option<Self> {
662        row.get_f32(idx)
663    }
664}
665
666impl RowValue for f64 {
667    #[inline]
668    fn from_row(row: &Row, idx: usize) -> Option<Self> {
669        row.get_f64(idx).or_else(|| row.get_f32(idx).map(f64::from))
670    }
671}
672
673impl RowValue for bool {
674    #[inline]
675    fn from_row(row: &Row, idx: usize) -> Option<Self> {
676        row.get_bool(idx)
677    }
678}
679
680impl RowValue for String {
681    #[inline]
682    fn from_row(row: &Row, idx: usize) -> Option<Self> {
683        row.get_string(idx)
684    }
685}
686
687impl RowValue for Vec<u8> {
688    #[inline]
689    fn from_row(row: &Row, idx: usize) -> Option<Self> {
690        row.get_bytes(idx)
691    }
692}
693
694impl RowValue for hyperdb_api_core::types::Date {
695    #[inline]
696    fn from_row(row: &Row, idx: usize) -> Option<Self> {
697        row.get_date(idx)
698    }
699}
700
701impl RowValue for hyperdb_api_core::types::Time {
702    #[inline]
703    fn from_row(row: &Row, idx: usize) -> Option<Self> {
704        row.get_time(idx)
705    }
706}
707
708impl RowValue for hyperdb_api_core::types::Timestamp {
709    #[inline]
710    fn from_row(row: &Row, idx: usize) -> Option<Self> {
711        row.get_timestamp(idx)
712    }
713}
714
715impl RowValue for hyperdb_api_core::types::OffsetTimestamp {
716    #[inline]
717    fn from_row(row: &Row, idx: usize) -> Option<Self> {
718        row.get_offset_timestamp(idx)
719    }
720}
721
722impl RowValue for hyperdb_api_core::types::Interval {
723    #[inline]
724    fn from_row(row: &Row, idx: usize) -> Option<Self> {
725        row.get_interval(idx)
726    }
727}
728
729impl RowValue for hyperdb_api_core::types::Numeric {
730    /// Unlike every other `RowValue` impl, `Numeric` decode requires
731    /// per-column metadata (scale + wire-form width) that lives on the
732    /// row's attached `ResultSchema`. [`Row::get_numeric`] does the
733    /// lookup; this impl delegates there so generic `row.get::<Numeric>()`
734    /// / `row.try_get::<Numeric>(idx, "name")` call sites work the same
735    /// as every other type.
736    #[inline]
737    fn from_row(row: &Row, idx: usize) -> Option<Self> {
738        row.get_numeric(idx)
739    }
740}
741
742// =============================================================================
743// FromRow - Struct mapping trait
744// =============================================================================
745
746/// Trait for types that can be constructed from a database row.
747///
748/// Used by [`Connection::fetch_one_as`](crate::Connection::fetch_one_as)
749/// and [`Connection::fetch_all_as`](crate::Connection::fetch_all_as)
750/// to map query results into typed structs. Implementations receive
751/// a [`RowAccessor`](crate::RowAccessor), which provides name-based
752/// access via a column-name → index lookup built once per query.
753///
754/// To map the results of a **parameterized** query (`$1` placeholders
755/// bound via [`ToSqlParam`](crate::params::ToSqlParam)) directly into a
756/// struct, use the `_as_params` variants —
757/// [`fetch_one_as_params`](crate::Connection::fetch_one_as_params),
758/// [`fetch_all_as_params`](crate::Connection::fetch_all_as_params), and
759/// [`stream_as_params`](crate::Connection::stream_as_params) — which combine
760/// this trait with parameter binding in one call (also on
761/// [`AsyncConnection`](crate::AsyncConnection)).
762///
763/// # Recommended: derive
764///
765/// In most cases the `#[derive(FromRow)]` macro handles the mapping
766/// for you — match struct field names to column names automatically,
767/// with `#[hyperdb(rename = "...")]` for cases where they differ:
768///
769/// ```ignore
770/// use hyperdb_api::FromRow;
771/// use hyperdb_api_derive::FromRow; // proc-macro derive
772///
773/// #[derive(FromRow)]
774/// struct User {
775///     id: i32,
776///     name: String,
777///     #[hyperdb(rename = "email_address")]
778///     email: Option<String>,
779/// }
780/// ```
781///
782/// # Hand-written impl
783///
784/// For custom mapping logic (computed fields, multi-column composition,
785/// etc.) implement the trait directly:
786///
787/// ```no_run
788/// use hyperdb_api::{FromRow, RowAccessor, Result};
789///
790/// struct User { id: i32, name: String, active: bool }
791///
792/// impl FromRow for User {
793///     fn from_row(row: RowAccessor<'_>) -> Result<Self> {
794///         Ok(User {
795///             id: row.get("id")?,
796///             name: row.get("name")?,
797///             active: row.get("active")?,
798///         })
799///     }
800/// }
801/// ```
802///
803/// For ad-hoc tuple destructuring of small results, use
804/// [`Row::get`](crate::Row::get) directly — there are no blanket
805/// tuple `FromRow` impls. Define a struct with `#[derive(FromRow)]`
806/// for typed access in `fetch_*_as`.
807pub trait FromRow: Sized {
808    /// Constructs an instance from a database row.
809    ///
810    /// # Errors
811    ///
812    /// Returns an [`Error`](crate::Error) — typically
813    /// [`crate::Error::Column`] — when a required column is missing,
814    /// SQL `NULL`, or cannot be decoded as the expected type.
815    /// Implementations decide the exact failure shape.
816    fn from_row(row: crate::RowAccessor<'_>) -> crate::error::Result<Self>;
817}
818
819// =============================================================================
820// ResultSchema and ResultColumn
821// =============================================================================
822
823/// Metadata about a column in a result schema.
824#[derive(Debug, Clone)]
825pub struct ResultColumn {
826    /// The column name.
827    name: String,
828    /// The SQL type of the column.
829    sql_type: SqlType,
830    /// The column index (0-based).
831    index: usize,
832}
833
834impl ResultColumn {
835    /// Creates a new result column.
836    pub fn new(name: impl Into<String>, sql_type: SqlType, index: usize) -> Self {
837        ResultColumn {
838            name: name.into(),
839            sql_type,
840            index,
841        }
842    }
843
844    /// Returns the column name.
845    #[must_use]
846    pub fn name(&self) -> &str {
847        &self.name
848    }
849
850    /// Returns the SQL type of the column.
851    #[must_use]
852    pub fn sql_type(&self) -> SqlType {
853        self.sql_type
854    }
855
856    /// Returns the column index (0-based).
857    #[must_use]
858    pub fn index(&self) -> usize {
859        self.index
860    }
861}
862
863/// Schema information for a query result.
864///
865/// Provides metadata about the columns returned by a query, including
866/// column names and types.
867#[derive(Debug, Clone, Default)]
868pub struct ResultSchema {
869    columns: Vec<ResultColumn>,
870}
871
872impl ResultSchema {
873    /// Creates a new empty result schema.
874    #[must_use]
875    pub fn new() -> Self {
876        ResultSchema {
877            columns: Vec::new(),
878        }
879    }
880
881    /// Creates a result schema from column definitions.
882    #[must_use]
883    pub fn from_columns(columns: Vec<ResultColumn>) -> Self {
884        ResultSchema { columns }
885    }
886
887    /// Adds a column to the schema.
888    pub fn add_column(&mut self, name: impl Into<String>, sql_type: SqlType) {
889        let index = self.columns.len();
890        self.columns.push(ResultColumn::new(name, sql_type, index));
891    }
892
893    /// Returns the number of columns.
894    #[must_use]
895    pub fn column_count(&self) -> usize {
896        self.columns.len()
897    }
898
899    /// Returns all columns.
900    #[must_use]
901    pub fn columns(&self) -> &[ResultColumn] {
902        &self.columns
903    }
904
905    /// Returns the column at the given index.
906    ///
907    /// # Panics
908    ///
909    /// Panics if the index is out of bounds.
910    #[must_use]
911    pub fn column(&self, index: usize) -> &ResultColumn {
912        &self.columns[index]
913    }
914
915    /// Returns the column with the given name, if it exists.
916    #[must_use]
917    pub fn column_by_name(&self, name: &str) -> Option<&ResultColumn> {
918        self.columns.iter().find(|c| c.name == name)
919    }
920
921    /// Returns the index of the column with the given name, if it exists.
922    #[must_use]
923    pub fn column_index(&self, name: &str) -> Option<usize> {
924        self.columns.iter().position(|c| c.name == name)
925    }
926}
927
928// =============================================================================
929// Rowset (Streaming)
930// =============================================================================
931
932/// A streaming result set from a SQL query.
933///
934/// `Rowset` provides memory-efficient streaming access to query results.
935/// Results are fetched on-demand in chunks, keeping memory usage constant
936/// regardless of result set size. This makes it safe for any result size,
937/// from a single row to billions of rows.
938///
939/// # Example
940///
941/// ```no_run
942/// # use hyperdb_api::{Connection, Result};
943/// # fn example(conn: &Connection) -> Result<()> {
944/// let mut result = conn.execute_query("SELECT * FROM big_table")?;
945/// while let Some(chunk) = result.next_chunk()? {
946///     for row in &chunk {
947///         // Generic typed access (like C++ row.get<T>())
948///         let id: Option<i32> = row.get(0);
949///         let value: Option<f64> = row.get(1);
950///
951///         // Or direct accessors for performance
952///         let id = row.get_i32(0);
953///         let value = row.get_f64(1);
954///     }
955/// }
956/// # Ok(())
957/// # }
958/// ```
959///
960/// # Memory Behavior
961///
962/// - Only one chunk is held in memory at a time
963/// - Default chunk size is 64K rows (~few MB depending on row width)
964/// - Memory usage is `O(chunk_size)`, not `O(total_rows)`
965/// - Safe for billion-row results
966pub struct Rowset<'conn> {
967    inner: RowsetInner<'conn>,
968    /// Cached schema for this rowset, built lazily the first time
969    /// [`Self::next_chunk`] produces a non-empty chunk (TCP path — at
970    /// which point the `RowDescription` message has been observed) or
971    /// on first Arrow chunk (gRPC path). Stored as `Arc` so each row
972    /// produced by `next_chunk` gets a cheap ref-count clone — that's
973    /// how metadata-dependent decoders like [`Row::get_numeric`] reach
974    /// the column's `SqlType` without the caller plumbing scale
975    /// through manually.
976    schema_cache: Option<Arc<ResultSchema>>,
977    /// For one-shot prepared statements (the internal
978    /// [`crate::Connection::query_params`] path), hold the statement
979    /// handle here so its `Drop`-time `close_statement` fires *after*
980    /// the rowset releases its connection lock. Dropping the statement
981    /// before the rowset would deadlock because the inner stream owns
982    /// the connection's `MutexGuard`.
983    _statement_guard: Option<hyperdb_api_core::client::OwnedPreparedStatement>,
984}
985
986impl std::fmt::Debug for Rowset<'_> {
987    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
988        f.debug_struct("Rowset")
989            .field("has_schema_cache", &self.schema_cache.is_some())
990            .finish_non_exhaustive()
991    }
992}
993
994/// Internal enum to hold either TCP stream or Arrow data.
995enum RowsetInner<'conn> {
996    /// TCP streaming result (uses `QueryStream`).
997    Tcp(QueryStream<'conn>),
998    /// Arrow-based result from gRPC (all data loaded).
999    Arrow(ArrowRowset),
1000    /// TCP streaming result from a prepared-statement execute.
1001    Prepared(hyperdb_api_core::client::PreparedQueryStream<'conn>),
1002}
1003
1004impl<'conn> Rowset<'conn> {
1005    /// Creates a new Rowset from a `QueryStream` (TCP).
1006    pub(crate) fn new(stream: QueryStream<'conn>) -> Self {
1007        Rowset {
1008            inner: RowsetInner::Tcp(stream),
1009            schema_cache: None,
1010            _statement_guard: None,
1011        }
1012    }
1013
1014    /// Creates a new Rowset from Arrow IPC data (gRPC).
1015    pub(crate) fn from_arrow(arrow_rowset: ArrowRowset) -> Self {
1016        Rowset {
1017            inner: RowsetInner::Arrow(arrow_rowset),
1018            schema_cache: None,
1019            _statement_guard: None,
1020        }
1021    }
1022
1023    /// Creates a new Rowset from a prepared-statement streaming result.
1024    pub(crate) fn from_prepared(
1025        stream: hyperdb_api_core::client::PreparedQueryStream<'conn>,
1026    ) -> Self {
1027        Rowset {
1028            inner: RowsetInner::Prepared(stream),
1029            schema_cache: None,
1030            _statement_guard: None,
1031        }
1032    }
1033
1034    #[expect(
1035        clippy::used_underscore_binding,
1036        reason = "underscore-prefixed parameter retained for trait-method signature compatibility"
1037    )]
1038    /// Attaches a `OwnedPreparedStatement` that should be dropped
1039    /// **after** this rowset is consumed. Used by the one-shot
1040    /// prepare+execute path inside
1041    /// [`crate::Connection::query_params`] so the statement's
1042    /// Drop-time close doesn't deadlock on the rowset's still-held
1043    /// connection lock.
1044    pub(crate) fn with_statement_guard(
1045        mut self,
1046        statement: hyperdb_api_core::client::OwnedPreparedStatement,
1047    ) -> Self {
1048        self._statement_guard = Some(statement);
1049        self
1050    }
1051
1052    /// Returns the schema (column metadata) for the result set.
1053    ///
1054    /// For TCP connections, the schema is captured from the `RowDescription` message
1055    /// after the first chunk is read. For gRPC connections, the schema is available
1056    /// immediately from the Arrow data.
1057    ///
1058    /// Returns `None` if no data has been read yet (TCP only).
1059    ///
1060    /// # Example
1061    ///
1062    /// ```no_run
1063    /// # use hyperdb_api::{Connection, Result};
1064    /// # fn example(conn: &Connection) -> Result<()> {
1065    /// let mut result = conn.execute_query("SELECT id, name FROM users")?;
1066    /// // Read first chunk to capture schema (TCP) or get it immediately (gRPC)
1067    /// let _ = result.next_chunk()?;
1068    /// if let Some(schema) = result.schema() {
1069    ///     for col in schema.columns() {
1070    ///         println!("Column: {} ({})", col.name(), col.sql_type());
1071    ///     }
1072    /// }
1073    /// # Ok(())
1074    /// # }
1075    /// ```
1076    #[must_use]
1077    pub fn schema(&self) -> Option<ResultSchema> {
1078        // Fast path: cache already populated by a previous call or by
1079        // `next_chunk`. Clone from the Arc so external callers get an
1080        // owned value independent of internal lifetimes.
1081        if let Some(ref cached) = self.schema_cache {
1082            return Some((**cached).clone());
1083        }
1084        // Slow path: schema hasn't been materialized yet. Build it from
1085        // the transport without populating the cache — `schema()` takes
1086        // `&self`, so mutation isn't possible here. `next_chunk` does
1087        // the caching pass for the row-construction hot path; if a
1088        // caller really only wants the schema and never touches rows,
1089        // they pay one build per call but this is rarely the pattern.
1090        self.build_schema()
1091    }
1092
1093    /// Compute the current schema without populating the cache.
1094    ///
1095    /// Pulls column metadata from the underlying transport and
1096    /// constructs a fresh `ResultSchema`. TCP builds `SqlType` via
1097    /// [`SqlType::from_oid_and_modifier`] so
1098    /// `NUMERIC(precision, scale)` and `VARCHAR(n)` recover their
1099    /// declared parameters from the `RowDescription` `atttypmod`
1100    /// field — dropping the modifier (which bare
1101    /// [`SqlType::from_oid`] does) silently turns every `NUMERIC`
1102    /// into `(precision: 0, scale: 0)` and corrupts decimal decodes
1103    /// downstream. Arrow comes pre-typed via
1104    /// `arrow_type_to_sql_type`.
1105    fn build_schema(&self) -> Option<ResultSchema> {
1106        match &self.inner {
1107            RowsetInner::Tcp(stream) => stream.schema().map(|cols| {
1108                let columns = cols
1109                    .iter()
1110                    .enumerate()
1111                    .map(|(idx, col)| {
1112                        let sql_type =
1113                            SqlType::from_oid_and_modifier(col.type_oid().0, col.type_modifier());
1114                        ResultColumn::new(col.name(), sql_type, idx)
1115                    })
1116                    .collect();
1117                ResultSchema::from_columns(columns)
1118            }),
1119            RowsetInner::Arrow(arrow) => {
1120                let schema = arrow.schema();
1121                let columns = schema
1122                    .fields()
1123                    .iter()
1124                    .enumerate()
1125                    .map(|(idx, field)| {
1126                        ResultColumn::new(
1127                            field.name(),
1128                            crate::arrow_result::arrow_type_to_sql_type(field.data_type()),
1129                            idx,
1130                        )
1131                    })
1132                    .collect();
1133                Some(ResultSchema::from_columns(columns))
1134            }
1135            // Prepared statements: schema was captured at prepare time,
1136            // so it is always available immediately.
1137            RowsetInner::Prepared(stream) => {
1138                let cols = stream.schema();
1139                let columns = cols
1140                    .iter()
1141                    .enumerate()
1142                    .map(|(idx, col)| {
1143                        let sql_type =
1144                            SqlType::from_oid_and_modifier(col.type_oid().0, col.type_modifier());
1145                        ResultColumn::new(col.name(), sql_type, idx)
1146                    })
1147                    .collect();
1148                Some(ResultSchema::from_columns(columns))
1149            }
1150        }
1151    }
1152
1153    /// Populate `schema_cache` if not yet set, then return an `Arc`
1154    /// clone of the cached schema for row construction. Called by
1155    /// `next_chunk` so every row produced gets a cheap schema
1156    /// reference without re-building the `ResultSchema` per chunk.
1157    fn cached_schema_arc(&mut self) -> Option<Arc<ResultSchema>> {
1158        if self.schema_cache.is_none() {
1159            if let Some(schema) = self.build_schema() {
1160                self.schema_cache = Some(Arc::new(schema));
1161            }
1162        }
1163        self.schema_cache.clone()
1164    }
1165
1166    /// Returns the next chunk of rows from the result set.
1167    ///
1168    /// Each chunk contains up to `chunk_size` rows (default 64K).
1169    /// Returns `Ok(None)` when all rows have been consumed.
1170    ///
1171    /// # Example
1172    ///
1173    /// ```no_run
1174    /// # use hyperdb_api::{Rowset, Result};
1175    /// # fn example(mut result: Rowset) -> Result<()> {
1176    /// while let Some(chunk) = result.next_chunk()? {
1177    ///     for row in &chunk {
1178    ///         let id: Option<i32> = row.get(0);  // Generic typed access
1179    ///         let value = row.get_f64(1);        // Direct accessor
1180    ///     }
1181    /// }
1182    /// # Ok(())
1183    /// # }
1184    /// ```
1185    ///
1186    /// # Errors
1187    ///
1188    /// - Returns [`crate::Error::Server`] if the server sends an `ErrorResponse`
1189    ///   while streaming the result set.
1190    /// - Returns [`crate::Error::Io`] on transport-level I/O failures.
1191    /// - Returns [`crate::Error::Conversion`] if an Arrow IPC chunk cannot be decoded.
1192    pub fn next_chunk(&mut self) -> Result<Option<Vec<Row>>> {
1193        // Pull the next raw chunk from the underlying transport first;
1194        // on TCP, this is what makes the `RowDescription` bytes arrive
1195        // so we can cache the schema in the step below. We collect a
1196        // `TransportChunk` instead of a `Vec<Row>` directly so the
1197        // schema can be attached after we've populated the cache.
1198        enum TransportChunk {
1199            Tcp(Vec<StreamRow>),
1200            Arrow(Arc<RecordBatch>),
1201        }
1202
1203        let chunk_opt: Option<TransportChunk> = match &mut self.inner {
1204            RowsetInner::Tcp(stream) => stream.next_chunk()?.map(TransportChunk::Tcp),
1205            RowsetInner::Arrow(arrow) => arrow
1206                .next_chunk()?
1207                .map(|chunk| TransportChunk::Arrow(Arc::new(chunk.into_batch()))),
1208            RowsetInner::Prepared(stream) => stream.next_chunk()?.map(TransportChunk::Tcp),
1209        };
1210
1211        let Some(chunk) = chunk_opt else {
1212            return Ok(None);
1213        };
1214
1215        // Populate the schema cache if not already set, then clone the
1216        // Arc into each Row so `Row::get::<Numeric>` and friends can
1217        // look up per-column precision / scale without any caller
1218        // having to thread the schema through manually.
1219        let schema = self.cached_schema_arc();
1220        let rows = match chunk {
1221            TransportChunk::Tcp(stream_rows) => stream_rows
1222                .into_iter()
1223                .map(|row| Row::from_tcp(row, schema.clone()))
1224                .collect(),
1225            TransportChunk::Arrow(batch) => (0..batch.num_rows())
1226                .map(|row_index| Row::from_arrow(Arc::clone(&batch), row_index, schema.clone()))
1227                .collect(),
1228        };
1229        Ok(Some(rows))
1230    }
1231
1232    /// Returns an iterator over all rows in the result set.
1233    ///
1234    /// This provides a C++-like iteration experience while maintaining
1235    /// Rust's explicit error handling. Chunks are fetched internally
1236    /// as needed, keeping memory usage constant.
1237    ///
1238    /// # Example
1239    ///
1240    /// ```no_run
1241    /// # use hyperdb_api::{Connection, Result};
1242    /// # fn example(conn: &Connection) -> Result<()> {
1243    /// // Simple iteration (like C++)
1244    /// let result = conn.execute_query("SELECT * FROM users")?;
1245    /// for row in result.rows() {
1246    ///     let row = row?;  // Handle potential network errors
1247    ///     let id: Option<i32> = row.get(0);
1248    ///     let name: Option<String> = row.get(1);
1249    ///     println!("User: {:?} - {:?}", id, name);
1250    /// }
1251    /// # Ok(())
1252    /// # }
1253    /// ```
1254    ///
1255    /// # Error Handling
1256    ///
1257    /// Unlike C++ which uses exceptions, Rust requires explicit error handling.
1258    /// Each item in the iterator is a `Result<LightweightRow>` to handle
1259    /// potential network or protocol errors during streaming.
1260    ///
1261    /// # Comparison with `next_chunk()`
1262    ///
1263    /// | Aspect | `rows()` | `next_chunk()` |
1264    /// |--------|----------|----------------|
1265    /// | Syntax | Simpler, C++-like | More verbose |
1266    /// | Error handling | Per-row with `?` | Per-chunk |
1267    /// | Batch ops | Use `.collect()` | Natural |
1268    /// | Best for | Simple iteration | Batch processing |
1269    #[must_use]
1270    pub fn rows(self) -> RowIterator<'conn> {
1271        RowIterator {
1272            rowset: self,
1273            current_iter: Vec::new().into_iter(),
1274        }
1275    }
1276
1277    /// Collects all rows into a Vec.
1278    ///
1279    /// This is a convenience method that handles error collection more elegantly
1280    /// than the standard `collect::<Result<Vec<_>, _>>()` pattern.
1281    ///
1282    /// # Example
1283    ///
1284    /// ```no_run
1285    /// # use hyperdb_api::{Connection, Result};
1286    /// # fn example(conn: &Connection) -> Result<()> {
1287    /// let result = conn.execute_query("SELECT id, name FROM users")?;
1288    /// let rows = result.collect_rows()?;  // Much cleaner than collect::<Result<Vec<_>, _>>()
1289    ///
1290    /// for row in rows {
1291    ///     let id: Option<i32> = row.get(0);
1292    ///     let name: Option<String> = row.get(1);
1293    ///     println!("User: {:?} - {:?}", id, name);
1294    /// }
1295    /// # Ok(())
1296    /// # }
1297    /// ```
1298    ///
1299    /// # Errors
1300    ///
1301    /// Returns the first error produced by [`next_chunk`](Self::next_chunk)
1302    /// while draining the stream (transport I/O failure or server-side
1303    /// error).
1304    pub fn collect_rows(self) -> crate::error::Result<Vec<Row>> {
1305        self.rows().collect::<crate::error::Result<Vec<_>>>()
1306    }
1307
1308    /// Collects the first column of each row into a Vec.
1309    ///
1310    /// This is useful for single-column queries or when you only need one column.
1311    ///
1312    /// # Example
1313    ///
1314    /// ```no_run
1315    /// # use hyperdb_api::{Connection, Result};
1316    /// # fn example(conn: &Connection) -> Result<()> {
1317    /// let result = conn.execute_query("SELECT name FROM users")?;
1318    /// let names: Vec<Option<String>> = result.collect_column()?;
1319    ///
1320    /// for name in names {
1321    ///     if let Some(name) = name {
1322    ///         println!("User: {}", name);
1323    ///     }
1324    /// }
1325    /// # Ok(())
1326    /// # }
1327    /// ```
1328    ///
1329    /// # Errors
1330    ///
1331    /// Returns the first streaming error from
1332    /// [`next_chunk`](Self::next_chunk). SQL `NULL` cells yield
1333    /// `Option::None` entries, not errors.
1334    pub fn collect_column<T: crate::result::RowValue>(
1335        self,
1336    ) -> crate::error::Result<Vec<Option<T>>> {
1337        self.rows()
1338            .map(|row| row.map(|r| r.get::<T>(0)))
1339            .collect::<crate::error::Result<Vec<_>>>()
1340    }
1341
1342    /// Collects the first column, filtering out NULL values.
1343    ///
1344    /// This is useful when you know the column doesn't contain NULLs or want to ignore them.
1345    ///
1346    /// # Example
1347    ///
1348    /// ```no_run
1349    /// # use hyperdb_api::{Connection, Result};
1350    /// # fn example(conn: &Connection) -> Result<()> {
1351    /// let result = conn.execute_query("SELECT name FROM users WHERE name IS NOT NULL")?;
1352    /// let names: Vec<String> = result.collect_column_non_null()?;
1353    ///
1354    /// for name in names {
1355    ///     println!("User: {}", name);  // No need to handle Option
1356    /// }
1357    /// # Ok(())
1358    /// # }
1359    /// ```
1360    ///
1361    /// # Errors
1362    ///
1363    /// Returns the first streaming error from
1364    /// [`collect_column`](Self::collect_column).
1365    pub fn collect_column_non_null<T: crate::result::RowValue>(
1366        self,
1367    ) -> crate::error::Result<Vec<T>> {
1368        Ok(self.collect_column::<T>()?.into_iter().flatten().collect())
1369    }
1370
1371    /// Gets the first row of the result set.
1372    ///
1373    /// This is useful for queries that are expected to return exactly one row,
1374    /// such as aggregate queries or lookups by unique key.
1375    ///
1376    /// # Example
1377    ///
1378    /// ```no_run
1379    /// # use hyperdb_api::{Connection, Result};
1380    /// # fn example(conn: &Connection) -> Result<()> {
1381    /// let result = conn.execute_query("SELECT COUNT(*) FROM users")?;
1382    /// if let Some(row) = result.first_row()? {
1383    ///     let count: Option<i64> = row.get(0);
1384    ///     println!("User count: {:?}", count);
1385    /// }
1386    /// # Ok(())
1387    /// # }
1388    /// ```
1389    ///
1390    /// # Errors
1391    ///
1392    /// Returns the error from [`next_chunk`](Self::next_chunk). An empty
1393    /// result set yields `Ok(None)`, not an error.
1394    pub fn first_row(mut self) -> crate::error::Result<Option<Row>> {
1395        if let Some(chunk) = self.next_chunk()? {
1396            Ok(chunk.into_iter().next())
1397        } else {
1398            Ok(None)
1399        }
1400    }
1401
1402    /// Gets the first row or returns an error if no rows were found.
1403    ///
1404    /// This is useful when you expect exactly one row and want to fail if that's not the case.
1405    ///
1406    /// # Example
1407    ///
1408    /// ```no_run
1409    /// # use hyperdb_api::{Connection, Result};
1410    /// # fn example(conn: &Connection) -> Result<()> {
1411    /// let result = conn.execute_query("SELECT id, name FROM users WHERE id = 1")?;
1412    /// let row = result.require_first_row()?;  // Fails if no row found
1413    /// let id: Option<i32> = row.get(0);
1414    /// let name: Option<String> = row.get(1);
1415    /// println!("Found user: {:?} - {:?}", id, name);
1416    /// # Ok(())
1417    /// # }
1418    /// ```
1419    ///
1420    /// # Errors
1421    ///
1422    /// - Returns the error from [`first_row`](Self::first_row).
1423    /// - Returns [`crate::Error::Conversion`] with message `"Query returned no rows"`
1424    ///   if the result set is empty.
1425    pub fn require_first_row(self) -> crate::error::Result<Row> {
1426        self.first_row()?
1427            .ok_or_else(|| crate::error::Error::conversion("Query returned no rows"))
1428    }
1429
1430    /// Gets a scalar value from the first row, first column.
1431    ///
1432    /// This is a convenience method for scalar queries like `SELECT COUNT(*)` or `SELECT MAX(id)`.
1433    ///
1434    /// # Example
1435    ///
1436    /// ```no_run
1437    /// # use hyperdb_api::{Connection, Result};
1438    /// # fn example(conn: &Connection) -> Result<()> {
1439    /// let result = conn.execute_query("SELECT COUNT(*) FROM users")?;
1440    /// let count: Option<i64> = result.scalar()?;  // Much cleaner than manual row handling
1441    /// println!("User count: {:?}", count);
1442    /// # Ok(())
1443    /// # }
1444    /// ```
1445    ///
1446    /// # Errors
1447    ///
1448    /// Returns the error from [`require_first_row`](Self::require_first_row):
1449    /// streaming error or empty result. SQL `NULL` in the single cell
1450    /// yields `Ok(None)`.
1451    pub fn scalar<T: crate::result::RowValue>(self) -> crate::error::Result<Option<T>> {
1452        Ok(self.require_first_row()?.get(0))
1453    }
1454
1455    /// Gets a scalar value from the first row, first column, or returns an error if NULL.
1456    ///
1457    /// This is useful when you expect a non-NULL scalar result.
1458    ///
1459    /// # Example
1460    ///
1461    /// ```no_run
1462    /// # use hyperdb_api::{Connection, Result};
1463    /// # fn example(conn: &Connection) -> Result<()> {
1464    /// let result = conn.execute_query("SELECT COUNT(*) FROM users")?;
1465    /// let count: i64 = result.require_scalar()?;  // Fails if NULL
1466    /// println!("User count: {}", count);
1467    /// # Ok(())
1468    /// # }
1469    /// ```
1470    ///
1471    /// # Errors
1472    ///
1473    /// - Returns the error from [`scalar`](Self::scalar).
1474    /// - Returns [`crate::Error::Conversion`] with message `"Scalar query returned NULL"`
1475    ///   if the single cell is SQL `NULL`.
1476    pub fn require_scalar<T: crate::result::RowValue>(self) -> crate::error::Result<T> {
1477        self.scalar()?
1478            .ok_or_else(|| crate::error::Error::conversion("Scalar query returned NULL"))
1479    }
1480}
1481
1482// =============================================================================
1483// RowIterator - C++-like iteration over query results
1484// =============================================================================
1485
1486/// An iterator over rows in a query result set.
1487///
1488/// `RowIterator` provides a C++-like iteration experience, hiding the
1489/// chunked fetching internally. Each call to `next()` returns the next
1490/// row, automatically fetching new chunks as needed.
1491///
1492/// # Memory Behavior
1493///
1494/// Memory usage remains constant regardless of result set size:
1495/// - Internally fetches 64K rows at a time
1496/// - Previous chunks are dropped when exhausted
1497/// - Safe for billion-row results
1498///
1499/// # Example
1500///
1501/// ```no_run
1502/// # use hyperdb_api::{Connection, Result};
1503/// # fn example(conn: &Connection) -> Result<()> {
1504/// let result = conn.execute_query("SELECT id, name FROM users")?;
1505/// for row in result.rows() {
1506///     let row = row?;
1507///     let id = row.get_i32(0).unwrap_or(-1);
1508///     let name = row.get::<String>(1).unwrap_or_default();
1509///     println!("{}: {}", id, name);
1510/// }
1511/// # Ok(())
1512/// # }
1513/// ```
1514///
1515/// # Error Handling
1516///
1517/// Each iteration yields a `Result<Row>`. Errors can occur
1518/// when fetching new chunks from the server (network issues, protocol
1519/// errors, etc.). Use `?` or match to handle them:
1520///
1521/// ```no_run
1522/// # use hyperdb_api::{Rowset, Result};
1523/// # fn example(mut result: Rowset) -> Result<()> {
1524/// // Using ? in a function that returns Result
1525/// for row in result.rows() {
1526///     let row = row?;
1527///     // process row...
1528/// }
1529/// # Ok(())
1530/// # }
1531/// # fn example2(mut result: Rowset) -> Result<()> {
1532/// // Using try_for_each
1533/// result.rows().try_for_each(|row| -> Result<()> {
1534///     let row = row?;
1535///     // process row...
1536///     Ok(())
1537/// })?;
1538/// # Ok(())
1539/// # }
1540/// ```
1541#[derive(Debug)]
1542pub struct RowIterator<'conn> {
1543    rowset: Rowset<'conn>,
1544    current_iter: std::vec::IntoIter<Row>,
1545}
1546
1547impl Iterator for RowIterator<'_> {
1548    type Item = Result<Row>;
1549
1550    fn next(&mut self) -> Option<Self::Item> {
1551        // Try to get next row from current chunk
1552        if let Some(row) = self.current_iter.next() {
1553            return Some(Ok(row));
1554        }
1555
1556        // Current chunk exhausted, fetch next chunk
1557        match self.rowset.next_chunk() {
1558            Ok(Some(chunk)) => {
1559                self.current_iter = chunk.into_iter();
1560                // Return first row of new chunk
1561                self.current_iter.next().map(Ok)
1562            }
1563            Ok(None) => None,       // No more rows
1564            Err(e) => Some(Err(e)), // Error fetching chunk
1565        }
1566    }
1567}
1568
1569/// Iterator over rows of a result set with `FromRow` deserialization.
1570///
1571/// Returned by [`Connection::stream_as`]. Lazily fetches chunks from the
1572/// server and maps each row to `T` via [`FromRow::from_row`]. Memory usage is
1573/// bounded by the chunk size: at most one chunk of rows is held in memory at a
1574/// time.
1575///
1576/// The column-name → index lookup table is built exactly once (on the first
1577/// non-empty chunk) and reused for all rows, making per-row mapping O(1) in
1578/// column count.
1579///
1580/// [`Connection::stream_as`]: crate::Connection::stream_as
1581/// [`FromRow::from_row`]: crate::FromRow::from_row
1582///
1583/// # Example
1584///
1585/// ```no_run
1586/// # use hyperdb_api::{Connection, CreateMode, FromRow, RowAccessor, Result};
1587/// # struct User { id: i32, name: String }
1588/// # impl FromRow for User {
1589/// #     fn from_row(row: RowAccessor<'_>) -> Result<Self> {
1590/// #         Ok(User { id: row.get("id")?, name: row.get("name")? })
1591/// #     }
1592/// # }
1593/// # fn example(conn: &Connection) -> Result<()> {
1594/// for row in conn.stream_as::<User>("SELECT id, name FROM users")? {
1595///     let user = row?;
1596///     println!("{}: {}", user.id, user.name);
1597/// }
1598/// # Ok(())
1599/// # }
1600/// ```
1601///
1602/// # Errors
1603///
1604/// Each yielded item is a `Result<T>`:
1605/// - `Ok(T)` if the row was successfully mapped via `FromRow`.
1606/// - `Err(e)` if mapping failed (e.g., missing column, type mismatch, NULL in
1607///   a non-optional field).
1608///
1609/// Transport-level errors (chunk-fetching failures) are also surfaced as
1610/// `Err` items.
1611pub(crate) struct TypedRowIterator<'conn, T> {
1612    rowset: Rowset<'conn>,
1613    current_iter: std::vec::IntoIter<Row>,
1614    indices: Option<std::collections::HashMap<String, usize>>,
1615    _marker: std::marker::PhantomData<fn() -> T>,
1616}
1617
1618impl<T> std::fmt::Debug for TypedRowIterator<'_, T> {
1619    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1620        f.debug_struct("TypedRowIterator")
1621            .field("rowset", &self.rowset)
1622            .field("current_iter_len", &self.current_iter.len())
1623            .field("indices_built", &self.indices.is_some())
1624            .finish_non_exhaustive()
1625    }
1626}
1627
1628impl<'conn, T> TypedRowIterator<'conn, T> {
1629    /// Constructs a new `TypedRowIterator` over the given rowset.
1630    /// Crate-internal: callers go through `Connection::stream_as`.
1631    pub(crate) fn new(rowset: Rowset<'conn>) -> Self {
1632        Self {
1633            rowset,
1634            current_iter: Vec::new().into_iter(),
1635            indices: None,
1636            _marker: std::marker::PhantomData,
1637        }
1638    }
1639}
1640
1641impl<T: crate::FromRow> Iterator for TypedRowIterator<'_, T> {
1642    type Item = Result<T>;
1643
1644    fn next(&mut self) -> Option<Self::Item> {
1645        loop {
1646            // Try to get next row from current chunk. `get_or_insert_with` is
1647            // a no-op here (the map was already built when this chunk was
1648            // fetched, below) but lets us borrow it without an Option unwrap.
1649            if let Some(row) = self.current_iter.next() {
1650                let indices = self.indices.get_or_insert_with(Default::default);
1651                return Some(T::from_row(crate::RowAccessor::new_owned(&row, indices)));
1652            }
1653
1654            // Current chunk exhausted, fetch next chunk
1655            match self.rowset.next_chunk() {
1656                Ok(Some(chunk)) => {
1657                    // Build the index map once, on the first chunk. The schema
1658                    // is populated by `next_chunk` as a side effect. If the
1659                    // schema is somehow unavailable, fall back to an empty map
1660                    // so column lookups surface a `Missing` error per row —
1661                    // matching `fetch_all_as`'s `unwrap_or_default()` rather
1662                    // than silently truncating the stream.
1663                    if self.indices.is_none() {
1664                        let map = self
1665                            .rowset
1666                            .schema()
1667                            .map(|schema| crate::RowAccessor::build_owned_indices(&schema))
1668                            .unwrap_or_default();
1669                        self.indices = Some(map);
1670                    }
1671                    self.current_iter = chunk.into_iter();
1672                    // loop around to drain the freshly fetched chunk
1673                }
1674                Ok(None) => return None,       // No more rows
1675                Err(e) => return Some(Err(e)), // Error fetching chunk
1676            }
1677        }
1678    }
1679}
1680
1681// =============================================================================
1682// Unit tests that don't need a live hyperd backend.
1683//
1684// Anything requiring a real Hyper process lives in `hyperdb-api/tests/*.rs` where
1685// `TestConnection` spins up a `HyperProcess` per test. These tests exercise
1686// pure in-process logic — specifically the Arrow-path branches of
1687// `Row::get_numeric`, where we can construct a synthetic `RecordBatch` with a
1688// specific `DataType::Decimal128(p, s)` descriptor and probe `Row`'s
1689// handling of it without hyperd in the loop.
1690// =============================================================================
1691
1692#[cfg(test)]
1693mod arrow_path_tests {
1694    use super::*;
1695    use arrow::array::Decimal128Array;
1696    use arrow::datatypes::{DataType as ArrowType, Field, Schema};
1697
1698    /// Build a single-row `RecordBatch` with a Decimal128 column whose
1699    /// value is `raw` and whose precision/scale are those passed in.
1700    fn decimal128_batch(raw: i128, precision: u8, scale: i8) -> Arc<RecordBatch> {
1701        let array = Decimal128Array::from(vec![Some(raw)])
1702            .with_precision_and_scale(precision, scale)
1703            .expect("valid Arrow Decimal128");
1704        let field = Field::new("v", ArrowType::Decimal128(precision, scale), true);
1705        let schema = Arc::new(Schema::new(vec![field]));
1706        Arc::new(RecordBatch::try_new(schema, vec![Arc::new(array)]).expect("batch"))
1707    }
1708
1709    /// Happy-path: a positive-scale Arrow Decimal128 decodes correctly
1710    /// via `row.get::<Numeric>()`, locking in the common case alongside
1711    /// the negative-scale test below.
1712    #[test]
1713    fn get_numeric_reads_arrow_decimal128_with_positive_scale() {
1714        // NUMERIC(10, 2), unscaled value 123 → 1.23
1715        let batch = decimal128_batch(123, 10, 2);
1716        let row = Row::from_arrow(Arc::clone(&batch), 0, None);
1717
1718        let numeric = row.get_numeric(0).expect("Some for positive-scale decimal");
1719        assert_eq!(numeric.unscaled_value(), 123);
1720        assert_eq!(numeric.scale(), 2);
1721        assert!((numeric.to_f64() - 1.23).abs() < 1e-9);
1722
1723        // Same result via the generic `row.get::<Numeric>` path.
1724        let via_rowvalue: hyperdb_api_core::types::Numeric =
1725            row.get(0).expect("RowValue path agrees with get_numeric");
1726        assert_eq!(via_rowvalue, numeric);
1727    }
1728
1729    /// Arrow's `DataType::Decimal128(u8, i8)` allows negative scale —
1730    /// a legitimate Arrow concept meaning "raw × 10^abs(scale)" (e.g.
1731    /// scale=-2 with raw=5 renders as 500). Hyper's `Numeric` uses a
1732    /// `u8` scale with no representation for that multiplier.
1733    ///
1734    /// The earlier `.max(0) as u8` code silently clamped the scale to
1735    /// 0 while keeping `raw` unchanged — which produces a value with
1736    /// the wrong magnitude (`5` instead of `500` in the example
1737    /// above). The fix here is to reject negative scales via
1738    /// `try_into` + `?`, which surfaces as `None` to the caller.
1739    /// That's strictly safer than a silent-wrong-magnitude value.
1740    #[test]
1741    fn get_numeric_rejects_arrow_decimal128_with_negative_scale() {
1742        // NUMERIC(10, -2) — Arrow allows this; Hyper's Numeric can't
1743        // represent it. Our `get_numeric` must return None rather
1744        // than silently drop the negative-scale multiplier.
1745        let batch = decimal128_batch(5, 10, -2);
1746        let row = Row::from_arrow(Arc::clone(&batch), 0, None);
1747
1748        assert!(
1749            row.get_numeric(0).is_none(),
1750            "negative Arrow scale must not produce a silently-wrong-magnitude Numeric",
1751        );
1752
1753        // And the same through the `RowValue` blanket path.
1754        let via_rowvalue: Option<hyperdb_api_core::types::Numeric> = row.get(0);
1755        assert!(via_rowvalue.is_none());
1756    }
1757
1758    /// Boundary: scale = 0 is a legal `u8` and must still succeed.
1759    /// Guards against an over-tightened check that accidentally
1760    /// rejects zero along with negatives.
1761    #[test]
1762    fn get_numeric_accepts_arrow_decimal128_with_zero_scale() {
1763        let batch = decimal128_batch(42, 10, 0);
1764        let row = Row::from_arrow(Arc::clone(&batch), 0, None);
1765        let numeric = row.get_numeric(0).expect("scale 0 is fine");
1766        assert_eq!(numeric.unscaled_value(), 42);
1767        assert_eq!(numeric.scale(), 0);
1768    }
1769}