// hyperdb_api/result.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Query result handling with type-safe value access.
5//!
6//! This module provides types for working with query results:
7//! - [`Rowset`] — Streaming result set with memory-efficient chunked iteration
8//! - [`RowIterator`] — C++-like iterator for simple row-by-row processing
9//! - [`ResultSchema`] — Column metadata (names and types)
10//!
11//! # Streaming Design
12//!
13//! Query results are streamed from the server in chunks of up to
14//! [`DEFAULT_BINARY_CHUNK_SIZE`] rows (64K). Only one chunk is held in memory
15//! at a time, so memory usage is `O(chunk_size)` regardless of total result
16//! size — safe for billion-row results.
17//!
18//! # Iteration Patterns
19//!
20//! Two patterns are available, both streaming with constant memory:
21//!
22//! ## Pattern 1: Chunked (`next_chunk()`) — batch processing
23//!
24//! Best for high-throughput scenarios. Error checking happens once per chunk
25//! (~64K rows), and you get direct `Vec<Row>` iteration with good cache
26//! locality. Natural for batch operations, vectorized processing, or
27//! parallelizing across chunks.
28//!
29//! ```no_run
30//! # use hyperdb_api::{Connection, CreateMode, Result};
31//! # fn example(conn: &Connection) -> Result<()> {
32//! let mut result = conn.execute_query("SELECT * FROM table")?;
33//! while let Some(chunk) = result.next_chunk()? {
34//!     for row in &chunk {
35//!         let id: Option<i32> = row.get(0);
36//!         let value: Option<f64> = row.get(1);
37//!     }
38//! }
39//! # Ok(())
40//! # }
41//! ```
42//!
43//! ## Pattern 2: Iterator (`rows()`) — simple row-by-row
44//!
45//! Best for simple iteration where you process one row at a time. Each item
46//! is `Result<Row>` since chunk fetches can fail, so error checking happens
47//! per-row. The extra iterator wrapper adds slight overhead compared to
48//! `next_chunk()`.
49//!
50//! ```no_run
51//! # use hyperdb_api::{Connection, Result};
52//! # fn example(conn: &Connection) -> Result<()> {
53//! let result = conn.execute_query("SELECT * FROM table")?;
54//! for row in result.rows() {
55//!     let row = row?;  // Handle potential errors
56//!     let id: Option<i32> = row.get(0);
57//!     let value: Option<f64> = row.get(1);
58//! }
59//! # Ok(())
60//! # }
61//! ```
62//!
63//! **When to use which:**
64//! - `rows()` — simple iteration, one row at a time, small overhead acceptable
65//! - `next_chunk()` — maximum performance, large result sets, batch operations
66//!
67//! # Type Coercion
68//!
69//! The generic `row.get::<T>()` method supports automatic widening coercion:
70//!
71//! | Request Type | Coerces From |
72//! |---|---|
73//! | `i32` | `i16` |
74//! | `i64` | `i32`, `i16` |
75//! | `f64` | `f32` |
76//!
77//! Direct accessors (`row.get_i32()`, `row.get_f64()`) skip coercion for
78//! slightly better performance when the exact type is known.
79
80use std::sync::Arc;
81
82use arrow::array::Array;
83use arrow::record_batch::RecordBatch;
84use hyperdb_api_core::client::QueryStream;
85use hyperdb_api_core::client::StreamRow;
86use hyperdb_api_core::types::SqlType;
87
88use crate::arrow_result::{ArrowRowset, FromArrowValue};
89use crate::error::Result;
90
/// Default chunk size for streaming queries (64K rows).
///
/// Streaming results are fetched in batches of at most this many rows,
/// which bounds peak memory to `O(chunk_size)` regardless of total
/// result size.
pub(crate) const DEFAULT_BINARY_CHUNK_SIZE: usize = 1 << 16; // 65536
93
94// =============================================================================
95// Row - Unified row type for both TCP and gRPC
96// =============================================================================
97
98/// A row from a query result, providing typed value access.
99///
100/// This type abstracts over the underlying transport (TCP or gRPC),
101/// providing a consistent API for accessing column values regardless
102/// of how the data was retrieved.
103///
104/// # Example
105///
106/// ```no_run
107/// # use hyperdb_api::Result;
108/// # fn example(result: hyperdb_api::Rowset) -> Result<()> {
109/// for row in result.rows() {
110///     let row = row?;
111///     let id: Option<i32> = row.get(0);
112///     let name: Option<String> = row.get(1);
113///     // Or use direct accessors
114///     let value = row.get_f64(2);
115/// }
116/// # Ok(())
117/// # }
118/// ```
pub struct Row {
    /// Transport-specific payload: a TCP wire row, or an Arrow record
    /// batch plus this row's index within it.
    inner: RowInner,
    /// Shared schema reference for the parent rowset. Every row
    /// produced by [`Rowset::next_chunk`] carries this (cloned cheaply
    /// from an `Arc`) so that metadata-dependent decoders like
    /// [`Self::get_numeric`] can look up `SqlType` per column without
    /// the caller plumbing scale through manually. `None` only in the
    /// unusual case a row is constructed outside `next_chunk` (no such
    /// path exists in-tree today; the field is `Option` so future
    /// schemas-unavailable paths remain compilable).
    schema: Option<Arc<ResultSchema>>,
}
131
132impl std::fmt::Debug for Row {
133    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134        f.debug_struct("Row")
135            .field("has_schema", &self.schema.is_some())
136            .finish_non_exhaustive()
137    }
138}
139
/// Internal per-transport backing for a [`Row`]. Not public: all
/// consumer-visible API goes through `Row`'s methods, which dispatch
/// on this enum internally.
enum RowInner {
    /// Row from TCP transport (`StreamRow`), which owns its own cell data.
    Tcp(StreamRow),
    /// Row from gRPC transport (Arrow-backed). The row does not own its
    /// data; it holds a shared batch plus an index into it.
    Arrow {
        /// The record batch containing this row's data.
        batch: Arc<RecordBatch>,
        /// Index of this row within the batch.
        row_index: usize,
    },
}
154
impl Row {
    /// Construct a TCP-backed row with an attached schema reference.
    #[inline]
    pub(crate) fn from_tcp(row: StreamRow, schema: Option<Arc<ResultSchema>>) -> Self {
        Row {
            inner: RowInner::Tcp(row),
            schema,
        }
    }

    /// Construct an Arrow-backed row with an attached schema reference.
    #[inline]
    pub(crate) fn from_arrow(
        batch: Arc<RecordBatch>,
        row_index: usize,
        schema: Option<Arc<ResultSchema>>,
    ) -> Self {
        Row {
            inner: RowInner::Arrow { batch, row_index },
            schema,
        }
    }

    /// Returns the schema this row belongs to, if attached.
    ///
    /// Every row produced by [`Rowset::next_chunk`] has a schema
    /// attached — so this returns `Some` for any row obtained through
    /// the public API.
    #[inline]
    pub fn schema(&self) -> Option<&ResultSchema> {
        self.schema.as_deref()
    }

    /// Returns the `SqlType` of the column at the given index, if the
    /// schema is attached and the index is in bounds.
    ///
    /// Useful for metadata-dependent decoders like [`Self::get_numeric`]
    /// that need per-column precision and scale. Most callers reach for
    /// [`Self::get`] / [`Self::try_get`] instead, which handle this
    /// lookup internally via the [`RowValue`] trait.
    #[inline]
    pub fn sql_type(&self, idx: usize) -> Option<SqlType> {
        let schema = self.schema.as_deref()?;
        // Bounds-check before `schema.column(idx)`, which panics on
        // out-of-range indices.
        if idx < schema.column_count() {
            Some(schema.column(idx).sql_type())
        } else {
            None
        }
    }

    /// Gets a typed value at the given column index.
    ///
    /// Returns `None` when the cell is NULL or cannot be decoded as `T`.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::Row;
    /// # fn example(row: &Row) {
    /// let id: Option<i32> = row.get(0);
    /// let name: Option<String> = row.get(1);
    /// # }
    /// ```
    #[inline]
    pub fn get<T: RowValue>(&self, idx: usize) -> Option<T> {
        T::from_row(self, idx)
    }

    /// Gets a typed value at the given column index, returning a `Result`
    /// with a descriptive error on failure.
    ///
    /// Use this in [`FromRow`] implementations for better error messages
    /// than bare `row.get(idx).ok_or(...)`.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::{Row, FromRow, Result};
    /// # struct User { id: i32, name: String }
    /// impl FromRow for User {
    ///     fn from_row(row: &Row) -> Result<Self> {
    ///         Ok(User {
    ///             id: row.try_get::<i32>(0, "id")?,
    ///             name: row.try_get::<String>(1, "name")?,
    ///         })
    ///     }
    /// }
    /// ```
    ///
    /// # Errors
    ///
    /// - Returns [`crate::Error::Other`] if `idx` is out of bounds for the row's
    ///   column count.
    /// - Returns [`crate::Error::Other`] if the cell is SQL `NULL` or its value
    ///   cannot be decoded as `T`.
    pub fn try_get<T: RowValue>(&self, idx: usize, column_name: &str) -> crate::error::Result<T> {
        // Distinguish "bad index" from "NULL / wrong type" so the caller
        // gets an actionable message; `get` alone collapses both to `None`.
        if idx >= self.column_count() {
            return Err(crate::error::Error::new(format!(
                "Column index {} ({:?}) out of bounds — row has {} columns",
                idx,
                column_name,
                self.column_count(),
            )));
        }
        self.get::<T>(idx).ok_or_else(|| {
            crate::error::Error::new(format!(
                "Column {idx} ({column_name:?}) is NULL or has incompatible type",
            ))
        })
    }

    /// Returns an Arrow column reference, or `None` if the index is out of bounds.
    ///
    /// This is a safe wrapper around `batch.column(idx)` that avoids panicking.
    #[inline]
    fn arrow_column(batch: &RecordBatch, idx: usize) -> Option<&Arc<dyn Array>> {
        if idx < batch.num_columns() {
            Some(batch.column(idx))
        } else {
            None
        }
    }

    /// Gets an i16 value at the given column index.
    #[inline]
    pub fn get_i16(&self, idx: usize) -> Option<i16> {
        match &self.inner {
            RowInner::Tcp(row) => row.get_i16(idx),
            RowInner::Arrow { batch, row_index } => {
                i16::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
            }
        }
    }

    /// Gets an i32 value at the given column index.
    #[inline]
    pub fn get_i32(&self, idx: usize) -> Option<i32> {
        match &self.inner {
            RowInner::Tcp(row) => row.get_i32(idx),
            RowInner::Arrow { batch, row_index } => {
                i32::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
            }
        }
    }

    /// Gets an i64 value at the given column index.
    #[inline]
    pub fn get_i64(&self, idx: usize) -> Option<i64> {
        match &self.inner {
            RowInner::Tcp(row) => row.get_i64(idx),
            RowInner::Arrow { batch, row_index } => {
                i64::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
            }
        }
    }

    /// Gets an f32 value at the given column index.
    #[inline]
    pub fn get_f32(&self, idx: usize) -> Option<f32> {
        match &self.inner {
            RowInner::Tcp(row) => row.get_f32(idx),
            RowInner::Arrow { batch, row_index } => {
                f32::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
            }
        }
    }

    /// Gets an f64 value at the given column index.
    #[inline]
    pub fn get_f64(&self, idx: usize) -> Option<f64> {
        match &self.inner {
            RowInner::Tcp(row) => row.get_f64(idx),
            RowInner::Arrow { batch, row_index } => {
                f64::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
            }
        }
    }

    /// Gets a bool value at the given column index.
    #[inline]
    pub fn get_bool(&self, idx: usize) -> Option<bool> {
        match &self.inner {
            RowInner::Tcp(row) => row.get_bool(idx),
            RowInner::Arrow { batch, row_index } => {
                bool::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
            }
        }
    }

    /// Gets a String value at the given column index.
    #[inline]
    pub fn get_string(&self, idx: usize) -> Option<String> {
        match &self.inner {
            RowInner::Tcp(row) => row.get_string(idx),
            RowInner::Arrow { batch, row_index } => {
                String::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
            }
        }
    }

    /// Checks if the value at the given column is null.
    #[inline]
    pub fn is_null(&self, idx: usize) -> bool {
        match &self.inner {
            RowInner::Tcp(row) => row.is_null(idx),
            RowInner::Arrow { batch, row_index } => match Self::arrow_column(batch, idx) {
                Some(col) => col.is_null(*row_index),
                // An out-of-bounds column index is reported as NULL
                // rather than panicking.
                None => true,
            },
        }
    }

    /// Returns the number of columns in this row.
    #[inline]
    pub fn column_count(&self) -> usize {
        match &self.inner {
            RowInner::Tcp(row) => row.column_count(),
            RowInner::Arrow { batch, .. } => batch.num_columns(),
        }
    }

    /// Gets raw bytes at the given column index.
    ///
    /// For TCP rows, returns a copy of the raw binary wire data. For Arrow
    /// rows, decodes the cell via the `Vec<u8>` [`FromArrowValue`] impl —
    /// returning `None` when the column is not byte-backed or the value
    /// is NULL.
    #[inline]
    pub fn get_bytes(&self, idx: usize) -> Option<Vec<u8>> {
        match &self.inner {
            RowInner::Tcp(row) => row.get_bytes(idx).map(<[u8]>::to_vec),
            RowInner::Arrow { batch, row_index } => {
                Vec::<u8>::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
            }
        }
    }

    /// Gets a Date value at the given column index.
    #[inline]
    pub fn get_date(&self, idx: usize) -> Option<hyperdb_api_core::types::Date> {
        match &self.inner {
            RowInner::Tcp(row) => row.get(idx),
            RowInner::Arrow { batch, row_index } => {
                // Arrow Date32 is days since Unix epoch (1970-01-01)
                // Hyper Date is days since Hyper epoch (2000-01-01)
                use arrow::array::Date32Array;
                let col = Self::arrow_column(batch, idx)?;
                let arr = col.as_any().downcast_ref::<Date32Array>()?;
                if arr.is_null(*row_index) {
                    return None;
                }
                let unix_days = arr.value(*row_index);
                // Convert from Unix epoch to Hyper epoch (diff is 10957 days)
                let hyper_days = unix_days - 10957;
                Some(hyperdb_api_core::types::Date::from_days(hyper_days))
            }
        }
    }

    /// Gets a Time value at the given column index.
    #[inline]
    pub fn get_time(&self, idx: usize) -> Option<hyperdb_api_core::types::Time> {
        match &self.inner {
            RowInner::Tcp(row) => row.get(idx),
            RowInner::Arrow { batch, row_index } => {
                // Arrow Time64 is microseconds since midnight
                use arrow::array::Time64MicrosecondArray;
                let col = Self::arrow_column(batch, idx)?;
                let arr = col.as_any().downcast_ref::<Time64MicrosecondArray>()?;
                if arr.is_null(*row_index) {
                    return None;
                }
                // Negative time-of-day values are invalid; `try_from`
                // turns them into `None` instead of wrapping.
                let micros = u64::try_from(arr.value(*row_index)).ok()?;
                Some(hyperdb_api_core::types::Time::from_microseconds(micros))
            }
        }
    }

    /// Gets a Timestamp value at the given column index.
    #[inline]
    pub fn get_timestamp(&self, idx: usize) -> Option<hyperdb_api_core::types::Timestamp> {
        match &self.inner {
            RowInner::Tcp(row) => row.get(idx),
            RowInner::Arrow { batch, row_index } => {
                // Arrow Timestamp is microseconds since Unix epoch
                // Hyper Timestamp is microseconds since Hyper epoch (2000-01-01)
                use arrow::array::TimestampMicrosecondArray;
                let col = Self::arrow_column(batch, idx)?;
                let arr = col.as_any().downcast_ref::<TimestampMicrosecondArray>()?;
                if arr.is_null(*row_index) {
                    return None;
                }
                let unix_micros = arr.value(*row_index);
                // Convert from Unix epoch to Hyper epoch
                // 2000-01-01 is 946684800 seconds after 1970-01-01
                let hyper_micros = unix_micros - 946_684_800_000_000;
                Some(hyperdb_api_core::types::Timestamp::from_microseconds(
                    hyper_micros,
                ))
            }
        }
    }

    /// Gets an `OffsetTimestamp` (TIMESTAMP WITH TIME ZONE) value at the given column index.
    #[inline]
    pub fn get_offset_timestamp(
        &self,
        idx: usize,
    ) -> Option<hyperdb_api_core::types::OffsetTimestamp> {
        match &self.inner {
            RowInner::Tcp(row) => row.get(idx),
            RowInner::Arrow { batch, row_index } => {
                // Arrow TimestampTz is microseconds since Unix epoch with timezone
                use arrow::array::TimestampMicrosecondArray;
                let col = Self::arrow_column(batch, idx)?;
                let arr = col.as_any().downcast_ref::<TimestampMicrosecondArray>()?;
                if arr.is_null(*row_index) {
                    return None;
                }
                let unix_micros = arr.value(*row_index);
                let hyper_micros = unix_micros - 946_684_800_000_000;
                let ts = hyperdb_api_core::types::Timestamp::from_microseconds(hyper_micros);
                // NOTE(review): the offset is fixed at 0 here; any timezone
                // annotation on the Arrow type descriptor is ignored.
                // Presumably values arrive normalized to UTC — confirm
                // against the gRPC encoder.
                Some(hyperdb_api_core::types::OffsetTimestamp::new(ts, 0))
            }
        }
    }

    /// Gets an Interval value at the given column index.
    #[inline]
    pub fn get_interval(&self, idx: usize) -> Option<hyperdb_api_core::types::Interval> {
        match &self.inner {
            RowInner::Tcp(row) => row.get(idx),
            RowInner::Arrow { batch, row_index } => {
                // Arrow MonthDayNano interval → Hyper Interval
                use arrow::array::IntervalMonthDayNanoArray;
                let col = Self::arrow_column(batch, idx)?;
                let arr = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>()?;
                if arr.is_null(*row_index) {
                    return None;
                }
                let v = arr.value(*row_index);
                // Sub-microsecond precision is discarded (integer division
                // truncates toward zero).
                let micros = v.nanoseconds / 1000;
                Some(hyperdb_api_core::types::Interval::new(
                    v.months, v.days, micros,
                ))
            }
        }
    }

    /// Gets a `NUMERIC` value at the given column index.
    ///
    /// This is the metadata-aware variant of [`Self::get_bytes`] +
    /// [`hyperdb_api_core::types::Numeric::from_binary_with_scale`]: it looks up
    /// the column's `SqlType::Numeric { scale, .. }` from the attached
    /// schema and decodes the wire bytes with that scale, handling
    /// both of Hyper's NUMERIC wire forms transparently:
    ///
    /// - **8 bytes** (i64) when the column's declared precision ≤ 18
    ///   (Hyper's `Type::Numeric`). This is what aggregates like
    ///   `AVG(INTEGER)` return as `Numeric(16, 6)`.
    /// - **16 bytes** (i128) when declared precision > 18
    ///   (Hyper's `Type::BigNumeric`).
    ///
    /// Returns `None` if any of the following are true: the value is
    /// NULL, the schema isn't attached (which never happens for rows
    /// obtained through [`Rowset::next_chunk`]), the column at `idx`
    /// isn't `NUMERIC`, or the bytes can't be decoded.
    ///
    /// For non-TCP (Arrow/gRPC) rows, this path falls back to reading
    /// the Arrow-native `Decimal128` / `Decimal256` columns; the scale
    /// lives in the Arrow type descriptor in that case.
    pub fn get_numeric(&self, idx: usize) -> Option<hyperdb_api_core::types::Numeric> {
        match &self.inner {
            RowInner::Tcp(_) => {
                // TCP: decode raw bytes with scale from the schema.
                //
                // `SqlType::Numeric::scale` is `u32` and Hyper's own
                // `NUMERIC(p, s)` caps at `p ≤ 38` (per
                // `hyper/rts/type/Type.hpp`), so any legitimate scale
                // fits easily in `u8`. But `scale as u8` silently
                // truncates the high bits for values > 255, and a
                // malformed server response or a bug in typemod
                // parsing could deliver such a value — at which point
                // we'd produce a `Numeric` with the wrong (truncated)
                // scale and no error signal. `u8::try_from` returns
                // `Err` for out-of-range, `?` propagates `None`, and
                // the caller gets a clean "no value" instead of
                // silent corruption. Symmetric with the Arrow
                // negative-scale guard a few lines below.
                let scale: u8 = match self.sql_type(idx)? {
                    SqlType::Numeric { scale, .. } => u8::try_from(scale).ok()?,
                    _ => return None,
                };
                let bytes = self.get_bytes(idx)?;
                hyperdb_api_core::types::Numeric::from_binary_with_scale(&bytes, scale).ok()
            }
            RowInner::Arrow { batch, row_index } => {
                use arrow::array::{Decimal128Array, Decimal256Array};
                use arrow::datatypes::DataType as ArrowType;
                let col = Self::arrow_column(batch, idx)?;
                // Arrow stores decimal precision/scale in the type
                // descriptor itself, so there's no separate schema
                // lookup needed on this path.
                //
                // Note: Arrow's decimal scale is `i8` and can legally
                // be negative (negative scale = "value is multiplied
                // by 10^abs(scale)", e.g. scale=-2 on raw=5 renders
                // as 500). Hyper's `Numeric` uses `u8` scale and has
                // no representation for the negative-scale
                // multiplier. Rather than silently dropping the
                // multiplier (which would make raw=5 display as 5
                // instead of 500), we surface it as "no value" via
                // `try_into` + `?`. Negative-scale decimals don't
                // originate from Hyper's own gRPC encoder — but
                // `Row` can be fed from externally-loaded Arrow
                // files, so defensive handling costs nothing and
                // prevents a silent-corruption failure mode.
                match col.data_type() {
                    ArrowType::Decimal128(_precision, scale) => {
                        let scale_u8: u8 = (*scale).try_into().ok()?;
                        let arr = col.as_any().downcast_ref::<Decimal128Array>()?;
                        if arr.is_null(*row_index) {
                            return None;
                        }
                        let raw = arr.value(*row_index); // i128
                        Some(hyperdb_api_core::types::Numeric::new(raw, scale_u8))
                    }
                    ArrowType::Decimal256(_precision, scale) => {
                        // i256 from Arrow; Hyper NUMERIC caps at i128
                        // (precision ≤ 38). Narrow to i128; this is
                        // lossless for any value Hyper would actually
                        // produce. Values outside that range are a
                        // server-side contract violation.
                        let scale_u8: u8 = (*scale).try_into().ok()?;
                        let arr = col.as_any().downcast_ref::<Decimal256Array>()?;
                        if arr.is_null(*row_index) {
                            return None;
                        }
                        let raw = arr.value(*row_index);
                        let as_i128: i128 = raw.to_i128()?;
                        Some(hyperdb_api_core::types::Numeric::new(as_i128, scale_u8))
                    }
                    _ => None,
                }
            }
        }
    }
}
599
/// Trait for types that can be extracted from a Row.
///
/// Implemented for primitive numeric widths (with widening coercion for
/// `i32`, `i64`, and `f64`), `bool`, `String`, `Vec<u8>`, and the
/// temporal / numeric types in `hyperdb_api_core::types`. [`Row::get`]
/// and [`Row::try_get`] dispatch through this trait.
pub trait RowValue: Sized {
    /// Extract a value from a Row at the given column index.
    ///
    /// Returns `None` when the cell is NULL or the stored value cannot
    /// be decoded as `Self`.
    fn from_row(row: &Row, idx: usize) -> Option<Self>;
}
605
606impl RowValue for i16 {
607    #[inline]
608    fn from_row(row: &Row, idx: usize) -> Option<Self> {
609        row.get_i16(idx)
610    }
611}
612
613impl RowValue for i32 {
614    #[inline]
615    fn from_row(row: &Row, idx: usize) -> Option<Self> {
616        row.get_i32(idx).or_else(|| row.get_i16(idx).map(i32::from))
617    }
618}
619
620impl RowValue for i64 {
621    #[inline]
622    fn from_row(row: &Row, idx: usize) -> Option<Self> {
623        row.get_i64(idx)
624            .or_else(|| row.get_i32(idx).map(i64::from))
625            .or_else(|| row.get_i16(idx).map(i64::from))
626    }
627}
628
629impl RowValue for f32 {
630    #[inline]
631    fn from_row(row: &Row, idx: usize) -> Option<Self> {
632        row.get_f32(idx)
633    }
634}
635
636impl RowValue for f64 {
637    #[inline]
638    fn from_row(row: &Row, idx: usize) -> Option<Self> {
639        row.get_f64(idx).or_else(|| row.get_f32(idx).map(f64::from))
640    }
641}
642
643impl RowValue for bool {
644    #[inline]
645    fn from_row(row: &Row, idx: usize) -> Option<Self> {
646        row.get_bool(idx)
647    }
648}
649
650impl RowValue for String {
651    #[inline]
652    fn from_row(row: &Row, idx: usize) -> Option<Self> {
653        row.get_string(idx)
654    }
655}
656
657impl RowValue for Vec<u8> {
658    #[inline]
659    fn from_row(row: &Row, idx: usize) -> Option<Self> {
660        row.get_bytes(idx)
661    }
662}
663
664impl RowValue for hyperdb_api_core::types::Date {
665    #[inline]
666    fn from_row(row: &Row, idx: usize) -> Option<Self> {
667        row.get_date(idx)
668    }
669}
670
671impl RowValue for hyperdb_api_core::types::Time {
672    #[inline]
673    fn from_row(row: &Row, idx: usize) -> Option<Self> {
674        row.get_time(idx)
675    }
676}
677
678impl RowValue for hyperdb_api_core::types::Timestamp {
679    #[inline]
680    fn from_row(row: &Row, idx: usize) -> Option<Self> {
681        row.get_timestamp(idx)
682    }
683}
684
685impl RowValue for hyperdb_api_core::types::OffsetTimestamp {
686    #[inline]
687    fn from_row(row: &Row, idx: usize) -> Option<Self> {
688        row.get_offset_timestamp(idx)
689    }
690}
691
692impl RowValue for hyperdb_api_core::types::Interval {
693    #[inline]
694    fn from_row(row: &Row, idx: usize) -> Option<Self> {
695        row.get_interval(idx)
696    }
697}
698
699impl RowValue for hyperdb_api_core::types::Numeric {
700    /// Unlike every other `RowValue` impl, `Numeric` decode requires
701    /// per-column metadata (scale + wire-form width) that lives on the
702    /// row's attached `ResultSchema`. [`Row::get_numeric`] does the
703    /// lookup; this impl delegates there so generic `row.get::<Numeric>()`
704    /// / `row.try_get::<Numeric>(idx, "name")` call sites work the same
705    /// as every other type.
706    #[inline]
707    fn from_row(row: &Row, idx: usize) -> Option<Self> {
708        row.get_numeric(idx)
709    }
710}
711
712// =============================================================================
713// FromRow - Struct mapping trait
714// =============================================================================
715
/// Trait for types that can be constructed from a database row.
///
/// Implement this trait for your structs to enable direct mapping from
/// query results using [`Connection::fetch_one_as`](crate::Connection::fetch_one_as),
/// [`Connection::fetch_all_as`](crate::Connection::fetch_all_as), or by calling
/// [`FromRow::from_row`](FromRow::from_row) on each [`Row`](crate::Row) from a [`Rowset`](crate::Rowset).
///
/// Blanket implementations are provided below for 1–4 element tuples of
/// `Option<T: RowValue>`, mapping columns positionally.
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::{Row, FromRow, Result};
///
/// struct User {
///     id: i32,
///     name: String,
///     active: bool,
/// }
///
/// impl FromRow for User {
///     fn from_row(row: &Row) -> Result<Self> {
///         Ok(User {
///             id: row.get::<i32>(0).ok_or_else(|| hyperdb_api::Error::new("NULL id"))?,
///             name: row.get::<String>(1).unwrap_or_default(),
///             active: row.get::<bool>(2).unwrap_or(false),
///         })
///     }
/// }
/// ```
pub trait FromRow: Sized {
    /// Constructs an instance from a database row.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`](crate::Error) — typically [`crate::Error::Other`] —
    /// when a required column is missing, SQL `NULL`, or cannot be
    /// decoded as the expected type. Implementations decide the exact
    /// failure shape.
    fn from_row(row: &Row) -> crate::error::Result<Self>;
}
755
756// Tuple implementations for common patterns
757
758impl<A: RowValue> FromRow for (Option<A>,) {
759    fn from_row(row: &Row) -> crate::error::Result<Self> {
760        Ok((row.get::<A>(0),))
761    }
762}
763
764impl<A: RowValue, B: RowValue> FromRow for (Option<A>, Option<B>) {
765    fn from_row(row: &Row) -> crate::error::Result<Self> {
766        Ok((row.get::<A>(0), row.get::<B>(1)))
767    }
768}
769
770impl<A: RowValue, B: RowValue, C: RowValue> FromRow for (Option<A>, Option<B>, Option<C>) {
771    fn from_row(row: &Row) -> crate::error::Result<Self> {
772        Ok((row.get::<A>(0), row.get::<B>(1), row.get::<C>(2)))
773    }
774}
775
776impl<A: RowValue, B: RowValue, C: RowValue, D: RowValue> FromRow
777    for (Option<A>, Option<B>, Option<C>, Option<D>)
778{
779    fn from_row(row: &Row) -> crate::error::Result<Self> {
780        Ok((
781            row.get::<A>(0),
782            row.get::<B>(1),
783            row.get::<C>(2),
784            row.get::<D>(3),
785        ))
786    }
787}
788
789// =============================================================================
790// ResultSchema and ResultColumn
791// =============================================================================
792
/// Metadata about a column in a result schema.
///
/// Immutable after construction; read through the accessor methods.
#[derive(Debug, Clone)]
pub struct ResultColumn {
    /// The column name as reported by the server.
    name: String,
    /// The SQL type of the column.
    sql_type: SqlType,
    /// The column index (0-based) within the result schema.
    index: usize,
}
803
804impl ResultColumn {
805    /// Creates a new result column.
806    pub fn new(name: impl Into<String>, sql_type: SqlType, index: usize) -> Self {
807        ResultColumn {
808            name: name.into(),
809            sql_type,
810            index,
811        }
812    }
813
814    /// Returns the column name.
815    #[must_use]
816    pub fn name(&self) -> &str {
817        &self.name
818    }
819
820    /// Returns the SQL type of the column.
821    #[must_use]
822    pub fn sql_type(&self) -> SqlType {
823        self.sql_type
824    }
825
826    /// Returns the column index (0-based).
827    #[must_use]
828    pub fn index(&self) -> usize {
829        self.index
830    }
831}
832
/// Schema information for a query result.
///
/// Provides metadata about the columns returned by a query, including
/// column names and types.
#[derive(Debug, Clone, Default)]
pub struct ResultSchema {
    // Columns in result order; a column's position in this Vec equals
    // its `ResultColumn::index` (maintained by `add_column`).
    columns: Vec<ResultColumn>,
}
841
842impl ResultSchema {
843    /// Creates a new empty result schema.
844    #[must_use]
845    pub fn new() -> Self {
846        ResultSchema {
847            columns: Vec::new(),
848        }
849    }
850
851    /// Creates a result schema from column definitions.
852    #[must_use]
853    pub fn from_columns(columns: Vec<ResultColumn>) -> Self {
854        ResultSchema { columns }
855    }
856
857    /// Adds a column to the schema.
858    pub fn add_column(&mut self, name: impl Into<String>, sql_type: SqlType) {
859        let index = self.columns.len();
860        self.columns.push(ResultColumn::new(name, sql_type, index));
861    }
862
863    /// Returns the number of columns.
864    #[must_use]
865    pub fn column_count(&self) -> usize {
866        self.columns.len()
867    }
868
869    /// Returns all columns.
870    #[must_use]
871    pub fn columns(&self) -> &[ResultColumn] {
872        &self.columns
873    }
874
875    /// Returns the column at the given index.
876    ///
877    /// # Panics
878    ///
879    /// Panics if the index is out of bounds.
880    #[must_use]
881    pub fn column(&self, index: usize) -> &ResultColumn {
882        &self.columns[index]
883    }
884
885    /// Returns the column with the given name, if it exists.
886    #[must_use]
887    pub fn column_by_name(&self, name: &str) -> Option<&ResultColumn> {
888        self.columns.iter().find(|c| c.name == name)
889    }
890
891    /// Returns the index of the column with the given name, if it exists.
892    #[must_use]
893    pub fn column_index(&self, name: &str) -> Option<usize> {
894        self.columns.iter().position(|c| c.name == name)
895    }
896}
897
898// =============================================================================
899// Rowset (Streaming)
900// =============================================================================
901
902/// A streaming result set from a SQL query.
903///
904/// `Rowset` provides memory-efficient streaming access to query results.
905/// Results are fetched on-demand in chunks, keeping memory usage constant
906/// regardless of result set size. This makes it safe for any result size,
907/// from a single row to billions of rows.
908///
909/// # Example
910///
911/// ```no_run
912/// # use hyperdb_api::{Connection, Result};
913/// # fn example(conn: &Connection) -> Result<()> {
914/// let mut result = conn.execute_query("SELECT * FROM big_table")?;
915/// while let Some(chunk) = result.next_chunk()? {
916///     for row in &chunk {
917///         // Generic typed access (like C++ row.get<T>())
918///         let id: Option<i32> = row.get(0);
919///         let value: Option<f64> = row.get(1);
920///
921///         // Or direct accessors for performance
922///         let id = row.get_i32(0);
923///         let value = row.get_f64(1);
924///     }
925/// }
926/// # Ok(())
927/// # }
928/// ```
929///
930/// # Memory Behavior
931///
932/// - Only one chunk is held in memory at a time
933/// - Default chunk size is 64K rows (~few MB depending on row width)
934/// - Memory usage is `O(chunk_size)`, not `O(total_rows)`
935/// - Safe for billion-row results
pub struct Rowset<'conn> {
    /// The transport-specific stream (TCP, Arrow/gRPC, or prepared
    /// statement) that chunks are pulled from.
    inner: RowsetInner<'conn>,
    /// Cached schema for this rowset, built lazily the first time
    /// [`Self::next_chunk`] produces a non-empty chunk (TCP path — at
    /// which point the `RowDescription` message has been observed) or
    /// on first Arrow chunk (gRPC path). Stored as `Arc` so each row
    /// produced by `next_chunk` gets a cheap ref-count clone — that's
    /// how metadata-dependent decoders like [`Row::get_numeric`] reach
    /// the column's `SqlType` without the caller plumbing scale
    /// through manually.
    schema_cache: Option<Arc<ResultSchema>>,
    /// For one-shot prepared statements (the internal
    /// [`crate::Connection::query_params`] path), hold the statement
    /// handle here so its `Drop`-time `close_statement` fires *after*
    /// the rowset releases its connection lock. Dropping the statement
    /// before the rowset would deadlock because the inner stream owns
    /// the connection's `MutexGuard`.
    _statement_guard: Option<hyperdb_api_core::client::OwnedPreparedStatement>,
}
955
956impl std::fmt::Debug for Rowset<'_> {
957    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
958        f.debug_struct("Rowset")
959            .field("has_schema_cache", &self.schema_cache.is_some())
960            .finish_non_exhaustive()
961    }
962}
963
/// Internal enum holding the transport-specific stream a [`Rowset`]
/// reads its chunks from.
enum RowsetInner<'conn> {
    /// TCP streaming result (uses `QueryStream`).
    Tcp(QueryStream<'conn>),
    /// Arrow-based result from gRPC (all data loaded).
    Arrow(ArrowRowset),
    /// TCP streaming result from a prepared-statement execute.
    Prepared(hyperdb_api_core::client::PreparedQueryStream<'conn>),
}
973
impl<'conn> Rowset<'conn> {
    /// Creates a new Rowset from a `QueryStream` (TCP).
    pub(crate) fn new(stream: QueryStream<'conn>) -> Self {
        Rowset {
            inner: RowsetInner::Tcp(stream),
            schema_cache: None,
            _statement_guard: None,
        }
    }

    /// Creates a new Rowset from Arrow IPC data (gRPC).
    pub(crate) fn from_arrow(arrow_rowset: ArrowRowset) -> Self {
        Rowset {
            inner: RowsetInner::Arrow(arrow_rowset),
            schema_cache: None,
            _statement_guard: None,
        }
    }

    /// Creates a new Rowset from a prepared-statement streaming result.
    pub(crate) fn from_prepared(
        stream: hyperdb_api_core::client::PreparedQueryStream<'conn>,
    ) -> Self {
        Rowset {
            inner: RowsetInner::Prepared(stream),
            schema_cache: None,
            _statement_guard: None,
        }
    }

    #[expect(
        clippy::used_underscore_binding,
        reason = "underscore-prefixed parameter retained for trait-method signature compatibility"
    )]
    /// Attaches a `OwnedPreparedStatement` that should be dropped
    /// **after** this rowset is consumed. Used by the one-shot
    /// prepare+execute path inside
    /// [`crate::Connection::query_params`] so the statement's
    /// Drop-time close doesn't deadlock on the rowset's still-held
    /// connection lock.
    pub(crate) fn with_statement_guard(
        mut self,
        statement: hyperdb_api_core::client::OwnedPreparedStatement,
    ) -> Self {
        self._statement_guard = Some(statement);
        self
    }

    /// Returns the schema (column metadata) for the result set.
    ///
    /// For TCP connections, the schema is captured from the `RowDescription` message
    /// after the first chunk is read. For gRPC connections, the schema is available
    /// immediately from the Arrow data.
    ///
    /// Returns `None` if no data has been read yet (TCP only).
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::{Connection, Result};
    /// # fn example(conn: &Connection) -> Result<()> {
    /// let mut result = conn.execute_query("SELECT id, name FROM users")?;
    /// // Read first chunk to capture schema (TCP) or get it immediately (gRPC)
    /// let _ = result.next_chunk()?;
    /// if let Some(schema) = result.schema() {
    ///     for col in schema.columns() {
    ///         println!("Column: {} ({})", col.name(), col.sql_type());
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[must_use]
    pub fn schema(&self) -> Option<ResultSchema> {
        // Fast path: cache already populated by a previous call or by
        // `next_chunk`. Clone from the Arc so external callers get an
        // owned value independent of internal lifetimes.
        if let Some(ref cached) = self.schema_cache {
            return Some((**cached).clone());
        }
        // Slow path: schema hasn't been materialized yet. Build it from
        // the transport without populating the cache — `schema()` takes
        // `&self`, so mutation isn't possible here. `next_chunk` does
        // the caching pass for the row-construction hot path; if a
        // caller really only wants the schema and never touches rows,
        // they pay one build per call but this is rarely the pattern.
        self.build_schema()
    }

    /// Compute the current schema without populating the cache.
    ///
    /// Pulls column metadata from the underlying transport and
    /// constructs a fresh `ResultSchema`. TCP builds `SqlType` via
    /// [`SqlType::from_oid_and_modifier`] so
    /// `NUMERIC(precision, scale)` and `VARCHAR(n)` recover their
    /// declared parameters from the `RowDescription` `atttypmod`
    /// field — dropping the modifier (which bare
    /// [`SqlType::from_oid`] does) silently turns every `NUMERIC`
    /// into `(precision: 0, scale: 0)` and corrupts decimal decodes
    /// downstream. Arrow comes pre-typed via
    /// `arrow_type_to_sql_type`.
    fn build_schema(&self) -> Option<ResultSchema> {
        match &self.inner {
            // TCP: `stream.schema()` is `None` until the
            // `RowDescription` message has been read off the wire.
            RowsetInner::Tcp(stream) => stream.schema().map(|cols| {
                let columns = cols
                    .iter()
                    .enumerate()
                    .map(|(idx, col)| {
                        let sql_type =
                            SqlType::from_oid_and_modifier(col.type_oid().0, col.type_modifier());
                        ResultColumn::new(col.name(), sql_type, idx)
                    })
                    .collect();
                ResultSchema::from_columns(columns)
            }),
            RowsetInner::Arrow(arrow) => {
                let schema = arrow.schema();
                let columns = schema
                    .fields()
                    .iter()
                    .enumerate()
                    .map(|(idx, field)| {
                        ResultColumn::new(
                            field.name(),
                            crate::arrow_result::arrow_type_to_sql_type(field.data_type()),
                            idx,
                        )
                    })
                    .collect();
                Some(ResultSchema::from_columns(columns))
            }
            // Prepared statements: schema was captured at prepare time,
            // so it is always available immediately.
            RowsetInner::Prepared(stream) => {
                let cols = stream.schema();
                let columns = cols
                    .iter()
                    .enumerate()
                    .map(|(idx, col)| {
                        let sql_type =
                            SqlType::from_oid_and_modifier(col.type_oid().0, col.type_modifier());
                        ResultColumn::new(col.name(), sql_type, idx)
                    })
                    .collect();
                Some(ResultSchema::from_columns(columns))
            }
        }
    }

    /// Populate `schema_cache` if not yet set, then return an `Arc`
    /// clone of the cached schema for row construction. Called by
    /// `next_chunk` so every row produced gets a cheap schema
    /// reference without re-building the `ResultSchema` per chunk.
    fn cached_schema_arc(&mut self) -> Option<Arc<ResultSchema>> {
        if self.schema_cache.is_none() {
            if let Some(schema) = self.build_schema() {
                self.schema_cache = Some(Arc::new(schema));
            }
        }
        self.schema_cache.clone()
    }

    /// Returns the next chunk of rows from the result set.
    ///
    /// Each chunk contains up to `chunk_size` rows (default 64K).
    /// Returns `Ok(None)` when all rows have been consumed.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::{Rowset, Result};
    /// # fn example(mut result: Rowset) -> Result<()> {
    /// while let Some(chunk) = result.next_chunk()? {
    ///     for row in &chunk {
    ///         let id: Option<i32> = row.get(0);  // Generic typed access
    ///         let value = row.get_f64(1);        // Direct accessor
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// - Returns [`crate::Error::Client`] if the server sends an `ErrorResponse`
    ///   while streaming the result set.
    /// - Returns [`crate::Error::Io`] on transport-level I/O failures.
    /// - Returns [`crate::Error::Other`] if an Arrow IPC chunk cannot be decoded.
    pub fn next_chunk(&mut self) -> Result<Option<Vec<Row>>> {
        // Pull the next raw chunk from the underlying transport first;
        // on TCP, this is what makes the `RowDescription` bytes arrive
        // so we can cache the schema in the step below. We collect a
        // `TransportChunk` instead of a `Vec<Row>` directly so the
        // schema can be attached after we've populated the cache.
        enum TransportChunk {
            Tcp(Vec<StreamRow>),
            Arrow(Arc<RecordBatch>),
        }

        let chunk_opt: Option<TransportChunk> = match &mut self.inner {
            RowsetInner::Tcp(stream) => stream.next_chunk()?.map(TransportChunk::Tcp),
            // Arrow: the batch goes behind an `Arc` so every Row of
            // this chunk can share it without copying the column data.
            RowsetInner::Arrow(arrow) => arrow
                .next_chunk()?
                .map(|chunk| TransportChunk::Arrow(Arc::new(chunk.into_batch()))),
            // Prepared-statement streams yield the same `StreamRow`
            // values as the plain TCP path, so both map to `Tcp` here.
            RowsetInner::Prepared(stream) => stream.next_chunk()?.map(TransportChunk::Tcp),
        };

        let Some(chunk) = chunk_opt else {
            return Ok(None);
        };

        // Populate the schema cache if not already set, then clone the
        // Arc into each Row so `Row::get::<Numeric>` and friends can
        // look up per-column precision / scale without any caller
        // having to thread the schema through manually.
        let schema = self.cached_schema_arc();
        let rows = match chunk {
            TransportChunk::Tcp(stream_rows) => stream_rows
                .into_iter()
                .map(|row| Row::from_tcp(row, schema.clone()))
                .collect(),
            TransportChunk::Arrow(batch) => (0..batch.num_rows())
                .map(|row_index| Row::from_arrow(Arc::clone(&batch), row_index, schema.clone()))
                .collect(),
        };
        Ok(Some(rows))
    }

    /// Returns an iterator over all rows in the result set.
    ///
    /// This provides a C++-like iteration experience while maintaining
    /// Rust's explicit error handling. Chunks are fetched internally
    /// as needed, keeping memory usage constant.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::{Connection, Result};
    /// # fn example(conn: &Connection) -> Result<()> {
    /// // Simple iteration (like C++)
    /// let result = conn.execute_query("SELECT * FROM users")?;
    /// for row in result.rows() {
    ///     let row = row?;  // Handle potential network errors
    ///     let id: Option<i32> = row.get(0);
    ///     let name: Option<String> = row.get(1);
    ///     println!("User: {:?} - {:?}", id, name);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Error Handling
    ///
    /// Unlike C++ which uses exceptions, Rust requires explicit error handling.
    /// Each item in the iterator is a `Result<LightweightRow>` to handle
    /// potential network or protocol errors during streaming.
    ///
    /// # Comparison with `next_chunk()`
    ///
    /// | Aspect | `rows()` | `next_chunk()` |
    /// |--------|----------|----------------|
    /// | Syntax | Simpler, C++-like | More verbose |
    /// | Error handling | Per-row with `?` | Per-chunk |
    /// | Batch ops | Use `.collect()` | Natural |
    /// | Best for | Simple iteration | Batch processing |
    #[must_use]
    pub fn rows(self) -> RowIterator<'conn> {
        RowIterator {
            rowset: self,
            // Start with an empty chunk iterator so the first `next()`
            // call immediately fetches the first real chunk.
            current_iter: Vec::new().into_iter(),
        }
    }

    /// Collects all rows into a Vec.
    ///
    /// This is a convenience method that handles error collection more elegantly
    /// than the standard `collect::<Result<Vec<_>, _>>()` pattern.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::{Connection, Result};
    /// # fn example(conn: &Connection) -> Result<()> {
    /// let result = conn.execute_query("SELECT id, name FROM users")?;
    /// let rows = result.collect_rows()?;  // Much cleaner than collect::<Result<Vec<_>, _>>()
    ///
    /// for row in rows {
    ///     let id: Option<i32> = row.get(0);
    ///     let name: Option<String> = row.get(1);
    ///     println!("User: {:?} - {:?}", id, name);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// Returns the first error produced by [`next_chunk`](Self::next_chunk)
    /// while draining the stream (transport I/O failure or server-side
    /// error).
    pub fn collect_rows(self) -> crate::error::Result<Vec<Row>> {
        self.rows().collect::<crate::error::Result<Vec<_>>>()
    }

    /// Collects the first column of each row into a Vec.
    ///
    /// This is useful for single-column queries or when you only need one column.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::{Connection, Result};
    /// # fn example(conn: &Connection) -> Result<()> {
    /// let result = conn.execute_query("SELECT name FROM users")?;
    /// let names: Vec<Option<String>> = result.collect_column()?;
    ///
    /// for name in names {
    ///     if let Some(name) = name {
    ///         println!("User: {}", name);
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// Returns the first streaming error from
    /// [`next_chunk`](Self::next_chunk). SQL `NULL` cells yield
    /// `Option::None` entries, not errors.
    pub fn collect_column<T: crate::result::RowValue>(
        self,
    ) -> crate::error::Result<Vec<Option<T>>> {
        self.rows()
            .map(|row| row.map(|r| r.get::<T>(0)))
            .collect::<crate::error::Result<Vec<_>>>()
    }

    /// Collects the first column, filtering out NULL values.
    ///
    /// This is useful when you know the column doesn't contain NULLs or want to ignore them.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::{Connection, Result};
    /// # fn example(conn: &Connection) -> Result<()> {
    /// let result = conn.execute_query("SELECT name FROM users WHERE name IS NOT NULL")?;
    /// let names: Vec<String> = result.collect_column_non_null()?;
    ///
    /// for name in names {
    ///     println!("User: {}", name);  // No need to handle Option
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// Returns the first streaming error from
    /// [`collect_column`](Self::collect_column).
    pub fn collect_column_non_null<T: crate::result::RowValue>(
        self,
    ) -> crate::error::Result<Vec<T>> {
        Ok(self.collect_column::<T>()?.into_iter().flatten().collect())
    }

    /// Gets the first row of the result set.
    ///
    /// This is useful for queries that are expected to return exactly one row,
    /// such as aggregate queries or lookups by unique key.
    ///
    /// Note: this consumes `self`, so any further rows in the fetched
    /// chunk — and any chunks not yet fetched — are discarded when the
    /// rowset drops.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::{Connection, Result};
    /// # fn example(conn: &Connection) -> Result<()> {
    /// let result = conn.execute_query("SELECT COUNT(*) FROM users")?;
    /// if let Some(row) = result.first_row()? {
    ///     let count: Option<i64> = row.get(0);
    ///     println!("User count: {:?}", count);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// Returns the error from [`next_chunk`](Self::next_chunk). An empty
    /// result set yields `Ok(None)`, not an error.
    pub fn first_row(mut self) -> crate::error::Result<Option<Row>> {
        if let Some(chunk) = self.next_chunk()? {
            Ok(chunk.into_iter().next())
        } else {
            Ok(None)
        }
    }

    /// Gets the first row or returns an error if no rows were found.
    ///
    /// This is useful when you expect exactly one row and want to fail if that's not the case.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::{Connection, Result};
    /// # fn example(conn: &Connection) -> Result<()> {
    /// let result = conn.execute_query("SELECT id, name FROM users WHERE id = 1")?;
    /// let row = result.require_first_row()?;  // Fails if no row found
    /// let id: Option<i32> = row.get(0);
    /// let name: Option<String> = row.get(1);
    /// println!("Found user: {:?} - {:?}", id, name);
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// - Returns the error from [`first_row`](Self::first_row).
    /// - Returns [`crate::Error::Other`] with message `"Query returned no rows"`
    ///   if the result set is empty.
    pub fn require_first_row(self) -> crate::error::Result<Row> {
        self.first_row()?
            .ok_or_else(|| crate::error::Error::new("Query returned no rows"))
    }

    /// Gets a scalar value from the first row, first column.
    ///
    /// This is a convenience method for scalar queries like `SELECT COUNT(*)` or `SELECT MAX(id)`.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::{Connection, Result};
    /// # fn example(conn: &Connection) -> Result<()> {
    /// let result = conn.execute_query("SELECT COUNT(*) FROM users")?;
    /// let count: Option<i64> = result.scalar()?;  // Much cleaner than manual row handling
    /// println!("User count: {:?}", count);
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// Returns the error from [`require_first_row`](Self::require_first_row):
    /// streaming error or empty result. SQL `NULL` in the single cell
    /// yields `Ok(None)`.
    pub fn scalar<T: crate::result::RowValue>(self) -> crate::error::Result<Option<T>> {
        Ok(self.require_first_row()?.get(0))
    }

    /// Gets a scalar value from the first row, first column, or returns an error if NULL.
    ///
    /// This is useful when you expect a non-NULL scalar result.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::{Connection, Result};
    /// # fn example(conn: &Connection) -> Result<()> {
    /// let result = conn.execute_query("SELECT COUNT(*) FROM users")?;
    /// let count: i64 = result.require_scalar()?;  // Fails if NULL
    /// println!("User count: {}", count);
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// - Returns the error from [`scalar`](Self::scalar).
    /// - Returns [`crate::Error::Other`] with message `"Scalar query returned NULL"`
    ///   if the single cell is SQL `NULL`.
    pub fn require_scalar<T: crate::result::RowValue>(self) -> crate::error::Result<T> {
        self.scalar()?
            .ok_or_else(|| crate::error::Error::new("Scalar query returned NULL"))
    }
}
1451
1452// =============================================================================
1453// RowIterator - C++-like iteration over query results
1454// =============================================================================
1455
1456/// An iterator over rows in a query result set.
1457///
1458/// `RowIterator` provides a C++-like iteration experience, hiding the
1459/// chunked fetching internally. Each call to `next()` returns the next
1460/// row, automatically fetching new chunks as needed.
1461///
1462/// # Memory Behavior
1463///
1464/// Memory usage remains constant regardless of result set size:
1465/// - Internally fetches 64K rows at a time
1466/// - Previous chunks are dropped when exhausted
1467/// - Safe for billion-row results
1468///
1469/// # Example
1470///
1471/// ```no_run
1472/// # use hyperdb_api::{Connection, Result};
1473/// # fn example(conn: &Connection) -> Result<()> {
1474/// let result = conn.execute_query("SELECT id, name FROM users")?;
1475/// for row in result.rows() {
1476///     let row = row?;
1477///     let id = row.get_i32(0).unwrap_or(-1);
1478///     let name = row.get::<String>(1).unwrap_or_default();
1479///     println!("{}: {}", id, name);
1480/// }
1481/// # Ok(())
1482/// # }
1483/// ```
1484///
1485/// # Error Handling
1486///
1487/// Each iteration yields a `Result<Row>`. Errors can occur
1488/// when fetching new chunks from the server (network issues, protocol
1489/// errors, etc.). Use `?` or match to handle them:
1490///
1491/// ```no_run
1492/// # use hyperdb_api::{Rowset, Result};
1493/// # fn example(mut result: Rowset) -> Result<()> {
1494/// // Using ? in a function that returns Result
1495/// for row in result.rows() {
1496///     let row = row?;
1497///     // process row...
1498/// }
1499/// # Ok(())
1500/// # }
1501/// # fn example2(mut result: Rowset) -> Result<()> {
1502/// // Using try_for_each
1503/// result.rows().try_for_each(|row| -> Result<()> {
1504///     let row = row?;
1505///     // process row...
1506///     Ok(())
1507/// })?;
1508/// # Ok(())
1509/// # }
1510/// ```
#[derive(Debug)]
pub struct RowIterator<'conn> {
    /// The underlying rowset that new chunks are pulled from whenever
    /// the current chunk runs dry.
    rowset: Rowset<'conn>,
    /// Owning iterator over the rows of the most recently fetched
    /// chunk. Initialized empty by [`Rowset::rows`], so the first
    /// `next()` call fetches the first chunk.
    current_iter: std::vec::IntoIter<Row>,
}
1516
1517impl Iterator for RowIterator<'_> {
1518    type Item = Result<Row>;
1519
1520    fn next(&mut self) -> Option<Self::Item> {
1521        // Try to get next row from current chunk
1522        if let Some(row) = self.current_iter.next() {
1523            return Some(Ok(row));
1524        }
1525
1526        // Current chunk exhausted, fetch next chunk
1527        match self.rowset.next_chunk() {
1528            Ok(Some(chunk)) => {
1529                self.current_iter = chunk.into_iter();
1530                // Return first row of new chunk
1531                self.current_iter.next().map(Ok)
1532            }
1533            Ok(None) => None,       // No more rows
1534            Err(e) => Some(Err(e)), // Error fetching chunk
1535        }
1536    }
1537}
1538
1539// =============================================================================
1540// Unit tests that don't need a live hyperd backend.
1541//
1542// Anything requiring a real Hyper process lives in `hyperdb-api/tests/*.rs` where
1543// `TestConnection` spins up a `HyperProcess` per test. These tests exercise
1544// pure in-process logic — specifically the Arrow-path branches of
1545// `Row::get_numeric`, where we can construct a synthetic `RecordBatch` with a
1546// specific `DataType::Decimal128(p, s)` descriptor and probe `Row`'s
1547// handling of it without hyperd in the loop.
1548// =============================================================================
1549
#[cfg(test)]
mod arrow_path_tests {
    use super::*;
    use arrow::array::Decimal128Array;
    use arrow::datatypes::{DataType as ArrowType, Field, Schema};

    /// Builds a one-row `RecordBatch` with a single nullable
    /// Decimal128 column named `"v"`: unscaled value `raw`, and the
    /// supplied precision/scale.
    fn decimal128_batch(raw: i128, precision: u8, scale: i8) -> Arc<RecordBatch> {
        let values = Decimal128Array::from(vec![Some(raw)])
            .with_precision_and_scale(precision, scale)
            .expect("valid Arrow Decimal128");
        let fields = vec![Field::new("v", ArrowType::Decimal128(precision, scale), true)];
        let schema = Arc::new(Schema::new(fields));
        Arc::new(RecordBatch::try_new(schema, vec![Arc::new(values)]).expect("batch"))
    }

    /// Happy-path: a positive-scale Arrow Decimal128 decodes correctly
    /// via `row.get::<Numeric>()`, locking in the common case alongside
    /// the negative-scale test below.
    #[test]
    fn get_numeric_reads_arrow_decimal128_with_positive_scale() {
        // NUMERIC(10, 2), unscaled value 123 → 1.23
        let row = Row::from_arrow(decimal128_batch(123, 10, 2), 0, None);

        let decoded = row.get_numeric(0).expect("Some for positive-scale decimal");
        assert_eq!(decoded.unscaled_value(), 123);
        assert_eq!(decoded.scale(), 2);
        assert!((decoded.to_f64() - 1.23).abs() < 1e-9);

        // Same result via the generic `row.get::<Numeric>` path.
        let via_rowvalue: hyperdb_api_core::types::Numeric =
            row.get(0).expect("RowValue path agrees with get_numeric");
        assert_eq!(via_rowvalue, decoded);
    }

    /// Arrow's `DataType::Decimal128(u8, i8)` allows negative scale —
    /// a legitimate Arrow concept meaning "raw × 10^abs(scale)" (e.g.
    /// scale=-2 with raw=5 renders as 500). Hyper's `Numeric` uses a
    /// `u8` scale with no representation for that multiplier.
    ///
    /// The earlier `.max(0) as u8` code silently clamped the scale to
    /// 0 while keeping `raw` unchanged — which produces a value with
    /// the wrong magnitude (`5` instead of `500` in the example
    /// above). The fix here is to reject negative scales via
    /// `try_into` + `?`, which surfaces as `None` to the caller.
    /// That's strictly safer than a silent-wrong-magnitude value.
    #[test]
    fn get_numeric_rejects_arrow_decimal128_with_negative_scale() {
        // NUMERIC(10, -2) — Arrow allows this; Hyper's Numeric can't
        // represent it. Our `get_numeric` must return None rather
        // than silently drop the negative-scale multiplier.
        let row = Row::from_arrow(decimal128_batch(5, 10, -2), 0, None);

        assert!(
            row.get_numeric(0).is_none(),
            "negative Arrow scale must not produce a silently-wrong-magnitude Numeric",
        );

        // And the same through the `RowValue` blanket path.
        let via_rowvalue: Option<hyperdb_api_core::types::Numeric> = row.get(0);
        assert!(via_rowvalue.is_none());
    }

    /// Boundary: scale = 0 is a legal `u8` and must still succeed.
    /// Guards against an over-tightened check that accidentally
    /// rejects zero along with negatives.
    #[test]
    fn get_numeric_accepts_arrow_decimal128_with_zero_scale() {
        let row = Row::from_arrow(decimal128_batch(42, 10, 0), 0, None);
        let decoded = row.get_numeric(0).expect("scale 0 is fine");
        assert_eq!(decoded.unscaled_value(), 42);
        assert_eq!(decoded.scale(), 0);
    }
}
1627}