1use std::{convert::TryInto, sync::Arc};
2
3use arrow::{
4 array::{ArrayRef, BooleanBuilder},
5 datatypes::{
6 DataType as ArrowDataType, Date32Type, Field, Float32Type, Float64Type, Int8Type,
7 Int16Type, Int32Type, Int64Type, TimeUnit, TimestampMicrosecondType,
8 TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type,
9 },
10};
11
12use log::debug;
13use odbc_api::{
14 Bit, DataType as OdbcDataType, ResultSetMetadata,
15 buffers::{AnySlice, BufferDesc, Item},
16};
17use thiserror::Error;
18
19mod binary;
20mod concurrent_odbc_reader;
21mod decimal;
22mod map_odbc_to_arrow;
23mod odbc_reader;
24mod text;
25mod to_record_batch;
26
27use crate::date_time::{
28 days_since_epoch, ms_since_epoch, ns_since_epoch, seconds_since_epoch, us_since_epoch,
29};
30
31pub use self::{
32 binary::{Binary, FixedSizedBinary},
33 concurrent_odbc_reader::ConcurrentOdbcReader,
34 decimal::Decimal,
35 map_odbc_to_arrow::{MapOdbcToArrow, MappingError},
36 odbc_reader::{OdbcReader, OdbcReaderBuilder},
37 text::{TextEncoding, choose_text_strategy},
38};
39
40pub trait ReadStrategy {
42 fn buffer_desc(&self) -> BufferDesc;
44
45 fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError>;
47}
48
49pub struct NonNullableBoolean;
50
51impl ReadStrategy for NonNullableBoolean {
52 fn buffer_desc(&self) -> BufferDesc {
53 BufferDesc::Bit { nullable: false }
54 }
55
56 fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError> {
57 let values = Bit::as_slice(column_view).unwrap();
58 let mut builder = BooleanBuilder::new();
59 for bit in values {
60 builder.append_value(bit.as_bool());
61 }
62 Ok(Arc::new(builder.finish()))
63 }
64}
65
66pub struct NullableBoolean;
67
68impl ReadStrategy for NullableBoolean {
69 fn buffer_desc(&self) -> BufferDesc {
70 BufferDesc::Bit { nullable: true }
71 }
72
73 fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError> {
74 let values = Bit::as_nullable_slice(column_view).unwrap();
75 let mut builder = BooleanBuilder::new();
76 for bit in values {
77 builder.append_option(bit.copied().map(Bit::as_bool))
78 }
79 Ok(Arc::new(builder.finish()))
80 }
81}
82
83#[derive(Default, Debug, Clone, Copy)]
87pub struct BufferAllocationOptions {
88 pub max_text_size: Option<usize>,
101 pub max_binary_size: Option<usize>,
110 pub fallibale_allocations: bool,
114}
115
116pub fn choose_column_strategy(
117 field: &Field,
118 query_metadata: &mut impl ResultSetMetadata,
119 col_index: u16,
120 buffer_allocation_options: BufferAllocationOptions,
121 map_value_errors_to_null: bool,
122 trim_fixed_sized_character_strings: bool,
123 text_encoding: TextEncoding,
124) -> Result<Box<dyn ReadStrategy + Send>, ColumnFailure> {
125 let strat: Box<dyn ReadStrategy + Send> = match field.data_type() {
126 ArrowDataType::Boolean => {
127 if field.is_nullable() {
128 Box::new(NullableBoolean)
129 } else {
130 Box::new(NonNullableBoolean)
131 }
132 }
133 ArrowDataType::Int8 => Int8Type::identical(field.is_nullable()),
134 ArrowDataType::Int16 => Int16Type::identical(field.is_nullable()),
135 ArrowDataType::Int32 => Int32Type::identical(field.is_nullable()),
136 ArrowDataType::Int64 => Int64Type::identical(field.is_nullable()),
137 ArrowDataType::UInt8 => UInt8Type::identical(field.is_nullable()),
138 ArrowDataType::Float32 => Float32Type::identical(field.is_nullable()),
139 ArrowDataType::Float64 => Float64Type::identical(field.is_nullable()),
140 ArrowDataType::Date32 => Date32Type::map_infalliable(field.is_nullable(), days_since_epoch),
141 ArrowDataType::Utf8 => {
142 let sql_type = query_metadata
143 .col_data_type(col_index)
144 .map_err(ColumnFailure::FailedToDescribeColumn)?;
145 debug!("Relational type of column {}: {sql_type:?}", col_index - 1);
148 let lazy_display_size = || query_metadata.col_display_size(col_index);
149 choose_text_strategy(
151 sql_type,
152 lazy_display_size,
153 buffer_allocation_options.max_text_size,
154 trim_fixed_sized_character_strings,
155 text_encoding,
156 )?
157 }
158 ArrowDataType::Decimal128(precision, scale @ 0..) => {
159 Box::new(Decimal::new(*precision, *scale))
160 }
161 ArrowDataType::Binary => {
162 let sql_type = query_metadata
163 .col_data_type(col_index)
164 .map_err(ColumnFailure::FailedToDescribeColumn)?;
165 let length = sql_type.column_size();
166 let length = match (length, buffer_allocation_options.max_binary_size) {
167 (None, None) => return Err(ColumnFailure::ZeroSizedColumn { sql_type }),
168 (None, Some(limit)) => limit,
169 (Some(len), None) => len.get(),
170 (Some(len), Some(limit)) => {
171 if len.get() < limit {
172 len.get()
173 } else {
174 limit
175 }
176 }
177 };
178 Box::new(Binary::new(length))
179 }
180 ArrowDataType::Timestamp(TimeUnit::Second, _) => {
181 TimestampSecondType::map_infalliable(field.is_nullable(), seconds_since_epoch)
182 }
183 ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => {
184 TimestampMillisecondType::map_infalliable(field.is_nullable(), ms_since_epoch)
185 }
186 ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => {
187 TimestampMicrosecondType::map_infalliable(field.is_nullable(), us_since_epoch)
188 }
189 ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => {
190 TimestampNanosecondType::map_falliable(
191 field.is_nullable(),
192 map_value_errors_to_null,
193 ns_since_epoch,
194 )
195 }
196 ArrowDataType::FixedSizeBinary(length) => {
197 Box::new(FixedSizedBinary::new((*length).try_into().unwrap()))
198 }
199 unsupported_arrow_type => {
200 return Err(ColumnFailure::UnsupportedArrowType(
201 unsupported_arrow_type.clone(),
202 ));
203 }
204 };
205 Ok(strat)
206}
207
208#[derive(Error, Debug)]
210pub enum ColumnFailure {
211 #[error(
213 "The ODBC driver did not specify a sensible upper bound for the column. This usually \
214 happens for large variadic types (E.g. VARCHAR(max)). In other cases it can be a \
215 shortcoming of the ODBC driver. Try casting the column into a type with a sensible upper \
216 bound. `arrow-odbc` also allows the application to specify a generic upper bound, which it \
217 would automatically apply. The type of the column causing this error is {:?}.",
218 sql_type
219 )]
220 ZeroSizedColumn { sql_type: OdbcDataType },
221 #[error(
223 "Unable to deduce the maximum string length for the SQL Data Type reported by the ODBC \
224 driver. Reported SQL data type is: {:?}.\n Error fetching column display or octet size: \
225 {source}",
226 sql_type
227 )]
228 UnknownStringLength {
229 sql_type: OdbcDataType,
230 source: odbc_api::Error,
231 },
232 #[error(
234 "Unsupported arrow type: `{0}`. This type can currently not be fetched from an ODBC data \
235 source by an instance of OdbcReader."
236 )]
237 UnsupportedArrowType(ArrowDataType),
238 #[error(
240 "An error occurred fetching the column description or data type from the metainformation \
241 attached to the ODBC result set:\n{0}"
242 )]
243 FailedToDescribeColumn(#[source] odbc_api::Error),
244 #[error(
245 "Column buffer is too large to be allocated. Tried to alloacte {num_elements} elements \
246 with {element_size} bytes in size each."
247 )]
248 TooLarge {
249 num_elements: usize,
250 element_size: usize,
251 },
252}
253
254impl ColumnFailure {
255 pub fn into_crate_error(self, name: String, index: usize) -> crate::Error {
257 crate::Error::ColumnFailure {
258 name,
259 index,
260 source: self,
261 }
262 }
263}