1use std::{convert::TryInto, sync::Arc};
2
3use arrow::{
4 array::{ArrayRef, BooleanBuilder},
5 datatypes::{
6 DataType as ArrowDataType, Date32Type, Field, Float32Type, Float64Type, Int16Type,
7 Int32Type, Int64Type, Int8Type, TimeUnit, TimestampMicrosecondType,
8 TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type,
9 },
10};
11
12use log::debug;
13use odbc_api::{
14 buffers::{AnySlice, BufferDesc, Item},
15 Bit, DataType as OdbcDataType, ResultSetMetadata,
16};
17use thiserror::Error;
18
19mod binary;
20mod concurrent_odbc_reader;
21mod decimal;
22mod map_odbc_to_arrow;
23mod odbc_reader;
24mod text;
25mod to_record_batch;
26
27use crate::date_time::{
28 days_since_epoch, ms_since_epoch, ns_since_epoch, seconds_since_epoch, us_since_epoch,
29};
30
31pub use self::{
32 binary::{Binary, FixedSizedBinary},
33 concurrent_odbc_reader::ConcurrentOdbcReader,
34 decimal::Decimal,
35 map_odbc_to_arrow::{MapOdbcToArrow, MappingError},
36 odbc_reader::{OdbcReader, OdbcReaderBuilder},
37 text::choose_text_strategy,
38};
39
40pub trait ReadStrategy {
42 fn buffer_desc(&self) -> BufferDesc;
44
45 fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError>;
47}
48
49pub struct NonNullableBoolean;
50
51impl ReadStrategy for NonNullableBoolean {
52 fn buffer_desc(&self) -> BufferDesc {
53 BufferDesc::Bit { nullable: false }
54 }
55
56 fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError> {
57 let values = Bit::as_slice(column_view).unwrap();
58 let mut builder = BooleanBuilder::new();
59 for bit in values {
60 builder.append_value(bit.as_bool());
61 }
62 Ok(Arc::new(builder.finish()))
63 }
64}
65
66pub struct NullableBoolean;
67
68impl ReadStrategy for NullableBoolean {
69 fn buffer_desc(&self) -> BufferDesc {
70 BufferDesc::Bit { nullable: true }
71 }
72
73 fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError> {
74 let values = Bit::as_nullable_slice(column_view).unwrap();
75 let mut builder = BooleanBuilder::new();
76 for bit in values {
77 builder.append_option(bit.copied().map(Bit::as_bool))
78 }
79 Ok(Arc::new(builder.finish()))
80 }
81}
82
83#[derive(Default, Debug, Clone, Copy)]
87pub struct BufferAllocationOptions {
88 pub max_text_size: Option<usize>,
101 pub max_binary_size: Option<usize>,
110 pub fallibale_allocations: bool,
114}
115
116pub fn choose_column_strategy(
117 field: &Field,
118 query_metadata: &mut impl ResultSetMetadata,
119 col_index: u16,
120 buffer_allocation_options: BufferAllocationOptions,
121 map_value_errors_to_null: bool,
122 trim_fixed_sized_character_strings: bool,
123) -> Result<Box<dyn ReadStrategy + Send>, ColumnFailure> {
124 let strat: Box<dyn ReadStrategy + Send> = match field.data_type() {
125 ArrowDataType::Boolean => {
126 if field.is_nullable() {
127 Box::new(NullableBoolean)
128 } else {
129 Box::new(NonNullableBoolean)
130 }
131 }
132 ArrowDataType::Int8 => Int8Type::identical(field.is_nullable()),
133 ArrowDataType::Int16 => Int16Type::identical(field.is_nullable()),
134 ArrowDataType::Int32 => Int32Type::identical(field.is_nullable()),
135 ArrowDataType::Int64 => Int64Type::identical(field.is_nullable()),
136 ArrowDataType::UInt8 => UInt8Type::identical(field.is_nullable()),
137 ArrowDataType::Float32 => Float32Type::identical(field.is_nullable()),
138 ArrowDataType::Float64 => Float64Type::identical(field.is_nullable()),
139 ArrowDataType::Date32 => Date32Type::map_infalliable(field.is_nullable(), days_since_epoch),
140 ArrowDataType::Utf8 => {
141 let sql_type = query_metadata
142 .col_data_type(col_index)
143 .map_err(ColumnFailure::FailedToDescribeColumn)?;
144 debug!("Relational type of column {}: {sql_type:?}", col_index - 1);
147 let lazy_display_size = || query_metadata.col_display_size(col_index);
148 choose_text_strategy(
150 sql_type,
151 lazy_display_size,
152 buffer_allocation_options.max_text_size,
153 trim_fixed_sized_character_strings,
154 )?
155 }
156 ArrowDataType::Decimal128(precision, scale @ 0..) => {
157 Box::new(Decimal::new(*precision, *scale))
158 }
159 ArrowDataType::Binary => {
160 let sql_type = query_metadata
161 .col_data_type(col_index)
162 .map_err(ColumnFailure::FailedToDescribeColumn)?;
163 let length = sql_type.column_size();
164 let length = match (length, buffer_allocation_options.max_binary_size) {
165 (None, None) => return Err(ColumnFailure::ZeroSizedColumn { sql_type }),
166 (None, Some(limit)) => limit,
167 (Some(len), None) => len.get(),
168 (Some(len), Some(limit)) => {
169 if len.get() < limit {
170 len.get()
171 } else {
172 limit
173 }
174 }
175 };
176 Box::new(Binary::new(length))
177 }
178 ArrowDataType::Timestamp(TimeUnit::Second, _) => {
179 TimestampSecondType::map_infalliable(field.is_nullable(), seconds_since_epoch)
180 }
181 ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => {
182 TimestampMillisecondType::map_infalliable(field.is_nullable(), ms_since_epoch)
183 }
184 ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => {
185 TimestampMicrosecondType::map_infalliable(field.is_nullable(), us_since_epoch)
186 }
187 ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => {
188 TimestampNanosecondType::map_falliable(
189 field.is_nullable(),
190 map_value_errors_to_null,
191 ns_since_epoch,
192 )
193 }
194 ArrowDataType::FixedSizeBinary(length) => {
195 Box::new(FixedSizedBinary::new((*length).try_into().unwrap()))
196 }
197 unsupported_arrow_type => {
198 return Err(ColumnFailure::UnsupportedArrowType(
199 unsupported_arrow_type.clone(),
200 ))
201 }
202 };
203 Ok(strat)
204}
205
206#[derive(Error, Debug)]
208pub enum ColumnFailure {
209 #[error(
211 "The ODBC driver did not specify a sensible upper bound for the column. This usually \
212 happens for large variadic types (E.g. VARCHAR(max)). In other cases it can be a \
213 shortcoming of the ODBC driver. Try casting the column into a type with a sensible upper \
214 bound. `arrow-odbc` also allows the application to specify a generic upper bound, which it \
215 would automatically apply. The type of the column causing this error is {:?}.",
216 sql_type
217 )]
218 ZeroSizedColumn { sql_type: OdbcDataType },
219 #[error(
221 "Unable to deduce the maximum string length for the SQL Data Type reported by the ODBC \
222 driver. Reported SQL data type is: {:?}.\n Error fetching column display or octet size: \
223 {source}",
224 sql_type
225 )]
226 UnknownStringLength {
227 sql_type: OdbcDataType,
228 source: odbc_api::Error,
229 },
230 #[error(
232 "Unsupported arrow type: `{0}`. This type can currently not be fetched from an ODBC data \
233 source by an instance of OdbcReader."
234 )]
235 UnsupportedArrowType(ArrowDataType),
236 #[error(
238 "An error occurred fetching the column description or data type from the metainformation \
239 attached to the ODBC result set:\n{0}"
240 )]
241 FailedToDescribeColumn(#[source] odbc_api::Error),
242 #[error(
243 "Column buffer is too large to be allocated. Tried to alloacte {num_elements} elements \
244 with {element_size} bytes in size each."
245 )]
246 TooLarge {
247 num_elements: usize,
248 element_size: usize,
249 },
250}
251
252impl ColumnFailure {
253 pub fn into_crate_error(self, name: String, index: usize) -> crate::Error {
255 crate::Error::ColumnFailure {
256 name,
257 index,
258 source: self,
259 }
260 }
261}