1use std::{convert::TryInto, sync::Arc};
2
3use arrow::{
4 array::{ArrayRef, BooleanBuilder},
5 datatypes::{
6 DataType as ArrowDataType, Date32Type, Field, Float32Type, Float64Type, Int8Type,
7 Int16Type, Int32Type, Int64Type, Time32SecondType, TimeUnit, TimestampMicrosecondType,
8 TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type,
9 },
10};
11
12use log::debug;
13use odbc_api::{
14 Bit, DataType as OdbcDataType, ResultSetMetadata,
15 buffers::{AnySlice, BufferDesc, Item},
16};
17use thiserror::Error;
18use time::{TimeMsI32, TimeNsI64, TimeUsI64, seconds_since_midnight};
19
20mod binary;
21mod concurrent_odbc_reader;
22mod decimal;
23mod map_odbc_to_arrow;
24mod odbc_reader;
25mod text;
26mod time;
27mod to_record_batch;
28
29use crate::date_time::{
30 days_since_epoch, ms_since_epoch, ns_since_epoch, seconds_since_epoch, us_since_epoch,
31};
32
33pub use self::{
34 binary::{Binary, FixedSizedBinary},
35 concurrent_odbc_reader::ConcurrentOdbcReader,
36 decimal::Decimal,
37 map_odbc_to_arrow::{MapOdbcToArrow, MappingError},
38 odbc_reader::{OdbcReader, OdbcReaderBuilder},
39 text::{TextEncoding, choose_text_strategy},
40};
41
42pub trait ReadStrategy {
44 fn buffer_desc(&self) -> BufferDesc;
46
47 fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError>;
49}
50
51pub struct NonNullableBoolean;
52
53impl ReadStrategy for NonNullableBoolean {
54 fn buffer_desc(&self) -> BufferDesc {
55 BufferDesc::Bit { nullable: false }
56 }
57
58 fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError> {
59 let values = Bit::as_slice(column_view).unwrap();
60 let mut builder = BooleanBuilder::new();
61 for bit in values {
62 builder.append_value(bit.as_bool());
63 }
64 Ok(Arc::new(builder.finish()))
65 }
66}
67
68pub struct NullableBoolean;
69
70impl ReadStrategy for NullableBoolean {
71 fn buffer_desc(&self) -> BufferDesc {
72 BufferDesc::Bit { nullable: true }
73 }
74
75 fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError> {
76 let values = Bit::as_nullable_slice(column_view).unwrap();
77 let mut builder = BooleanBuilder::new();
78 for bit in values {
79 builder.append_option(bit.copied().map(Bit::as_bool))
80 }
81 Ok(Arc::new(builder.finish()))
82 }
83}
84
/// Limits and switches which influence how column buffers are sized and allocated.
///
/// The `Default` value sets no size limits and leaves `fallibale_allocations` switched off.
#[derive(Debug, Default, Clone, Copy)]
pub struct BufferAllocationOptions {
    /// Optional upper bound for text columns. `choose_column_strategy` hands it to
    /// `choose_text_strategy` when the Arrow field is `Utf8`.
    pub max_text_size: Option<usize>,
    /// Optional upper bound for binary columns. `choose_column_strategy` uses the smaller of
    /// this limit and the column size reported for the SQL type, and falls back to this limit
    /// alone if no column size is reported.
    pub max_binary_size: Option<usize>,
    /// NOTE(review): not read in this part of the file. Presumably selects whether oversized
    /// buffer allocations are reported as errors (see `ColumnFailure::TooLarge`) rather than
    /// aborting; confirm against the code allocating the buffers. The misspelt name is part of
    /// the public interface and therefore kept.
    pub fallibale_allocations: bool,
}
117
118pub fn choose_column_strategy(
119 field: &Field,
120 query_metadata: &mut impl ResultSetMetadata,
121 col_index: u16,
122 buffer_allocation_options: BufferAllocationOptions,
123 map_value_errors_to_null: bool,
124 trim_fixed_sized_character_strings: bool,
125 text_encoding: TextEncoding,
126) -> Result<Box<dyn ReadStrategy + Send>, ColumnFailure> {
127 let strat: Box<dyn ReadStrategy + Send> = match field.data_type() {
128 ArrowDataType::Boolean => {
129 if field.is_nullable() {
130 Box::new(NullableBoolean)
131 } else {
132 Box::new(NonNullableBoolean)
133 }
134 }
135 ArrowDataType::Int8 => Int8Type::identical(field.is_nullable()),
136 ArrowDataType::Int16 => Int16Type::identical(field.is_nullable()),
137 ArrowDataType::Int32 => Int32Type::identical(field.is_nullable()),
138 ArrowDataType::Int64 => Int64Type::identical(field.is_nullable()),
139 ArrowDataType::UInt8 => UInt8Type::identical(field.is_nullable()),
140 ArrowDataType::Float32 => Float32Type::identical(field.is_nullable()),
141 ArrowDataType::Float64 => Float64Type::identical(field.is_nullable()),
142 ArrowDataType::Date32 => Date32Type::map_infalliable(field.is_nullable(), days_since_epoch),
143 ArrowDataType::Time32(TimeUnit::Second) => {
144 Time32SecondType::map_infalliable(field.is_nullable(), seconds_since_midnight)
145 }
146 ArrowDataType::Time32(TimeUnit::Millisecond) => Box::new(TimeMsI32),
147 ArrowDataType::Time64(TimeUnit::Microsecond) => Box::new(TimeUsI64),
148 ArrowDataType::Time64(TimeUnit::Nanosecond) => Box::new(TimeNsI64),
149 ArrowDataType::Utf8 => {
150 let sql_type = query_metadata
151 .col_data_type(col_index)
152 .map_err(ColumnFailure::FailedToDescribeColumn)?;
153 debug!("Relational type of column {}: {sql_type:?}", col_index - 1);
156 let lazy_display_size = || query_metadata.col_display_size(col_index);
157 choose_text_strategy(
159 sql_type,
160 lazy_display_size,
161 buffer_allocation_options.max_text_size,
162 trim_fixed_sized_character_strings,
163 text_encoding,
164 )?
165 }
166 ArrowDataType::Decimal128(precision, scale @ 0..) => {
167 Box::new(Decimal::new(*precision, *scale))
168 }
169 ArrowDataType::Binary => {
170 let sql_type = query_metadata
171 .col_data_type(col_index)
172 .map_err(ColumnFailure::FailedToDescribeColumn)?;
173 let length = sql_type.column_size();
174 let length = match (length, buffer_allocation_options.max_binary_size) {
175 (None, None) => return Err(ColumnFailure::ZeroSizedColumn { sql_type }),
176 (None, Some(limit)) => limit,
177 (Some(len), None) => len.get(),
178 (Some(len), Some(limit)) => {
179 if len.get() < limit {
180 len.get()
181 } else {
182 limit
183 }
184 }
185 };
186 Box::new(Binary::new(length))
187 }
188 ArrowDataType::Timestamp(TimeUnit::Second, _) => {
189 TimestampSecondType::map_infalliable(field.is_nullable(), seconds_since_epoch)
190 }
191 ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => {
192 TimestampMillisecondType::map_infalliable(field.is_nullable(), ms_since_epoch)
193 }
194 ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => {
195 TimestampMicrosecondType::map_infalliable(field.is_nullable(), us_since_epoch)
196 }
197 ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => {
198 TimestampNanosecondType::map_falliable(
199 field.is_nullable(),
200 map_value_errors_to_null,
201 ns_since_epoch,
202 )
203 }
204 ArrowDataType::FixedSizeBinary(length) => {
205 Box::new(FixedSizedBinary::new((*length).try_into().unwrap()))
206 }
207 unsupported_arrow_type => {
208 return Err(ColumnFailure::UnsupportedArrowType(
209 unsupported_arrow_type.clone(),
210 ));
211 }
212 };
213 Ok(strat)
214}
215
216#[derive(Error, Debug)]
218pub enum ColumnFailure {
219 #[error(
221 "The ODBC driver did not specify a sensible upper bound for the column. This usually \
222 happens for large variadic types (E.g. VARCHAR(max)). In other cases it can be a \
223 shortcoming of the ODBC driver. Try casting the column into a type with a sensible upper \
224 bound. `arrow-odbc` also allows the application to specify a generic upper bound, which it \
225 would automatically apply. The type of the column causing this error is {:?}.",
226 sql_type
227 )]
228 ZeroSizedColumn { sql_type: OdbcDataType },
229 #[error(
231 "Unable to deduce the maximum string length for the SQL Data Type reported by the ODBC \
232 driver. Reported SQL data type is: {:?}.\n Error fetching column display or octet size: \
233 {source}",
234 sql_type
235 )]
236 UnknownStringLength {
237 sql_type: OdbcDataType,
238 source: odbc_api::Error,
239 },
240 #[error(
242 "Unsupported arrow type: `{0}`. This type can currently not be fetched from an ODBC data \
243 source by an instance of OdbcReader."
244 )]
245 UnsupportedArrowType(ArrowDataType),
246 #[error(
248 "An error occurred fetching the column description or data type from the metainformation \
249 attached to the ODBC result set:\n{0}"
250 )]
251 FailedToDescribeColumn(#[source] odbc_api::Error),
252 #[error(
253 "Column buffer is too large to be allocated. Tried to alloacte {num_elements} elements \
254 with {element_size} bytes in size each."
255 )]
256 TooLarge {
257 num_elements: usize,
258 element_size: usize,
259 },
260}
261
262impl ColumnFailure {
263 pub fn into_crate_error(self, name: String, index: usize) -> crate::Error {
265 crate::Error::ColumnFailure {
266 name,
267 index,
268 source: self,
269 }
270 }
271}