1use std::{convert::TryInto, sync::Arc};
2
3use arrow::{
4 array::{ArrayRef, BooleanBuilder},
5 datatypes::{
6 DataType as ArrowDataType, Date32Type, Field, Float32Type, Float64Type, Int8Type,
7 Int16Type, Int32Type, Int64Type, Time32SecondType, TimeUnit, TimestampMicrosecondType,
8 TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type,
9 },
10};
11
12use log::debug;
13use odbc_api::{
14 Bit, DataType as OdbcDataType, ResultSetMetadata,
15 buffers::{AnyColumnBufferSlice, BufferDesc},
16};
17use thiserror::Error;
18use time::{TimeMsI32, TimeNsI64, TimeUsI64, seconds_since_midnight};
19
20mod binary;
21mod concurrent_odbc_reader;
22mod decimal;
23mod map_odbc_to_arrow;
24mod odbc_reader;
25mod text;
26mod time;
27mod to_record_batch;
28
29use crate::date_time::{
30 days_since_epoch, ms_since_epoch, ns_since_epoch, seconds_since_epoch, us_since_epoch,
31};
32
33pub use self::{
34 binary::{Binary, FixedSizedBinary},
35 concurrent_odbc_reader::ConcurrentOdbcReader,
36 decimal::Decimal,
37 map_odbc_to_arrow::{MapOdbcToArrow, MappingError},
38 odbc_reader::{OdbcReader, OdbcReaderBuilder},
39 text::{TextEncoding, choose_text_strategy},
40};
41
42pub trait ReadStrategy {
44 fn buffer_desc(&self) -> BufferDesc;
46
47 fn fill_arrow_array(&self, column_view: AnyColumnBufferSlice)
49 -> Result<ArrayRef, MappingError>;
50}
51
52pub struct NonNullableBoolean;
53
54impl ReadStrategy for NonNullableBoolean {
55 fn buffer_desc(&self) -> BufferDesc {
56 BufferDesc::Bit { nullable: false }
57 }
58
59 fn fill_arrow_array(
60 &self,
61 column_view: AnyColumnBufferSlice,
62 ) -> Result<ArrayRef, MappingError> {
63 let values = column_view.as_slice::<Bit>().unwrap();
64 let mut builder = BooleanBuilder::new();
65 for bit in values {
66 builder.append_value(bit.as_bool());
67 }
68 Ok(Arc::new(builder.finish()))
69 }
70}
71
72pub struct NullableBoolean;
73
74impl ReadStrategy for NullableBoolean {
75 fn buffer_desc(&self) -> BufferDesc {
76 BufferDesc::Bit { nullable: true }
77 }
78
79 fn fill_arrow_array(
80 &self,
81 column_view: AnyColumnBufferSlice,
82 ) -> Result<ArrayRef, MappingError> {
83 let values = column_view.as_nullable_slice().unwrap();
84 let mut builder = BooleanBuilder::new();
85 for bit in values {
86 builder.append_option(bit.copied().map(Bit::as_bool))
87 }
88 Ok(Arc::new(builder.finish()))
89 }
90}
91
/// Options controlling how large the buffers used for fetching columns may become.
///
/// All fields default to "no limit" / `false` via [`Default`].
#[derive(Debug, Clone, Copy, Default)]
pub struct BufferAllocationOptions {
    /// Optional upper bound handed to the text strategy selection for text (`Utf8`) columns.
    /// `None` means no application-provided limit.
    pub max_text_size: Option<usize>,
    /// Optional upper bound for the element length of buffers bound to `Binary` columns. If the
    /// column reports a size, the smaller of the two is used; if it reports none, this limit is
    /// used as the length. `None` means no application-provided limit.
    pub max_binary_size: Option<usize>,
    /// NOTE(review): not read in this part of the file. Presumably makes buffer allocation report
    /// an error (see `ColumnFailure::TooLarge`) instead of aborting — confirm against the
    /// allocation code. The spelling of the field name is part of the public interface.
    pub fallibale_allocations: bool,
}
124
125pub fn choose_column_strategy(
126 field: &Field,
127 query_metadata: &mut impl ResultSetMetadata,
128 col_index: u16,
129 buffer_allocation_options: BufferAllocationOptions,
130 map_value_errors_to_null: bool,
131 trim_fixed_sized_character_strings: bool,
132 text_encoding: TextEncoding,
133) -> Result<Box<dyn ReadStrategy + Send>, ColumnFailure> {
134 let strat: Box<dyn ReadStrategy + Send> = match field.data_type() {
135 ArrowDataType::Boolean => {
136 if field.is_nullable() {
137 Box::new(NullableBoolean)
138 } else {
139 Box::new(NonNullableBoolean)
140 }
141 }
142 ArrowDataType::Int8 => Int8Type::identical(field.is_nullable()),
143 ArrowDataType::Int16 => Int16Type::identical(field.is_nullable()),
144 ArrowDataType::Int32 => Int32Type::identical(field.is_nullable()),
145 ArrowDataType::Int64 => Int64Type::identical(field.is_nullable()),
146 ArrowDataType::UInt8 => UInt8Type::identical(field.is_nullable()),
147 ArrowDataType::Float32 => Float32Type::identical(field.is_nullable()),
148 ArrowDataType::Float64 => Float64Type::identical(field.is_nullable()),
149 ArrowDataType::Date32 => Date32Type::map_infalliable(field.is_nullable(), days_since_epoch),
150 ArrowDataType::Time32(TimeUnit::Second) => {
151 Time32SecondType::map_infalliable(field.is_nullable(), seconds_since_midnight)
152 }
153 ArrowDataType::Time32(TimeUnit::Millisecond) => Box::new(TimeMsI32),
154 ArrowDataType::Time64(TimeUnit::Microsecond) => Box::new(TimeUsI64),
155 ArrowDataType::Time64(TimeUnit::Nanosecond) => Box::new(TimeNsI64),
156 ArrowDataType::Utf8 => {
157 let sql_type = query_metadata
158 .col_data_type(col_index)
159 .map_err(ColumnFailure::FailedToDescribeColumn)?;
160 debug!("Relational type of column {}: {sql_type:?}", col_index - 1);
163 let lazy_display_size = || query_metadata.col_display_size(col_index);
164 choose_text_strategy(
166 sql_type,
167 lazy_display_size,
168 buffer_allocation_options.max_text_size,
169 trim_fixed_sized_character_strings,
170 text_encoding,
171 )?
172 }
173 ArrowDataType::Decimal128(precision, scale @ 0..) => {
174 Box::new(Decimal::new(*precision, *scale))
175 }
176 ArrowDataType::Binary => {
177 let sql_type = query_metadata
178 .col_data_type(col_index)
179 .map_err(ColumnFailure::FailedToDescribeColumn)?;
180 let length = sql_type.column_size();
181 let length = match (length, buffer_allocation_options.max_binary_size) {
182 (None, None) => return Err(ColumnFailure::ZeroSizedColumn { sql_type }),
183 (None, Some(limit)) => limit,
184 (Some(len), None) => len.get(),
185 (Some(len), Some(limit)) => {
186 if len.get() < limit {
187 len.get()
188 } else {
189 limit
190 }
191 }
192 };
193 Box::new(Binary::new(length))
194 }
195 ArrowDataType::Timestamp(TimeUnit::Second, _) => {
196 TimestampSecondType::map_infalliable(field.is_nullable(), seconds_since_epoch)
197 }
198 ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => {
199 TimestampMillisecondType::map_infalliable(field.is_nullable(), ms_since_epoch)
200 }
201 ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => {
202 TimestampMicrosecondType::map_infalliable(field.is_nullable(), us_since_epoch)
203 }
204 ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => {
205 TimestampNanosecondType::map_falliable(
206 field.is_nullable(),
207 map_value_errors_to_null,
208 ns_since_epoch,
209 )
210 }
211 ArrowDataType::FixedSizeBinary(length) => {
212 Box::new(FixedSizedBinary::new((*length).try_into().unwrap()))
213 }
214 unsupported_arrow_type => {
215 return Err(ColumnFailure::UnsupportedArrowType(
216 unsupported_arrow_type.clone(),
217 ));
218 }
219 };
220 Ok(strat)
221}
222
223#[derive(Error, Debug)]
225pub enum ColumnFailure {
226 #[error(
228 "The ODBC driver did not specify a sensible upper bound for the column. This usually \
229 happens for large variadic types (E.g. VARCHAR(max)). In other cases it can be a \
230 shortcoming of the ODBC driver. Try casting the column into a type with a sensible upper \
231 bound. `arrow-odbc` also allows the application to specify a generic upper bound, which it \
232 would automatically apply. The type of the column causing this error is {:?}.",
233 sql_type
234 )]
235 ZeroSizedColumn { sql_type: OdbcDataType },
236 #[error(
238 "Unable to deduce the maximum string length for the SQL Data Type reported by the ODBC \
239 driver. Reported SQL data type is: {:?}.\n Error fetching column display or octet size: \
240 {source}",
241 sql_type
242 )]
243 UnknownStringLength {
244 sql_type: OdbcDataType,
245 source: odbc_api::Error,
246 },
247 #[error(
249 "Unsupported arrow type: `{0}`. This type can currently not be fetched from an ODBC data \
250 source by an instance of OdbcReader."
251 )]
252 UnsupportedArrowType(ArrowDataType),
253 #[error(
255 "An error occurred fetching the column description or data type from the metainformation \
256 attached to the ODBC result set:\n{0}"
257 )]
258 FailedToDescribeColumn(#[source] odbc_api::Error),
259 #[error(
260 "Column buffer is too large to be allocated. Tried to alloacte {num_elements} elements \
261 with {element_size} bytes in size each."
262 )]
263 TooLarge {
264 num_elements: usize,
265 element_size: usize,
266 },
267}
268
269impl ColumnFailure {
270 pub fn into_crate_error(self, name: String, index: usize) -> crate::Error {
272 crate::Error::ColumnFailure {
273 name,
274 index,
275 source: self,
276 }
277 }
278}