#![allow(
clippy::cast_precision_loss,
reason = "column accessor: explicit user-requested widening"
)]
use std::sync::Arc;
use crate::protocol::message::backend::DataRowBody;
use crate::types::FromHyperBinary;
use super::error::{Error, ErrorKind, Result};
use super::statement::Column;
/// Decodes a text-format `bytea` value.
///
/// Input that starts with the two bytes `\x` is treated as hex: the remainder
/// must contain an even number of hex digits (either case), otherwise `None`
/// is returned. Any other input is copied through unchanged.
fn decode_bytea_hex(data: &[u8]) -> Option<Vec<u8>> {
    let digits = match data.strip_prefix(b"\\x") {
        Some(rest) => rest,
        // No hex marker: hand the bytes back verbatim.
        None => return Some(data.to_vec()),
    };
    if digits.len() % 2 != 0 {
        return None;
    }
    // Collecting into `Option<Vec<_>>` stops at the first invalid digit.
    digits
        .chunks_exact(2)
        .map(|pair| {
            let high = hex_digit_to_value(pair[0])?;
            let low = hex_digit_to_value(pair[1])?;
            Some((high << 4) | low)
        })
        .collect()
}

/// Maps one ASCII hex digit (`0-9`, `a-f`, `A-F`) to its numeric value.
///
/// Returns `None` for any other byte.
#[inline]
fn hex_digit_to_value(c: u8) -> Option<u8> {
    // Each arm yields the amount to subtract so the digit lands on its value.
    let offset = match c {
        b'0'..=b'9' => b'0',
        b'a'..=b'f' => b'a' - 10,
        b'A'..=b'F' => b'A' - 10,
        _ => return None,
    };
    Some(c - offset)
}
pub struct Row {
columns: Arc<Vec<Column>>,
data: DataRowBody,
ranges: Vec<Option<std::ops::Range<usize>>>,
}
impl Row {
pub(crate) fn new(columns: Arc<Vec<Column>>, data: DataRowBody) -> Result<Self> {
let ranges: Vec<_> = data
.ranges()
.map(|r| r.map_err(|e| Error::protocol(format!("invalid data row: {e}"))))
.collect::<Result<Vec<_>>>()?;
Ok(Row {
columns,
data,
ranges,
})
}
#[inline]
pub fn column_count(&self) -> usize {
self.columns.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.columns.is_empty()
}
#[inline]
pub fn columns(&self) -> &[Column] {
&self.columns
}
#[inline]
pub fn get_bytes(&self, idx: usize) -> Option<&[u8]> {
match self.ranges.get(idx)? {
Some(range) => Some(&self.data.buffer()[range.start..range.end]),
None => None,
}
}
#[inline]
pub fn is_null(&self, idx: usize) -> bool {
self.ranges
.get(idx)
.map_or(true, std::option::Option::is_none)
}
#[inline]
fn column_format(&self, idx: usize) -> super::statement::ColumnFormat {
self.columns
.get(idx)
.map(super::statement::Column::format)
.unwrap_or_default()
}
#[inline]
pub fn get_i16(&self, idx: usize) -> Option<i16> {
let bytes = self.get_bytes(idx)?;
if self.column_format(idx).is_binary() && bytes.len() == 2 {
Some(i16::from_le_bytes([bytes[0], bytes[1]]))
} else if !self.column_format(idx).is_binary() {
std::str::from_utf8(bytes).ok()?.trim().parse().ok()
} else {
None
}
}
#[inline]
pub fn get_i32(&self, idx: usize) -> Option<i32> {
let bytes = self.get_bytes(idx)?;
if self.column_format(idx).is_binary() && bytes.len() == 4 {
Some(i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
} else if !self.column_format(idx).is_binary() {
std::str::from_utf8(bytes).ok()?.trim().parse().ok()
} else {
None
}
}
#[inline]
pub fn get_i64(&self, idx: usize) -> Option<i64> {
let bytes = self.get_bytes(idx)?;
if self.column_format(idx).is_binary() && bytes.len() == 8 {
Some(i64::from_le_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
]))
} else if !self.column_format(idx).is_binary() {
std::str::from_utf8(bytes).ok()?.trim().parse().ok()
} else {
None
}
}
#[inline]
pub fn get_f32(&self, idx: usize) -> Option<f32> {
let bytes = self.get_bytes(idx)?;
if self.column_format(idx).is_binary() && bytes.len() == 4 {
Some(f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
} else if !self.column_format(idx).is_binary() {
std::str::from_utf8(bytes).ok()?.trim().parse().ok()
} else {
None
}
}
#[inline]
pub fn get_f64(&self, idx: usize) -> Option<f64> {
let bytes = self.get_bytes(idx)?;
if self.column_format(idx).is_binary() && bytes.len() == 8 {
Some(f64::from_le_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
]))
} else if !self.column_format(idx).is_binary() {
std::str::from_utf8(bytes).ok()?.trim().parse().ok()
} else {
None
}
}
#[inline]
pub fn get_bool(&self, idx: usize) -> Option<bool> {
let bytes = self.get_bytes(idx)?;
if self.column_format(idx).is_binary() && bytes.len() == 1 {
match bytes[0] {
0 => Some(false),
1 => Some(true),
_ => None,
}
} else {
match bytes {
[b't' | b'T'] => Some(true),
[b'f' | b'F'] => Some(false),
b"true" => Some(true),
b"false" => Some(false),
_ => None,
}
}
}
#[inline]
pub fn get_string(&self, idx: usize) -> Option<String> {
let bytes = self.get_bytes(idx)?;
String::from_utf8(bytes.to_vec()).ok()
}
#[inline]
pub fn get_bytes_owned(&self, idx: usize) -> Option<Vec<u8>> {
let bytes = self.get_bytes(idx)?;
if self.column_format(idx).is_binary() {
Some(bytes.to_vec())
} else {
decode_bytea_hex(bytes)
}
}
#[inline]
pub fn get<T: FromHyperBinary>(&self, idx: usize) -> Option<T> {
let bytes = self.get_bytes(idx)?;
T::from_hyper_binary(bytes).ok()
}
pub fn get_by_name<T: FromHyperBinary>(&self, name: &str) -> Option<T> {
let idx = self.column_index(name).ok()?;
self.get(idx)
}
pub fn column_index(&self, name: &str) -> Result<usize> {
self.columns
.iter()
.position(|c| c.name() == name)
.ok_or_else(|| Error::new(ErrorKind::Query, format!("column not found: {name}")))
}
}
/// Debug output lists the column metadata and the column count; the raw
/// row bytes are omitted (the output is marked non-exhaustive).
impl std::fmt::Debug for Row {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Row");
        out.field("columns", &self.columns);
        out.field("column_count", &self.column_count());
        out.finish_non_exhaustive()
    }
}
/// A row that wraps a raw data-row body and looks column bytes up on demand.
///
/// The typed getters decode fixed-width little-endian values only; they do
/// no text parsing and do not consult column metadata.
#[derive(Debug)]
pub struct StreamRow {
    /// Raw data-row message body.
    data: DataRowBody,
}

impl StreamRow {
    /// Wraps a raw data-row body.
    #[inline]
    pub(crate) fn new(data: DataRowBody) -> Self {
        Self { data }
    }

    /// Column count as reported by the underlying body, cast to `usize`.
    #[inline]
    pub fn column_count(&self) -> usize {
        self.data.column_count() as usize
    }

    /// Raw bytes of column `idx`, as returned by the underlying body.
    #[inline]
    pub fn get_bytes(&self, idx: usize) -> Option<&[u8]> {
        self.data.get_column_bytes(idx)
    }

    /// Whether the underlying body reports column `idx` as NULL.
    #[inline]
    pub fn is_null(&self, idx: usize) -> bool {
        self.data.is_column_null(idx)
    }

    /// Reads column `idx` as `i16`; the value must be exactly 2 bytes.
    #[inline]
    pub fn get_i16(&self, idx: usize) -> Option<i16> {
        match self.get_bytes(idx)? {
            &[b0, b1] => Some(i16::from_le_bytes([b0, b1])),
            _ => None,
        }
    }

    /// Reads column `idx` as `i32`; the value must be exactly 4 bytes.
    #[inline]
    pub fn get_i32(&self, idx: usize) -> Option<i32> {
        match self.get_bytes(idx)? {
            &[b0, b1, b2, b3] => Some(i32::from_le_bytes([b0, b1, b2, b3])),
            _ => None,
        }
    }

    /// Reads column `idx` as `i64`; the value must be exactly 8 bytes.
    #[inline]
    pub fn get_i64(&self, idx: usize) -> Option<i64> {
        match self.get_bytes(idx)? {
            &[b0, b1, b2, b3, b4, b5, b6, b7] => {
                Some(i64::from_le_bytes([b0, b1, b2, b3, b4, b5, b6, b7]))
            }
            _ => None,
        }
    }

    /// Reads column `idx` as `f32`; the value must be exactly 4 bytes.
    #[inline]
    pub fn get_f32(&self, idx: usize) -> Option<f32> {
        match self.get_bytes(idx)? {
            &[b0, b1, b2, b3] => Some(f32::from_le_bytes([b0, b1, b2, b3])),
            _ => None,
        }
    }

    /// Reads column `idx` as `f64`; the value must be exactly 8 bytes.
    #[inline]
    pub fn get_f64(&self, idx: usize) -> Option<f64> {
        match self.get_bytes(idx)? {
            &[b0, b1, b2, b3, b4, b5, b6, b7] => {
                Some(f64::from_le_bytes([b0, b1, b2, b3, b4, b5, b6, b7]))
            }
            _ => None,
        }
    }

    /// Reads column `idx` as `bool`.
    ///
    /// The value must be a single byte: `t`, `T` or `1` for true; `f`, `F`
    /// or `0` for false.
    #[inline]
    pub fn get_bool(&self, idx: usize) -> Option<bool> {
        match self.get_bytes(idx)? {
            &[b't' | b'T' | 1] => Some(true),
            &[b'f' | b'F' | 0] => Some(false),
            _ => None,
        }
    }

    /// Reads column `idx` as an owned string; `None` on invalid UTF-8.
    #[inline]
    pub fn get_string(&self, idx: usize) -> Option<String> {
        self.get_bytes(idx)
            .and_then(|raw| String::from_utf8(raw.to_vec()).ok())
    }

    /// Decodes column `idx` through the type's `FromBinaryValue` impl.
    #[inline]
    pub fn get<T: FromBinaryValue>(&self, idx: usize) -> Option<T> {
        <T as FromBinaryValue>::from_stream_row(self, idx)
    }

    /// Consumes the row and converts it into a `BatchRow`.
    #[inline]
    pub fn to_batch(self) -> BatchRow {
        BatchRow::from_stream(self)
    }

    /// Consumes the row and returns the underlying data-row body.
    #[inline]
    pub fn into_data(self) -> DataRowBody {
        let Self { data } = self;
        data
    }
}
#[derive(Debug)]
pub struct BatchRow {
data: DataRowBody,
offsets: Vec<Option<(usize, usize)>>,
}
impl BatchRow {
#[inline]
pub fn new(data: DataRowBody) -> Self {
let offsets = data.compute_all_offsets();
BatchRow { data, offsets }
}
#[inline]
pub fn from_stream(row: StreamRow) -> Self {
Self::new(row.data)
}
#[inline]
pub fn column_count(&self) -> usize {
self.offsets.len()
}
#[inline]
pub fn get_bytes(&self, idx: usize) -> Option<&[u8]> {
let (start, end) = self.offsets.get(idx)?.as_ref().copied()?;
self.data.buffer().get(start..end)
}
#[inline]
pub fn is_null(&self, idx: usize) -> bool {
self.offsets
.get(idx)
.map_or(true, std::option::Option::is_none)
}
#[inline]
pub fn get_i16(&self, idx: usize) -> Option<i16> {
let bytes = self.get_bytes(idx)?;
(bytes.len() == 2).then(|| i16::from_le_bytes([bytes[0], bytes[1]]))
}
#[inline]
pub fn get_i32(&self, idx: usize) -> Option<i32> {
let bytes = self.get_bytes(idx)?;
(bytes.len() == 4).then(|| i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
}
#[inline]
pub fn get_i64(&self, idx: usize) -> Option<i64> {
let bytes = self.get_bytes(idx)?;
(bytes.len() == 8).then(|| {
i64::from_le_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
])
})
}
#[inline]
pub fn get_f32(&self, idx: usize) -> Option<f32> {
let bytes = self.get_bytes(idx)?;
(bytes.len() == 4).then(|| f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
}
#[inline]
pub fn get_f64(&self, idx: usize) -> Option<f64> {
let bytes = self.get_bytes(idx)?;
(bytes.len() == 8).then(|| {
f64::from_le_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
])
})
}
#[inline]
pub fn get_bool(&self, idx: usize) -> Option<bool> {
let bytes = self.get_bytes(idx)?;
(bytes.len() == 1).then(|| match bytes[0] {
b't' | b'T' | 1 => Some(true),
b'f' | b'F' | 0 => Some(false),
_ => None,
})?
}
#[inline]
pub fn get_string(&self, idx: usize) -> Option<String> {
let bytes = self.get_bytes(idx)?;
String::from_utf8(bytes.to_vec()).ok()
}
#[inline]
pub fn get<T: FromBinaryValue>(&self, idx: usize) -> Option<T> {
T::from_batch_row(self, idx)
}
}
/// Conversion of a single column of a [`StreamRow`] or [`BatchRow`] into a
/// Rust value.
///
/// This is the trait behind the generic `get` methods of both row types.
/// Both methods return `None` when the column cannot be produced as `Self`.
pub trait FromBinaryValue: Sized {
/// Decodes column `idx` of a [`StreamRow`].
fn from_stream_row(row: &StreamRow, idx: usize) -> Option<Self>;
/// Decodes column `idx` of a [`BatchRow`].
fn from_batch_row(row: &BatchRow, idx: usize) -> Option<Self>;
}
impl FromBinaryValue for bool {
#[inline]
fn from_stream_row(row: &StreamRow, idx: usize) -> Option<Self> {
row.get_bool(idx)
}
#[inline]
fn from_batch_row(row: &BatchRow, idx: usize) -> Option<Self> {
row.get_bool(idx)
}
}
impl FromBinaryValue for i16 {
#[inline]
fn from_stream_row(row: &StreamRow, idx: usize) -> Option<Self> {
row.get_i16(idx)
}
#[inline]
fn from_batch_row(row: &BatchRow, idx: usize) -> Option<Self> {
row.get_i16(idx)
}
}
/// `i32` columns: a 4-byte value is read directly; otherwise a 2-byte value
/// is read as `i16` and widened.
impl FromBinaryValue for i32 {
    #[inline]
    fn from_stream_row(row: &StreamRow, idx: usize) -> Option<Self> {
        match row.get_i32(idx) {
            Some(value) => Some(value),
            None => row.get_i16(idx).map(Self::from),
        }
    }

    #[inline]
    fn from_batch_row(row: &BatchRow, idx: usize) -> Option<Self> {
        match row.get_i32(idx) {
            Some(value) => Some(value),
            None => row.get_i16(idx).map(Self::from),
        }
    }
}
/// `i64` columns: an 8-byte value is read directly; otherwise a 4-byte or
/// 2-byte value is read as `i32` / `i16` and widened.
impl FromBinaryValue for i64 {
    #[inline]
    fn from_stream_row(row: &StreamRow, idx: usize) -> Option<Self> {
        if let Some(wide) = row.get_i64(idx) {
            return Some(wide);
        }
        if let Some(medium) = row.get_i32(idx) {
            return Some(Self::from(medium));
        }
        row.get_i16(idx).map(Self::from)
    }

    #[inline]
    fn from_batch_row(row: &BatchRow, idx: usize) -> Option<Self> {
        if let Some(wide) = row.get_i64(idx) {
            return Some(wide);
        }
        if let Some(medium) = row.get_i32(idx) {
            return Some(Self::from(medium));
        }
        row.get_i16(idx).map(Self::from)
    }
}
impl FromBinaryValue for f32 {
#[inline]
fn from_stream_row(row: &StreamRow, idx: usize) -> Option<Self> {
row.get_f32(idx)
}
#[inline]
fn from_batch_row(row: &BatchRow, idx: usize) -> Option<Self> {
row.get_f32(idx)
}
}
/// `f64` columns: an 8-byte value is read as `f64`; otherwise a 4-byte value
/// is read as `f32` and widened.
///
/// There is deliberately no integer fallback. `get_i64` and `get_i32` accept
/// exactly the same byte lengths (8 and 4) as `get_f64` and `get_f32`, so
/// once the float getters have returned `None` the integer getters cannot
/// succeed either; the earlier `get_i64` / `get_i32` branches were
/// unreachable. The rows carry no type information here, so the bytes of an
/// 8-byte or 4-byte integer column are reinterpreted as a float, not
/// converted numerically.
impl FromBinaryValue for f64 {
    #[inline]
    fn from_stream_row(row: &StreamRow, idx: usize) -> Option<Self> {
        row.get_f64(idx)
            .or_else(|| row.get_f32(idx).map(f64::from))
    }

    #[inline]
    fn from_batch_row(row: &BatchRow, idx: usize) -> Option<Self> {
        row.get_f64(idx)
            .or_else(|| row.get_f32(idx).map(f64::from))
    }
}
impl FromBinaryValue for String {
#[inline]
fn from_stream_row(row: &StreamRow, idx: usize) -> Option<Self> {
row.get_string(idx)
}
#[inline]
fn from_batch_row(row: &BatchRow, idx: usize) -> Option<Self> {
row.get_string(idx)
}
}
/// `Vec<u8>` columns are an owned copy of the raw column bytes.
impl FromBinaryValue for Vec<u8> {
    #[inline]
    fn from_stream_row(row: &StreamRow, idx: usize) -> Option<Self> {
        let raw = row.get_bytes(idx)?;
        Some(raw.to_vec())
    }

    #[inline]
    fn from_batch_row(row: &BatchRow, idx: usize) -> Option<Self> {
        let raw = row.get_bytes(idx)?;
        Some(raw.to_vec())
    }
}
/// `Date` columns are decoded from the raw bytes via `FromHyperBinary`;
/// a decoding error is reported as `None`.
impl FromBinaryValue for crate::types::Date {
    #[inline]
    fn from_stream_row(row: &StreamRow, idx: usize) -> Option<Self> {
        row.get_bytes(idx)
            .and_then(|raw| <Self as crate::types::FromHyperBinary>::from_hyper_binary(raw).ok())
    }

    #[inline]
    fn from_batch_row(row: &BatchRow, idx: usize) -> Option<Self> {
        row.get_bytes(idx)
            .and_then(|raw| <Self as crate::types::FromHyperBinary>::from_hyper_binary(raw).ok())
    }
}
/// `Time` columns are decoded from the raw bytes via `FromHyperBinary`;
/// a decoding error is reported as `None`.
impl FromBinaryValue for crate::types::Time {
    #[inline]
    fn from_stream_row(row: &StreamRow, idx: usize) -> Option<Self> {
        row.get_bytes(idx)
            .and_then(|raw| <Self as crate::types::FromHyperBinary>::from_hyper_binary(raw).ok())
    }

    #[inline]
    fn from_batch_row(row: &BatchRow, idx: usize) -> Option<Self> {
        row.get_bytes(idx)
            .and_then(|raw| <Self as crate::types::FromHyperBinary>::from_hyper_binary(raw).ok())
    }
}
/// `Timestamp` columns are decoded from the raw bytes via `FromHyperBinary`;
/// a decoding error is reported as `None`.
impl FromBinaryValue for crate::types::Timestamp {
    #[inline]
    fn from_stream_row(row: &StreamRow, idx: usize) -> Option<Self> {
        row.get_bytes(idx)
            .and_then(|raw| <Self as crate::types::FromHyperBinary>::from_hyper_binary(raw).ok())
    }

    #[inline]
    fn from_batch_row(row: &BatchRow, idx: usize) -> Option<Self> {
        row.get_bytes(idx)
            .and_then(|raw| <Self as crate::types::FromHyperBinary>::from_hyper_binary(raw).ok())
    }
}
/// `Interval` columns are decoded from the raw bytes via `FromHyperBinary`;
/// a decoding error is reported as `None`.
impl FromBinaryValue for crate::types::Interval {
    #[inline]
    fn from_stream_row(row: &StreamRow, idx: usize) -> Option<Self> {
        row.get_bytes(idx)
            .and_then(|raw| <Self as crate::types::FromHyperBinary>::from_hyper_binary(raw).ok())
    }

    #[inline]
    fn from_batch_row(row: &BatchRow, idx: usize) -> Option<Self> {
        row.get_bytes(idx)
            .and_then(|raw| <Self as crate::types::FromHyperBinary>::from_hyper_binary(raw).ok())
    }
}
/// `OffsetTimestamp` columns are decoded from the raw bytes via
/// `FromHyperBinary`; a decoding error is reported as `None`.
impl FromBinaryValue for crate::types::OffsetTimestamp {
    #[inline]
    fn from_stream_row(row: &StreamRow, idx: usize) -> Option<Self> {
        row.get_bytes(idx)
            .and_then(|raw| <Self as crate::types::FromHyperBinary>::from_hyper_binary(raw).ok())
    }

    #[inline]
    fn from_batch_row(row: &BatchRow, idx: usize) -> Option<Self> {
        row.get_bytes(idx)
            .and_then(|raw| <Self as crate::types::FromHyperBinary>::from_hyper_binary(raw).ok())
    }
}