use std::sync::Arc;
use arrow::array::Array;
use arrow::record_batch::RecordBatch;
use hyperdb_api_core::client::QueryStream;
use hyperdb_api_core::client::StreamRow;
use hyperdb_api_core::types::SqlType;
use crate::arrow_result::{ArrowRowset, FromArrowValue};
use crate::error::Result;
/// Default chunk size (64 KiB) for binary transfers.
/// NOTE(review): no consumer is visible in this file — presumably used by
/// the transport layer; confirm before changing.
pub(crate) const DEFAULT_BINARY_CHUNK_SIZE: usize = 65536;
/// A single result row, backed by either a TCP wire-protocol row or one
/// row of a shared Arrow record batch.
pub struct Row {
    // Transport-specific payload; see [`RowInner`].
    inner: RowInner,
    // Lazily-built schema shared across all rows of a result set;
    // `None` when the transport supplied no column metadata.
    schema: Option<Arc<ResultSchema>>,
}
impl std::fmt::Debug for Row {
    /// Compact debug view: row payloads can be large, so only report
    /// whether a schema is attached and elide the rest.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Row");
        builder.field("has_schema", &self.schema.is_some());
        builder.finish_non_exhaustive()
    }
}
/// Transport-specific storage behind a [`Row`].
enum RowInner {
    /// Row decoded from the TCP wire protocol.
    Tcp(StreamRow),
    /// One row of an Arrow record batch; the batch is shared between
    /// sibling rows and `row_index` selects this row within it.
    Arrow {
        batch: Arc<RecordBatch>,
        row_index: usize,
    },
}
impl Row {
    /// Wraps a TCP wire-protocol row with an optional shared schema.
    #[inline]
    pub(crate) fn from_tcp(row: StreamRow, schema: Option<Arc<ResultSchema>>) -> Self {
        Row {
            inner: RowInner::Tcp(row),
            schema,
        }
    }

    /// Wraps row `row_index` of a shared Arrow record batch.
    #[inline]
    pub(crate) fn from_arrow(
        batch: Arc<RecordBatch>,
        row_index: usize,
        schema: Option<Arc<ResultSchema>>,
    ) -> Self {
        Row {
            inner: RowInner::Arrow { batch, row_index },
            schema,
        }
    }

    /// Schema attached to this row, when the transport provided one.
    #[inline]
    pub fn schema(&self) -> Option<&ResultSchema> {
        self.schema.as_deref()
    }

    /// SQL type of column `idx`; `None` when there is no schema or `idx`
    /// is out of bounds.
    #[inline]
    pub fn sql_type(&self, idx: usize) -> Option<SqlType> {
        let schema = self.schema.as_deref()?;
        if idx < schema.column_count() {
            Some(schema.column(idx).sql_type())
        } else {
            None
        }
    }

    /// Typed accessor: `None` on NULL, out-of-bounds index, or an
    /// incompatible column type (all three collapse to `None`).
    #[inline]
    pub fn get<T: RowValue>(&self, idx: usize) -> Option<T> {
        T::from_row(self, idx)
    }

    /// Like [`Row::get`], but returns an error that distinguishes an
    /// out-of-bounds index from a NULL/incompatible value.
    /// `column_name` is used for diagnostics only.
    pub fn try_get<T: RowValue>(&self, idx: usize, column_name: &str) -> crate::error::Result<T> {
        if idx >= self.column_count() {
            return Err(crate::error::Error::new(format!(
                "Column index {} ({:?}) out of bounds — row has {} columns",
                idx,
                column_name,
                self.column_count(),
            )));
        }
        self.get::<T>(idx).ok_or_else(|| {
            crate::error::Error::new(format!(
                "Column {idx} ({column_name:?}) is NULL or has incompatible type",
            ))
        })
    }

    /// Bounds-checked column lookup on an Arrow batch.
    #[inline]
    fn arrow_column(batch: &RecordBatch, idx: usize) -> Option<&Arc<dyn Array>> {
        if idx < batch.num_columns() {
            Some(batch.column(idx))
        } else {
            None
        }
    }

    /// i16 at `idx`; `None` on NULL/mismatch/out-of-bounds.
    #[inline]
    pub fn get_i16(&self, idx: usize) -> Option<i16> {
        match &self.inner {
            RowInner::Tcp(row) => row.get_i16(idx),
            RowInner::Arrow { batch, row_index } => {
                i16::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
            }
        }
    }

    /// i32 at `idx`; `None` on NULL/mismatch/out-of-bounds.
    #[inline]
    pub fn get_i32(&self, idx: usize) -> Option<i32> {
        match &self.inner {
            RowInner::Tcp(row) => row.get_i32(idx),
            RowInner::Arrow { batch, row_index } => {
                i32::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
            }
        }
    }

    /// i64 at `idx`; `None` on NULL/mismatch/out-of-bounds.
    #[inline]
    pub fn get_i64(&self, idx: usize) -> Option<i64> {
        match &self.inner {
            RowInner::Tcp(row) => row.get_i64(idx),
            RowInner::Arrow { batch, row_index } => {
                i64::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
            }
        }
    }

    /// f32 at `idx`; `None` on NULL/mismatch/out-of-bounds.
    #[inline]
    pub fn get_f32(&self, idx: usize) -> Option<f32> {
        match &self.inner {
            RowInner::Tcp(row) => row.get_f32(idx),
            RowInner::Arrow { batch, row_index } => {
                f32::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
            }
        }
    }

    /// f64 at `idx`; `None` on NULL/mismatch/out-of-bounds.
    #[inline]
    pub fn get_f64(&self, idx: usize) -> Option<f64> {
        match &self.inner {
            RowInner::Tcp(row) => row.get_f64(idx),
            RowInner::Arrow { batch, row_index } => {
                f64::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
            }
        }
    }

    /// bool at `idx`; `None` on NULL/mismatch/out-of-bounds.
    #[inline]
    pub fn get_bool(&self, idx: usize) -> Option<bool> {
        match &self.inner {
            RowInner::Tcp(row) => row.get_bool(idx),
            RowInner::Arrow { batch, row_index } => {
                bool::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
            }
        }
    }

    /// Owned string at `idx`; `None` on NULL/mismatch/out-of-bounds.
    #[inline]
    pub fn get_string(&self, idx: usize) -> Option<String> {
        match &self.inner {
            RowInner::Tcp(row) => row.get_string(idx),
            RowInner::Arrow { batch, row_index } => {
                String::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
            }
        }
    }

    /// True when the value at `idx` is NULL. An out-of-bounds Arrow index
    /// also reports `true` (treated as "no value").
    #[inline]
    pub fn is_null(&self, idx: usize) -> bool {
        match &self.inner {
            RowInner::Tcp(row) => row.is_null(idx),
            RowInner::Arrow { batch, row_index } => match Self::arrow_column(batch, idx) {
                Some(col) => col.is_null(*row_index),
                None => true,
            },
        }
    }

    /// Number of columns in this row.
    #[inline]
    pub fn column_count(&self) -> usize {
        match &self.inner {
            RowInner::Tcp(row) => row.column_count(),
            RowInner::Arrow { batch, .. } => batch.num_columns(),
        }
    }

    /// Raw bytes at `idx`, copied into an owned `Vec`.
    #[inline]
    pub fn get_bytes(&self, idx: usize) -> Option<Vec<u8>> {
        match &self.inner {
            // TCP exposes a borrowed slice; copy it out for an owned API.
            RowInner::Tcp(row) => row.get_bytes(idx).map(<[u8]>::to_vec),
            RowInner::Arrow { batch, row_index } => {
                Vec::<u8>::from_arrow_column(Self::arrow_column(batch, idx)?, *row_index)
            }
        }
    }

    /// Date at `idx`. On the Arrow path only `Date32` columns decode.
    #[inline]
    pub fn get_date(&self, idx: usize) -> Option<hyperdb_api_core::types::Date> {
        match &self.inner {
            RowInner::Tcp(row) => row.get(idx),
            RowInner::Arrow { batch, row_index } => {
                use arrow::array::Date32Array;
                let col = Self::arrow_column(batch, idx)?;
                let arr = col.as_any().downcast_ref::<Date32Array>()?;
                if arr.is_null(*row_index) {
                    return None;
                }
                let unix_days = arr.value(*row_index);
                // 10957 days = 1970-01-01 .. 2000-01-01: shifts Arrow's
                // Unix-epoch day count onto the epoch `Date::from_days`
                // expects (presumably 2000-based — matches the timestamp
                // offset below).
                let hyper_days = unix_days - 10957;
                Some(hyperdb_api_core::types::Date::from_days(hyper_days))
            }
        }
    }

    /// Time-of-day at `idx`. Arrow path requires `Time64(Microsecond)`;
    /// negative values are rejected by the `u64` conversion.
    #[inline]
    pub fn get_time(&self, idx: usize) -> Option<hyperdb_api_core::types::Time> {
        match &self.inner {
            RowInner::Tcp(row) => row.get(idx),
            RowInner::Arrow { batch, row_index } => {
                use arrow::array::Time64MicrosecondArray;
                let col = Self::arrow_column(batch, idx)?;
                let arr = col.as_any().downcast_ref::<Time64MicrosecondArray>()?;
                if arr.is_null(*row_index) {
                    return None;
                }
                let micros = u64::try_from(arr.value(*row_index)).ok()?;
                Some(hyperdb_api_core::types::Time::from_microseconds(micros))
            }
        }
    }

    /// Timestamp at `idx`. Arrow path requires `Timestamp(Microsecond)`.
    #[inline]
    pub fn get_timestamp(&self, idx: usize) -> Option<hyperdb_api_core::types::Timestamp> {
        match &self.inner {
            RowInner::Tcp(row) => row.get(idx),
            RowInner::Arrow { batch, row_index } => {
                use arrow::array::TimestampMicrosecondArray;
                let col = Self::arrow_column(batch, idx)?;
                let arr = col.as_any().downcast_ref::<TimestampMicrosecondArray>()?;
                if arr.is_null(*row_index) {
                    return None;
                }
                let unix_micros = arr.value(*row_index);
                // 946_684_800_000_000 µs = 1970-01-01 .. 2000-01-01;
                // same epoch shift as `get_date`.
                let hyper_micros = unix_micros - 946_684_800_000_000;
                Some(hyperdb_api_core::types::Timestamp::from_microseconds(
                    hyper_micros,
                ))
            }
        }
    }

    /// Timestamp-with-offset at `idx`. On the Arrow path the offset is
    /// fixed at 0 — any timezone carried by the Arrow column type is not
    /// consulted. NOTE(review): confirm Arrow timestamps here are always
    /// UTC-normalized.
    #[inline]
    pub fn get_offset_timestamp(
        &self,
        idx: usize,
    ) -> Option<hyperdb_api_core::types::OffsetTimestamp> {
        match &self.inner {
            RowInner::Tcp(row) => row.get(idx),
            RowInner::Arrow { batch, row_index } => {
                use arrow::array::TimestampMicrosecondArray;
                let col = Self::arrow_column(batch, idx)?;
                let arr = col.as_any().downcast_ref::<TimestampMicrosecondArray>()?;
                if arr.is_null(*row_index) {
                    return None;
                }
                let unix_micros = arr.value(*row_index);
                // Same 1970 -> 2000 epoch shift as `get_timestamp`.
                let hyper_micros = unix_micros - 946_684_800_000_000;
                let ts = hyperdb_api_core::types::Timestamp::from_microseconds(hyper_micros);
                Some(hyperdb_api_core::types::OffsetTimestamp::new(ts, 0))
            }
        }
    }

    /// Interval at `idx`. Arrow path requires `Interval(MonthDayNano)`;
    /// sub-microsecond precision is truncated by the /1000 division.
    #[inline]
    pub fn get_interval(&self, idx: usize) -> Option<hyperdb_api_core::types::Interval> {
        match &self.inner {
            RowInner::Tcp(row) => row.get(idx),
            RowInner::Arrow { batch, row_index } => {
                use arrow::array::IntervalMonthDayNanoArray;
                let col = Self::arrow_column(batch, idx)?;
                let arr = col.as_any().downcast_ref::<IntervalMonthDayNanoArray>()?;
                if arr.is_null(*row_index) {
                    return None;
                }
                let v = arr.value(*row_index);
                let micros = v.nanoseconds / 1000;
                Some(hyperdb_api_core::types::Interval::new(
                    v.months, v.days, micros,
                ))
            }
        }
    }

    /// Decimal at `idx`.
    ///
    /// TCP path: the scale comes from the schema's `Numeric` type and the
    /// value is decoded from the raw binary representation. Arrow path:
    /// `Decimal128`/`Decimal256` only; negative Arrow scales are rejected
    /// (the `i8 -> u8` conversion fails), as is a `Decimal256` value that
    /// does not fit in an `i128`.
    pub fn get_numeric(&self, idx: usize) -> Option<hyperdb_api_core::types::Numeric> {
        match &self.inner {
            RowInner::Tcp(_) => {
                let scale: u8 = match self.sql_type(idx)? {
                    SqlType::Numeric { scale, .. } => u8::try_from(scale).ok()?,
                    _ => return None,
                };
                let bytes = self.get_bytes(idx)?;
                hyperdb_api_core::types::Numeric::from_binary_with_scale(&bytes, scale).ok()
            }
            RowInner::Arrow { batch, row_index } => {
                use arrow::array::{Decimal128Array, Decimal256Array};
                use arrow::datatypes::DataType as ArrowType;
                let col = Self::arrow_column(batch, idx)?;
                match col.data_type() {
                    ArrowType::Decimal128(_precision, scale) => {
                        let scale_u8: u8 = (*scale).try_into().ok()?;
                        let arr = col.as_any().downcast_ref::<Decimal128Array>()?;
                        if arr.is_null(*row_index) {
                            return None;
                        }
                        let raw = arr.value(*row_index);
                        Some(hyperdb_api_core::types::Numeric::new(raw, scale_u8))
                    }
                    ArrowType::Decimal256(_precision, scale) => {
                        let scale_u8: u8 = (*scale).try_into().ok()?;
                        let arr = col.as_any().downcast_ref::<Decimal256Array>()?;
                        if arr.is_null(*row_index) {
                            return None;
                        }
                        let raw = arr.value(*row_index);
                        let as_i128: i128 = raw.to_i128()?;
                        Some(hyperdb_api_core::types::Numeric::new(as_i128, scale_u8))
                    }
                    _ => None,
                }
            }
        }
    }
}
/// Types that can be extracted from a [`Row`] column via [`Row::get`].
pub trait RowValue: Sized {
    /// Returns the value at `idx`, or `None` when the column is NULL,
    /// out of bounds, or of an incompatible type.
    fn from_row(row: &Row, idx: usize) -> Option<Self>;
}
// RowValue implementations: each delegates to the matching typed accessor
// on `Row`. The wider numeric impls additionally fall back to narrower
// column types and widen losslessly, so e.g. requesting i64 from an
// i32 or i16 column still succeeds.
impl RowValue for i16 {
    #[inline]
    fn from_row(row: &Row, idx: usize) -> Option<Self> {
        row.get_i16(idx)
    }
}
impl RowValue for i32 {
    #[inline]
    fn from_row(row: &Row, idx: usize) -> Option<Self> {
        // Fall back to an i16 column and widen.
        row.get_i32(idx).or_else(|| row.get_i16(idx).map(i32::from))
    }
}
impl RowValue for i64 {
    #[inline]
    fn from_row(row: &Row, idx: usize) -> Option<Self> {
        // Fall back through i32 then i16, widening each time.
        row.get_i64(idx)
            .or_else(|| row.get_i32(idx).map(i64::from))
            .or_else(|| row.get_i16(idx).map(i64::from))
    }
}
impl RowValue for f32 {
    #[inline]
    fn from_row(row: &Row, idx: usize) -> Option<Self> {
        row.get_f32(idx)
    }
}
impl RowValue for f64 {
    #[inline]
    fn from_row(row: &Row, idx: usize) -> Option<Self> {
        // Fall back to an f32 column (f32 -> f64 is exact).
        row.get_f64(idx).or_else(|| row.get_f32(idx).map(f64::from))
    }
}
impl RowValue for bool {
    #[inline]
    fn from_row(row: &Row, idx: usize) -> Option<Self> {
        row.get_bool(idx)
    }
}
impl RowValue for String {
    #[inline]
    fn from_row(row: &Row, idx: usize) -> Option<Self> {
        row.get_string(idx)
    }
}
impl RowValue for Vec<u8> {
    #[inline]
    fn from_row(row: &Row, idx: usize) -> Option<Self> {
        row.get_bytes(idx)
    }
}
impl RowValue for hyperdb_api_core::types::Date {
    #[inline]
    fn from_row(row: &Row, idx: usize) -> Option<Self> {
        row.get_date(idx)
    }
}
impl RowValue for hyperdb_api_core::types::Time {
    #[inline]
    fn from_row(row: &Row, idx: usize) -> Option<Self> {
        row.get_time(idx)
    }
}
impl RowValue for hyperdb_api_core::types::Timestamp {
    #[inline]
    fn from_row(row: &Row, idx: usize) -> Option<Self> {
        row.get_timestamp(idx)
    }
}
impl RowValue for hyperdb_api_core::types::OffsetTimestamp {
    #[inline]
    fn from_row(row: &Row, idx: usize) -> Option<Self> {
        row.get_offset_timestamp(idx)
    }
}
impl RowValue for hyperdb_api_core::types::Interval {
    #[inline]
    fn from_row(row: &Row, idx: usize) -> Option<Self> {
        row.get_interval(idx)
    }
}
impl RowValue for hyperdb_api_core::types::Numeric {
    #[inline]
    fn from_row(row: &Row, idx: usize) -> Option<Self> {
        row.get_numeric(idx)
    }
}
/// Types that can be decoded from an entire [`Row`].
pub trait FromRow: Sized {
    /// Decodes `row` into `Self`.
    fn from_row(row: &Row) -> crate::error::Result<Self>;
}
// Tuple decoding for arities 1-4. Each element is `Option`-wrapped so a
// NULL (or incompatible) column surfaces as `None` instead of failing the
// whole row; these impls therefore never return `Err`.
impl<A: RowValue> FromRow for (Option<A>,) {
    fn from_row(row: &Row) -> crate::error::Result<Self> {
        Ok((row.get::<A>(0),))
    }
}
impl<A: RowValue, B: RowValue> FromRow for (Option<A>, Option<B>) {
    fn from_row(row: &Row) -> crate::error::Result<Self> {
        Ok((row.get::<A>(0), row.get::<B>(1)))
    }
}
impl<A: RowValue, B: RowValue, C: RowValue> FromRow for (Option<A>, Option<B>, Option<C>) {
    fn from_row(row: &Row) -> crate::error::Result<Self> {
        Ok((row.get::<A>(0), row.get::<B>(1), row.get::<C>(2)))
    }
}
impl<A: RowValue, B: RowValue, C: RowValue, D: RowValue> FromRow
    for (Option<A>, Option<B>, Option<C>, Option<D>)
{
    fn from_row(row: &Row) -> crate::error::Result<Self> {
        Ok((
            row.get::<A>(0),
            row.get::<B>(1),
            row.get::<C>(2),
            row.get::<D>(3),
        ))
    }
}
/// Metadata for one column of a result set.
#[derive(Debug, Clone)]
pub struct ResultColumn {
    // Column name as reported by the server.
    name: String,
    // Declared SQL type of the column.
    sql_type: SqlType,
    // Zero-based position within the result set.
    index: usize,
}
impl ResultColumn {
pub fn new(name: impl Into<String>, sql_type: SqlType, index: usize) -> Self {
ResultColumn {
name: name.into(),
sql_type,
index,
}
}
#[must_use]
pub fn name(&self) -> &str {
&self.name
}
#[must_use]
pub fn sql_type(&self) -> SqlType {
self.sql_type
}
#[must_use]
pub fn index(&self) -> usize {
self.index
}
}
/// Ordered column metadata for a result set.
#[derive(Debug, Clone, Default)]
pub struct ResultSchema {
    // Columns in result order; each column's `index` matches its position.
    columns: Vec<ResultColumn>,
}
impl ResultSchema {
    /// Creates an empty schema.
    #[must_use]
    pub fn new() -> Self {
        Self {
            columns: Vec::new(),
        }
    }

    /// Builds a schema from an already-assembled column list.
    #[must_use]
    pub fn from_columns(columns: Vec<ResultColumn>) -> Self {
        Self { columns }
    }

    /// Appends a column; its index is its position at insertion time.
    pub fn add_column(&mut self, name: impl Into<String>, sql_type: SqlType) {
        let next_index = self.columns.len();
        self.columns
            .push(ResultColumn::new(name, sql_type, next_index));
    }

    /// Number of columns in the schema.
    #[must_use]
    pub fn column_count(&self) -> usize {
        self.columns.len()
    }

    /// All columns, in result order.
    #[must_use]
    pub fn columns(&self) -> &[ResultColumn] {
        self.columns.as_slice()
    }

    /// Column at `index`.
    ///
    /// # Panics
    /// Panics when `index` is out of bounds.
    #[must_use]
    pub fn column(&self, index: usize) -> &ResultColumn {
        &self.columns[index]
    }

    /// First column with an exactly matching name, if any.
    #[must_use]
    pub fn column_by_name(&self, name: &str) -> Option<&ResultColumn> {
        self.column_index(name).map(|i| &self.columns[i])
    }

    /// Zero-based index of the first column named `name`, if any.
    #[must_use]
    pub fn column_index(&self, name: &str) -> Option<usize> {
        self.columns.iter().position(|c| c.name == name)
    }
}
/// Streaming result set over one of the supported transports.
pub struct Rowset<'conn> {
    // Active transport (plain TCP, Arrow, or prepared-statement stream).
    inner: RowsetInner<'conn>,
    // Schema built lazily and shared with every `Row` handed out.
    schema_cache: Option<Arc<ResultSchema>>,
    // Keeps an owned prepared statement alive while its stream is being
    // consumed; never read, hence the underscore prefix.
    _statement_guard: Option<hyperdb_api_core::client::OwnedPreparedStatement>,
}
impl std::fmt::Debug for Rowset<'_> {
    /// Compact debug view: the underlying stream is not inspectable, so
    /// only report whether a schema has been cached.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Rowset");
        builder.field("has_schema_cache", &self.schema_cache.is_some());
        builder.finish_non_exhaustive()
    }
}
/// Transport-specific stream behind a [`Rowset`].
enum RowsetInner<'conn> {
    /// Plain TCP query stream.
    Tcp(QueryStream<'conn>),
    /// Arrow-format result stream.
    Arrow(ArrowRowset),
    /// Stream produced by executing a prepared statement.
    Prepared(hyperdb_api_core::client::PreparedQueryStream<'conn>),
}
impl<'conn> Rowset<'conn> {
    /// Builds a rowset over a plain TCP query stream.
    pub(crate) fn new(stream: QueryStream<'conn>) -> Self {
        Rowset {
            inner: RowsetInner::Tcp(stream),
            schema_cache: None,
            _statement_guard: None,
        }
    }

    /// Builds a rowset over an Arrow result stream.
    pub(crate) fn from_arrow(arrow_rowset: ArrowRowset) -> Self {
        Rowset {
            inner: RowsetInner::Arrow(arrow_rowset),
            schema_cache: None,
            _statement_guard: None,
        }
    }

    /// Builds a rowset over a prepared-statement stream.
    pub(crate) fn from_prepared(
        stream: hyperdb_api_core::client::PreparedQueryStream<'conn>,
    ) -> Self {
        Rowset {
            inner: RowsetInner::Prepared(stream),
            schema_cache: None,
            _statement_guard: None,
        }
    }

    /// Attaches an owned prepared statement so it outlives the stream that
    /// borrows from it.
    #[expect(
        clippy::used_underscore_binding,
        reason = "underscore-prefixed parameter retained for trait-method signature compatibility"
    )]
    pub(crate) fn with_statement_guard(
        mut self,
        statement: hyperdb_api_core::client::OwnedPreparedStatement,
    ) -> Self {
        self._statement_guard = Some(statement);
        self
    }

    /// Schema of the result set, if the transport can describe it.
    /// Returns a clone; use the cached `Arc` internally via
    /// `cached_schema_arc`.
    #[must_use]
    pub fn schema(&self) -> Option<ResultSchema> {
        if let Some(ref cached) = self.schema_cache {
            return Some((**cached).clone());
        }
        self.build_schema()
    }

    /// Builds the schema from transport-specific column metadata.
    /// TCP may not know its schema yet (returns `None`); Arrow and
    /// prepared streams always can.
    fn build_schema(&self) -> Option<ResultSchema> {
        match &self.inner {
            RowsetInner::Tcp(stream) => stream.schema().map(|cols| {
                let columns = cols
                    .iter()
                    .enumerate()
                    .map(|(idx, col)| {
                        let sql_type =
                            SqlType::from_oid_and_modifier(col.type_oid().0, col.type_modifier());
                        ResultColumn::new(col.name(), sql_type, idx)
                    })
                    .collect();
                ResultSchema::from_columns(columns)
            }),
            RowsetInner::Arrow(arrow) => {
                let schema = arrow.schema();
                let columns = schema
                    .fields()
                    .iter()
                    .enumerate()
                    .map(|(idx, field)| {
                        ResultColumn::new(
                            field.name(),
                            crate::arrow_result::arrow_type_to_sql_type(field.data_type()),
                            idx,
                        )
                    })
                    .collect();
                Some(ResultSchema::from_columns(columns))
            }
            RowsetInner::Prepared(stream) => {
                let cols = stream.schema();
                let columns = cols
                    .iter()
                    .enumerate()
                    .map(|(idx, col)| {
                        let sql_type =
                            SqlType::from_oid_and_modifier(col.type_oid().0, col.type_modifier());
                        ResultColumn::new(col.name(), sql_type, idx)
                    })
                    .collect();
                Some(ResultSchema::from_columns(columns))
            }
        }
    }

    /// Returns the shared schema, building and caching it on first use.
    fn cached_schema_arc(&mut self) -> Option<Arc<ResultSchema>> {
        if self.schema_cache.is_none() {
            if let Some(schema) = self.build_schema() {
                self.schema_cache = Some(Arc::new(schema));
            }
        }
        self.schema_cache.clone()
    }

    /// Pulls the next chunk of rows from the transport.
    /// `Ok(None)` means the stream is exhausted. Note a returned chunk MAY
    /// be empty (e.g. a zero-row Arrow batch); callers that need "first
    /// row" semantics must keep pulling.
    pub fn next_chunk(&mut self) -> Result<Option<Vec<Row>>> {
        // Normalizes the two transport chunk shapes before row wrapping.
        enum TransportChunk {
            Tcp(Vec<StreamRow>),
            Arrow(Arc<RecordBatch>),
        }
        let chunk_opt: Option<TransportChunk> = match &mut self.inner {
            RowsetInner::Tcp(stream) => stream.next_chunk()?.map(TransportChunk::Tcp),
            RowsetInner::Arrow(arrow) => arrow
                .next_chunk()?
                .map(|chunk| TransportChunk::Arrow(Arc::new(chunk.into_batch()))),
            RowsetInner::Prepared(stream) => stream.next_chunk()?.map(TransportChunk::Tcp),
        };
        let Some(chunk) = chunk_opt else {
            return Ok(None);
        };
        // Build/fetch the schema only once a chunk actually arrived.
        let schema = self.cached_schema_arc();
        let rows = match chunk {
            TransportChunk::Tcp(stream_rows) => stream_rows
                .into_iter()
                .map(|row| Row::from_tcp(row, schema.clone()))
                .collect(),
            TransportChunk::Arrow(batch) => (0..batch.num_rows())
                .map(|row_index| Row::from_arrow(Arc::clone(&batch), row_index, schema.clone()))
                .collect(),
        };
        Ok(Some(rows))
    }

    /// Consumes the rowset into a lazily-chunked row iterator.
    #[must_use]
    pub fn rows(self) -> RowIterator<'conn> {
        RowIterator {
            rowset: self,
            current_iter: Vec::new().into_iter(),
        }
    }

    /// Drains the stream into a vector of rows.
    pub fn collect_rows(self) -> crate::error::Result<Vec<Row>> {
        self.rows().collect::<crate::error::Result<Vec<_>>>()
    }

    /// Collects column 0 of every row, preserving NULLs as `None`.
    pub fn collect_column<T: crate::result::RowValue>(
        self,
    ) -> crate::error::Result<Vec<Option<T>>> {
        self.rows()
            .map(|row| row.map(|r| r.get::<T>(0)))
            .collect::<crate::error::Result<Vec<_>>>()
    }

    /// Collects column 0 of every row, silently dropping NULLs.
    pub fn collect_column_non_null<T: crate::result::RowValue>(
        self,
    ) -> crate::error::Result<Vec<T>> {
        Ok(self.collect_column::<T>()?.into_iter().flatten().collect())
    }

    /// First row of the result set, or `Ok(None)` when the query produced
    /// no rows.
    pub fn first_row(mut self) -> crate::error::Result<Option<Row>> {
        // FIX: a transport may deliver an empty chunk (e.g. a zero-row
        // Arrow batch) ahead of chunks that do contain rows, so keep
        // pulling until a row appears or the stream is exhausted instead
        // of inspecting only the first chunk.
        while let Some(chunk) = self.next_chunk()? {
            if let Some(row) = chunk.into_iter().next() {
                return Ok(Some(row));
            }
        }
        Ok(None)
    }

    /// First row, or an error when the query produced no rows.
    pub fn require_first_row(self) -> crate::error::Result<Row> {
        self.first_row()?
            .ok_or_else(|| crate::error::Error::new("Query returned no rows"))
    }

    /// Column 0 of the first row; errors when there are no rows, `None`
    /// when the value is NULL.
    pub fn scalar<T: crate::result::RowValue>(self) -> crate::error::Result<Option<T>> {
        Ok(self.require_first_row()?.get(0))
    }

    /// Non-NULL column 0 of the first row; errors on no rows or NULL.
    pub fn require_scalar<T: crate::result::RowValue>(self) -> crate::error::Result<T> {
        self.scalar()?
            .ok_or_else(|| crate::error::Error::new("Scalar query returned NULL"))
    }
}
/// Iterator over the rows of a [`Rowset`], pulling chunks on demand.
#[derive(Debug)]
pub struct RowIterator<'conn> {
    // Source of further chunks once the current one is exhausted.
    rowset: Rowset<'conn>,
    // Rows of the chunk currently being drained.
    current_iter: std::vec::IntoIter<Row>,
}
impl Iterator for RowIterator<'_> {
    type Item = Result<Row>;

    /// Yields the next row, fetching further chunks from the rowset as
    /// the current chunk runs out. A transport error is yielded once as
    /// `Some(Err(_))`.
    fn next(&mut self) -> Option<Self::Item> {
        // FIX: loop instead of fetching a single chunk, so that an empty
        // chunk (e.g. a zero-row Arrow batch) does not end iteration
        // while later chunks still hold rows.
        loop {
            if let Some(row) = self.current_iter.next() {
                return Some(Ok(row));
            }
            match self.rowset.next_chunk() {
                Ok(Some(chunk)) => self.current_iter = chunk.into_iter(),
                Ok(None) => return None,
                Err(e) => return Some(Err(e)),
            }
        }
    }
}
#[cfg(test)]
mod arrow_path_tests {
    use super::*;
    use arrow::array::Decimal128Array;
    use arrow::datatypes::{DataType as ArrowType, Field, Schema};

    /// Builds a one-row, one-column Decimal128 batch for the tests below.
    fn decimal128_batch(raw: i128, precision: u8, scale: i8) -> Arc<RecordBatch> {
        let values = Decimal128Array::from(vec![Some(raw)])
            .with_precision_and_scale(precision, scale)
            .expect("valid Arrow Decimal128");
        let fields = vec![Field::new("v", ArrowType::Decimal128(precision, scale), true)];
        let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), vec![Arc::new(values)])
            .expect("batch");
        Arc::new(batch)
    }

    #[test]
    fn get_numeric_reads_arrow_decimal128_with_positive_scale() {
        let rb = decimal128_batch(123, 10, 2);
        let row = Row::from_arrow(Arc::clone(&rb), 0, None);

        let numeric = row.get_numeric(0).expect("Some for positive-scale decimal");
        assert_eq!(numeric.unscaled_value(), 123);
        assert_eq!(numeric.scale(), 2);
        assert!((numeric.to_f64() - 1.23).abs() < 1e-9);

        // The generic RowValue path must agree with the direct accessor.
        let via_rowvalue: hyperdb_api_core::types::Numeric =
            row.get(0).expect("RowValue path agrees with get_numeric");
        assert_eq!(via_rowvalue, numeric);
    }

    #[test]
    fn get_numeric_rejects_arrow_decimal128_with_negative_scale() {
        let rb = decimal128_batch(5, 10, -2);
        let row = Row::from_arrow(Arc::clone(&rb), 0, None);

        assert!(
            row.get_numeric(0).is_none(),
            "negative Arrow scale must not produce a silently-wrong-magnitude Numeric",
        );

        let via_rowvalue: Option<hyperdb_api_core::types::Numeric> = row.get(0);
        assert!(via_rowvalue.is_none());
    }

    #[test]
    fn get_numeric_accepts_arrow_decimal128_with_zero_scale() {
        let rb = decimal128_batch(42, 10, 0);
        let row = Row::from_arrow(Arc::clone(&rb), 0, None);

        let numeric = row.get_numeric(0).expect("scale 0 is fine");
        assert_eq!(numeric.unscaled_value(), 42);
        assert_eq!(numeric.scale(), 0);
    }
}