use std::{convert::TryInto, sync::Arc};
use arrow::{
array::{ArrayRef, BooleanBuilder},
datatypes::{
DataType as ArrowDataType, Date32Type, Field, Float32Type, Float64Type, Int16Type,
Int32Type, Int64Type, Int8Type, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type,
},
};
use log::debug;
use odbc_api::{
buffers::{AnySlice, BufferDesc, Item},
Bit, DataType as OdbcDataType, ResultSetMetadata,
};
use thiserror::Error;
mod binary;
mod concurrent_odbc_reader;
mod decimal;
mod map_odbc_to_arrow;
mod odbc_reader;
mod text;
mod to_record_batch;
use crate::{
date_time::{
days_since_epoch, ms_since_epoch, ns_since_epoch, seconds_since_epoch, us_since_epoch,
},
Quirks,
};
pub use self::{
binary::{Binary, FixedSizedBinary},
concurrent_odbc_reader::ConcurrentOdbcReader,
decimal::Decimal,
map_odbc_to_arrow::{MapOdbcToArrow, MappingError},
odbc_reader::{OdbcReader, OdbcReaderBuilder},
text::choose_text_strategy,
};
/// Describes how a single column is transferred from an ODBC column buffer into an Arrow array.
///
/// An implementation states which kind of buffer it wants (`buffer_desc`) and how the content of
/// such a buffer is turned into an Arrow array (`fill_arrow_array`).
pub trait ReadStrategy {
/// Description of the column buffer this strategy expects to read from.
fn buffer_desc(&self) -> BufferDesc;
/// Builds an Arrow array from the values in `column_view`.
///
/// NOTE(review): `column_view` is presumably a view on a buffer matching [`Self::buffer_desc`];
/// the implementations visible in this module unwrap the conversion to their expected slice
/// type and would panic otherwise.
fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError>;
}
/// Read strategy for boolean columns of fields which are not nullable.
pub struct NonNullableBoolean;

impl ReadStrategy for NonNullableBoolean {
    /// Asks for a bit buffer with `nullable` set to `false`.
    fn buffer_desc(&self) -> BufferDesc {
        BufferDesc::Bit { nullable: false }
    }

    /// Appends every bit of `column_view` as a boolean value to a freshly built array.
    ///
    /// # Panics
    ///
    /// Panics if `column_view` can not be viewed as a plain slice of [`Bit`].
    fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError> {
        let bits = Bit::as_slice(column_view).unwrap();
        let mut array_builder = BooleanBuilder::new();
        bits.iter()
            .for_each(|bit| array_builder.append_value(bit.as_bool()));
        let array: ArrayRef = Arc::new(array_builder.finish());
        Ok(array)
    }
}
/// Read strategy for boolean columns of fields which are nullable.
pub struct NullableBoolean;

impl ReadStrategy for NullableBoolean {
    /// Asks for a bit buffer with `nullable` set to `true`.
    fn buffer_desc(&self) -> BufferDesc {
        BufferDesc::Bit { nullable: true }
    }

    /// Appends every element of `column_view` to a freshly built boolean array. Present bits
    /// become values, absent ones become nulls.
    ///
    /// # Panics
    ///
    /// Panics if `column_view` can not be viewed as a nullable slice of [`Bit`].
    fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError> {
        let maybe_bits = Bit::as_nullable_slice(column_view).unwrap();
        let mut array_builder = BooleanBuilder::new();
        for maybe_bit in maybe_bits {
            match maybe_bit {
                Some(bit) => array_builder.append_value(bit.as_bool()),
                None => array_builder.append_null(),
            }
        }
        let array: ArrayRef = Arc::new(array_builder.finish());
        Ok(array)
    }
}
/// Options influencing how large the buffers for fetching column data are allowed to become.
///
/// The derived `Default` leaves both limits unset (`None`) and `fallibale_allocations` at
/// `false`.
#[derive(Default, Debug, Clone, Copy)]
pub struct BufferAllocationOptions {
/// Optional upper bound handed to `choose_text_strategy` when a strategy for a `Utf8` column is
/// chosen.
pub max_text_size: Option<usize>,
/// Optional upper bound for the element length of `Binary` columns. If the column size
/// reported for the column is larger than this limit, or not reported at all, the limit is used
/// as the length instead.
pub max_binary_size: Option<usize>,
/// NOTE(review): not read by the code in this module. Presumably selects whether a failing
/// buffer allocation is reported as an error (see `ColumnFailure::TooLarge`) instead of
/// aborting; confirm against the code allocating the buffers.
pub fallibale_allocations: bool,
}
/// Chooses the [`ReadStrategy`] used to fetch the column described by `field`.
///
/// The decision is driven by the Arrow data type and the nullability of `field`. Only for `Utf8`
/// and `Binary` columns the ODBC metadata is consulted, in order to determine how large the
/// elements of the buffer need to be.
///
/// # Parameters
///
/// * `field`: Arrow field the column is mapped to.
/// * `query_metadata`: Metadata of the result set, queried for the SQL data type (and lazily the
///   display size) of text and binary columns.
/// * `col_index`: Index of the column as passed to the metadata calls. The debug log prints
///   `col_index - 1`; presumably `col_index` is one-based, confirm against the caller.
/// * `buffer_allocation_options`: Optional upper bounds for text and binary element sizes.
/// * `quirks`: Driver quirks; the flag about garbage indicators is forwarded to the text strategy.
///
/// # Errors
///
/// * [`ColumnFailure::FailedToDescribeColumn`] if the SQL data type of a text or binary column
///   can not be fetched.
/// * Any failure reported by `choose_text_strategy` for `Utf8` columns.
/// * [`ColumnFailure::ZeroSizedColumn`] if a binary column reports no column size and no
///   `max_binary_size` is set.
/// * [`ColumnFailure::UnsupportedArrowType`] for every Arrow type without a mapping. This
///   includes `Decimal128` with a negative scale and `FixedSizeBinary` with a length which can
///   not be represented by the fixed sized binary strategy (e.g. a negative one).
pub fn choose_column_strategy(
    field: &Field,
    query_metadata: &mut impl ResultSetMetadata,
    col_index: u16,
    buffer_allocation_options: BufferAllocationOptions,
    quirks: &Quirks,
) -> Result<Box<dyn ReadStrategy>, ColumnFailure> {
    let nullable = field.is_nullable();
    let strat: Box<dyn ReadStrategy> = match field.data_type() {
        ArrowDataType::Boolean => {
            if nullable {
                Box::new(NullableBoolean)
            } else {
                Box::new(NonNullableBoolean)
            }
        }
        ArrowDataType::Int8 => Int8Type::identical(nullable),
        ArrowDataType::Int16 => Int16Type::identical(nullable),
        ArrowDataType::Int32 => Int32Type::identical(nullable),
        ArrowDataType::Int64 => Int64Type::identical(nullable),
        ArrowDataType::UInt8 => UInt8Type::identical(nullable),
        ArrowDataType::Float32 => Float32Type::identical(nullable),
        ArrowDataType::Float64 => Float64Type::identical(nullable),
        ArrowDataType::Date32 => Date32Type::map_with(nullable, |e| Ok(days_since_epoch(e))),
        ArrowDataType::Utf8 => {
            let sql_type = query_metadata
                .col_data_type(col_index)
                .map_err(ColumnFailure::FailedToDescribeColumn)?;
            debug!("Relational type of column {}: {sql_type:?}", col_index - 1);
            // Evaluated by the text strategy only if the SQL type alone is not sufficient.
            let lazy_display_size = || query_metadata.col_display_size(col_index);
            choose_text_strategy(
                sql_type,
                lazy_display_size,
                buffer_allocation_options.max_text_size,
                quirks.indicators_returned_from_bulk_fetch_are_memory_garbage,
            )?
        }
        // Only non-negative scales are mapped; negative ones end up in the catch-all arm.
        ArrowDataType::Decimal128(precision, scale @ 0..) => {
            Box::new(Decimal::new(*precision, *scale))
        }
        ArrowDataType::Binary => {
            let sql_type = query_metadata
                .col_data_type(col_index)
                .map_err(ColumnFailure::FailedToDescribeColumn)?;
            let column_size = sql_type.column_size();
            let length = match (column_size, buffer_allocation_options.max_binary_size) {
                // Neither the driver nor the application provide an upper bound.
                (None, None) => return Err(ColumnFailure::ZeroSizedColumn { sql_type }),
                (None, Some(limit)) => limit,
                (Some(len), None) => len.get(),
                // Never exceed the limit configured by the application.
                (Some(len), Some(limit)) => len.get().min(limit),
            };
            Box::new(Binary::new(length))
        }
        ArrowDataType::Timestamp(TimeUnit::Second, _) => {
            TimestampSecondType::map_with(nullable, |e| Ok(seconds_since_epoch(e)))
        }
        ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => {
            TimestampMillisecondType::map_with(nullable, |e| Ok(ms_since_epoch(e)))
        }
        ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => {
            TimestampMicrosecondType::map_with(nullable, |e| Ok(us_since_epoch(e)))
        }
        ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => {
            TimestampNanosecondType::map_with(nullable, ns_since_epoch)
        }
        ArrowDataType::FixedSizeBinary(length) => {
            // The schema is provided by the caller. Report a length which can not be converted
            // (e.g. a negative one) as an error rather than panicking inside the library.
            Box::new(FixedSizedBinary::new((*length).try_into().map_err(
                |_| ColumnFailure::UnsupportedArrowType(field.data_type().clone()),
            )?))
        }
        unsupported_arrow_type => {
            return Err(ColumnFailure::UnsupportedArrowType(
                unsupported_arrow_type.clone(),
            ))
        }
    };
    Ok(strat)
}
/// Reasons why no strategy for fetching a column could be chosen, or why its buffer could not be
/// provided.
#[derive(Error, Debug)]
pub enum ColumnFailure {
    /// Neither the ODBC driver nor the application provided an upper bound for the size of the
    /// elements of the column.
    #[error(
        "The ODBC driver did not specify a sensible upper bound for the column. This usually \
        happens for large variadic types (E.g. VARCHAR(max)). In other cases it can be a \
        shortcoming of the ODBC driver. Try casting the column into a type with a sensible upper \
        bound. `arrow-odbc` also allows the application to specify a generic upper bound, which it \
        would automatically apply. The type of the column causing this error is {:?}.",
        sql_type
    )]
    ZeroSizedColumn { sql_type: OdbcDataType },
    /// The maximum string length could not be deduced from the SQL data type, and fetching the
    /// display or octet size of the column failed with `source`.
    #[error(
        "Unable to deduce the maximum string length for the SQL Data Type reported by the ODBC \
        driver. Reported SQL data type is: {:?}.\n Error fetching column display or octet size: \
        {source}",
        sql_type
    )]
    UnknownStringLength {
        sql_type: OdbcDataType,
        source: odbc_api::Error,
    },
    /// There is no mapping for fetching a column into an Arrow array of this type.
    #[error(
        "Unsupported arrow type: `{0}`. This type can currently not be fetched from an ODBC data \
        source by an instance of OdbcReader."
    )]
    UnsupportedArrowType(ArrowDataType),
    /// Fetching the description or data type of the column from the result set metadata failed.
    #[error(
        "An error occurred fetching the column description or data type from the metainformation \
        attached to the ODBC result set:\n{0}"
    )]
    FailedToDescribeColumn(#[source] odbc_api::Error),
    /// The buffer for the column would be too large to be allocated.
    #[error(
        "Column buffer is too large to be allocated. Tried to allocate {num_elements} elements \
        with {element_size} bytes in size each."
    )]
    TooLarge {
        num_elements: usize,
        element_size: usize,
    },
}
impl ColumnFailure {
    /// Converts this failure into a [`crate::Error::ColumnFailure`], enriching it with the `name`
    /// and the `index` of the affected column.
    pub fn into_crate_error(self, name: String, index: usize) -> crate::Error {
        let source = self;
        crate::Error::ColumnFailure {
            source,
            name,
            index,
        }
    }
}