use std::{convert::TryInto, sync::Arc};
use arrow::{
array::{ArrayRef, BooleanBuilder, Decimal128Builder},
datatypes::{
DataType as ArrowDataType, Date32Type, Field, Float32Type, Float64Type, Int16Type,
Int32Type, Int64Type, Int8Type, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type,
},
};
use atoi::FromRadix10Signed;
use odbc_api::{
buffers::{AnySlice, BufferDesc, Item},
Bit, DataType as OdbcDataType, ResultSetMetadata,
};
use thiserror::Error;
mod binary;
mod map_odbc_to_arrow;
mod text;
pub use self::{
binary::{Binary, FixedSizedBinary},
text::choose_text_strategy,
};
use self::map_odbc_to_arrow::MapOdbcToArrow;
use crate::date_time::{
days_since_epoch, ms_since_epoch, ns_since_epoch, seconds_since_epoch, us_since_epoch,
};
/// Describes how a single column is fetched and turned into an Arrow array.
///
/// Each implementation pairs a buffer description with the conversion of the filled buffer into
/// an [`ArrayRef`].
pub trait ReadStrategy {
/// Returns the [`BufferDesc`] describing the kind of buffer this strategy reads from.
fn buffer_desc(&self) -> BufferDesc;
/// Builds an Arrow array from the values contained in `column_view`.
///
/// NOTE(review): implementations in this file unwrap the conversion of `column_view` into the
/// slice type they expect, so presumably the view must originate from a buffer matching
/// `buffer_desc` -- confirm against callers.
fn fill_arrow_array(&self, column_view: AnySlice) -> ArrayRef;
}
/// Read strategy for boolean columns which are declared as not nullable.
pub struct NonNullableBoolean;

impl ReadStrategy for NonNullableBoolean {
    /// Requests a bit buffer without NULL indicators.
    fn buffer_desc(&self) -> BufferDesc {
        BufferDesc::Bit { nullable: false }
    }

    /// Converts every bit of the column view into a boolean and collects them in an Arrow array.
    ///
    /// # Panics
    ///
    /// Panics if `column_view` can not be viewed as a slice of [`Bit`].
    fn fill_arrow_array(&self, column_view: AnySlice) -> ArrayRef {
        let bits = Bit::as_slice(column_view).unwrap();
        let mut array_builder = BooleanBuilder::new();
        bits.iter().copied().map(Bit::as_bool).for_each(|flag| {
            array_builder.append_value(flag);
        });
        Arc::new(array_builder.finish())
    }
}
/// Read strategy for boolean columns which may contain NULL values.
pub struct NullableBoolean;

impl ReadStrategy for NullableBoolean {
    /// Requests a bit buffer with NULL indicators.
    fn buffer_desc(&self) -> BufferDesc {
        BufferDesc::Bit { nullable: true }
    }

    /// Converts the optional bits of the column view into an Arrow array. Present bits become
    /// booleans, absent ones become nulls.
    ///
    /// # Panics
    ///
    /// Panics if `column_view` can not be viewed as a nullable slice of [`Bit`].
    fn fill_arrow_array(&self, column_view: AnySlice) -> ArrayRef {
        let nullable_bits = Bit::as_nullable_slice(column_view).unwrap();
        let mut array_builder = BooleanBuilder::new();
        for maybe_bit in nullable_bits {
            match maybe_bit {
                Some(bit) => array_builder.append_value(bit.as_bool()),
                None => array_builder.append_null(),
            }
        }
        Arc::new(array_builder.finish())
    }
}
/// Read strategy for decimal columns. Holds the precision and scale of the decimal type the
/// fetched values are stored as.
pub struct Decimal {
    /// Precision of the resulting decimal array. Also determines the size of the text buffer used
    /// for fetching.
    precision: u8,
    /// Scale of the resulting decimal array.
    scale: i8,
}

impl Decimal {
    /// Creates a decimal read strategy with the given `precision` and `scale`.
    pub fn new(precision: u8, scale: i8) -> Self {
        Decimal { precision, scale }
    }
}
impl ReadStrategy for Decimal {
    /// Requests a text buffer, since decimals are fetched in their character representation.
    fn buffer_desc(&self) -> BufferDesc {
        BufferDesc::Text {
            // One byte per digit of precision plus two more; the extra bytes presumably leave room
            // for a sign and a decimal point.
            max_str_len: self.precision as usize + 2,
        }
    }

    /// Parses the textual decimals in `column_view` into a 128-bit decimal Arrow array with the
    /// precision and scale of this strategy.
    ///
    /// The unscaled integer is obtained by removing the decimal point and parsing the remaining
    /// digits. Should the text carry fewer fractional digits than `scale` (e.g. a driver omitting
    /// trailing zeroes and reporting `1.5` for a scale of two), the value is padded with zeroes so
    /// it is still interpreted correctly.
    ///
    /// # Panics
    ///
    /// Panics if `column_view` is not a text view, or if precision and scale are rejected by
    /// Arrow.
    fn fill_arrow_array(&self, column_view: AnySlice) -> ArrayRef {
        let view = column_view.as_text_view().unwrap();
        let mut builder = Decimal128Builder::new();
        // A negative scale never requires padding with fractional digits.
        let scale = self.scale.max(0) as usize;
        // Scratch buffer reused for every value to avoid an allocation per row.
        let mut buf_digits = Vec::new();
        for opt in view.iter() {
            match opt {
                Some(text) => {
                    buf_digits.clear();
                    buf_digits.extend(text.iter().filter(|&&c| c != b'.'));
                    let (mut num, _consumed) = i128::from_radix_10_signed(&buf_digits);
                    // Number of digits which actually follow the decimal point in the text.
                    let fractional_digits =
                        text.iter().position(|&c| c == b'.').map_or(0, |point| {
                            text[point + 1..]
                                .iter()
                                .take_while(|c| c.is_ascii_digit())
                                .count()
                        });
                    // Shift left for each fractional digit the text is lacking, so the unscaled
                    // value matches the scale of the array.
                    for _ in fractional_digits..scale {
                        num *= 10;
                    }
                    builder.append_value(num);
                }
                None => builder.append_null(),
            }
        }
        Arc::new(
            builder
                .finish()
                .with_precision_and_scale(self.precision, self.scale)
                .unwrap(),
        )
    }
}
/// Options influencing how large the buffers for fetching column data are chosen.
#[derive(Default, Debug, Clone, Copy)]
pub struct BufferAllocationOptions {
/// Optional size limit handed to `choose_text_strategy` when a strategy for a `Utf8` column is
/// picked.
pub max_text_size: Option<usize>,
/// Optional upper bound for the length of binary buffers. If the driver reports a column size of
/// zero, this limit is used as the buffer length instead.
pub max_binary_size: Option<usize>,
/// Not read anywhere in this part of the file.
/// NOTE(review): presumably toggles fallible (non-panicking) buffer allocation -- confirm. The
/// spelling of the name is part of the public interface.
pub fallibale_allocations: bool,
}
/// Picks the [`ReadStrategy`] used to fetch the column at `col_index` into an Arrow array of the
/// data type described by `field`.
///
/// `query_metadata` is only consulted for `Utf8` and `Binary` fields, in order to learn about the
/// SQL type (and for text, lazily, the display size) of the column. `buffer_allocation_options`
/// supplies the optional size limits for these variable sized columns.
///
/// # Errors
///
/// * [`ColumnFailure::FailedToDescribeColumn`] if the SQL data type of the column can not be
///   fetched.
/// * [`ColumnFailure::ZeroSizedColumn`] if a binary column reports a size of zero and no
///   `max_binary_size` has been specified.
/// * [`ColumnFailure::UnsupportedArrowType`] for every Arrow type without a matching strategy.
///   This includes decimals with a negative scale.
/// * Any error returned by `choose_text_strategy` for `Utf8` fields.
///
/// # Panics
///
/// Panics if the length of a `FixedSizeBinary` field can not be converted into the length type
/// expected by [`FixedSizedBinary::new`].
pub fn choose_column_strategy(
    field: &Field,
    query_metadata: &mut impl ResultSetMetadata,
    col_index: u16,
    buffer_allocation_options: BufferAllocationOptions,
) -> Result<Box<dyn ReadStrategy>, ColumnFailure> {
    let nullable = field.is_nullable();
    let strategy: Box<dyn ReadStrategy> = match field.data_type() {
        ArrowDataType::Boolean if nullable => Box::new(NullableBoolean),
        ArrowDataType::Boolean => Box::new(NonNullableBoolean),
        ArrowDataType::Int8 => Int8Type::identical(nullable),
        ArrowDataType::Int16 => Int16Type::identical(nullable),
        ArrowDataType::Int32 => Int32Type::identical(nullable),
        ArrowDataType::Int64 => Int64Type::identical(nullable),
        ArrowDataType::UInt8 => UInt8Type::identical(nullable),
        ArrowDataType::Float32 => Float32Type::identical(nullable),
        ArrowDataType::Float64 => Float64Type::identical(nullable),
        ArrowDataType::Date32 => Date32Type::map_with(nullable, days_since_epoch),
        ArrowDataType::Utf8 => {
            let sql_type = query_metadata
                .col_data_type(col_index)
                .map_err(ColumnFailure::FailedToDescribeColumn)?;
            // The display size is passed as a closure, so the callee decides whether it is
            // queried at all.
            choose_text_strategy(
                sql_type,
                || query_metadata.col_display_size(col_index),
                buffer_allocation_options.max_text_size,
            )?
        }
        ArrowDataType::Decimal128(precision, scale @ 0..) => {
            Box::new(Decimal::new(*precision, *scale))
        }
        ArrowDataType::Binary => {
            let sql_type = query_metadata
                .col_data_type(col_index)
                .map_err(ColumnFailure::FailedToDescribeColumn)?;
            let reported = sql_type.column_size();
            // A reported size of zero is not usable as a buffer length. Fall back to the limit if
            // there is one, otherwise give up. Non-zero sizes are capped by the limit.
            let length = match buffer_allocation_options.max_binary_size {
                None if reported == 0 => {
                    return Err(ColumnFailure::ZeroSizedColumn { sql_type });
                }
                None => reported,
                Some(limit) if reported == 0 => limit,
                Some(limit) => reported.min(limit),
            };
            Box::new(Binary::new(length))
        }
        ArrowDataType::Timestamp(unit, _) => match unit {
            TimeUnit::Second => TimestampSecondType::map_with(nullable, seconds_since_epoch),
            TimeUnit::Millisecond => TimestampMillisecondType::map_with(nullable, ms_since_epoch),
            TimeUnit::Microsecond => TimestampMicrosecondType::map_with(nullable, us_since_epoch),
            TimeUnit::Nanosecond => TimestampNanosecondType::map_with(nullable, ns_since_epoch),
        },
        ArrowDataType::FixedSizeBinary(length) => {
            Box::new(FixedSizedBinary::new((*length).try_into().unwrap()))
        }
        other => return Err(ColumnFailure::UnsupportedArrowType(other.clone())),
    };
    Ok(strategy)
}
/// Reasons why no read strategy could be chosen for a column, or why its buffer could not be
/// provided.
#[derive(Error, Debug)]
pub enum ColumnFailure {
    /// The driver reported a column size of zero and no upper bound has been specified to use
    /// instead.
    #[error(
        "ODBC reported a size of '0' for the column. This might indicate that the driver cannot \
        specify a sensible upper bound for the column. E.g. for cases like VARCHAR(max). Try \
        casting the column into a type with a sensible upper bound. The type of the column causing \
        this error is {:?}.",
        sql_type
    )]
    ZeroSizedColumn { sql_type: OdbcDataType },
    /// The maximum string length could not be deduced from the SQL type, and querying the driver
    /// for the display or octet size failed with `source`.
    #[error(
        "Unable to deduce the maximum string length for the SQL Data Type reported by the ODBC \
        driver. Reported SQL data type is: {:?}.\n Error fetching column display or octet size: \
        {source}",
        sql_type
    )]
    UnknownStringLength {
        sql_type: OdbcDataType,
        source: odbc_api::Error,
    },
    /// There is no read strategy for the requested Arrow data type.
    #[error(
        "Unsupported arrow type: `{0}`. This type can currently not be fetched from an ODBC data \
        source by an instance of OdbcReader."
    )]
    UnsupportedArrowType(ArrowDataType),
    /// Fetching the data type of the column from the result set metadata failed.
    #[error(
        "An error occurred fetching the column description or data type from the metainformation \
        attached to the ODBC result set:\n{0}"
    )]
    FailedToDescribeColumn(#[source] odbc_api::Error),
    /// A column buffer of `num_elements` elements, each `element_size` bytes large, was too large
    /// to be allocated.
    #[error(
        "Column buffer is too large to be allocated. Tried to allocate {num_elements} elements \
        with {element_size} bytes in size each."
    )]
    TooLarge {
        num_elements: usize,
        element_size: usize,
    },
}
impl ColumnFailure {
    /// Wraps this failure into a [`crate::Error::ColumnFailure`], attaching the `name` and `index`
    /// of the column the failure relates to.
    pub fn into_crate_error(self, name: String, index: usize) -> crate::Error {
        let source = self;
        crate::Error::ColumnFailure {
            name,
            index,
            source,
        }
    }
}