arrow_odbc/
reader.rs

1use std::{convert::TryInto, sync::Arc};
2
3use arrow::{
4    array::{ArrayRef, BooleanBuilder},
5    datatypes::{
6        DataType as ArrowDataType, Date32Type, Field, Float32Type, Float64Type, Int8Type,
7        Int16Type, Int32Type, Int64Type, Time32SecondType, TimeUnit, TimestampMicrosecondType,
8        TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type,
9    },
10};
11
12use log::debug;
13use odbc_api::{
14    Bit, DataType as OdbcDataType, ResultSetMetadata,
15    buffers::{AnySlice, BufferDesc, Item},
16};
17use thiserror::Error;
18use time::{TimeMsI32, TimeNsI64, TimeUsI64, seconds_since_midnight};
19
20mod binary;
21mod concurrent_odbc_reader;
22mod decimal;
23mod map_odbc_to_arrow;
24mod odbc_reader;
25mod text;
26mod time;
27mod to_record_batch;
28
29use crate::date_time::{
30    days_since_epoch, ms_since_epoch, ns_since_epoch, seconds_since_epoch, us_since_epoch,
31};
32
33pub use self::{
34    binary::{Binary, FixedSizedBinary},
35    concurrent_odbc_reader::ConcurrentOdbcReader,
36    decimal::Decimal,
37    map_odbc_to_arrow::{MapOdbcToArrow, MappingError},
38    odbc_reader::{OdbcReader, OdbcReaderBuilder},
39    text::{TextEncoding, choose_text_strategy},
40};
41
/// All decisions needed to copy data from an ODBC buffer to an Arrow Array
///
/// An implementation answers two questions: which kind of buffer has to be bound to the ODBC
/// cursor for a column, and how the contents of such a buffer are turned into an Arrow array.
pub trait ReadStrategy {
    /// Describes the buffer which is bound to the ODBC cursor.
    fn buffer_desc(&self) -> BufferDesc;

    /// Create an arrow array from an ODBC buffer described in [`Self::buffer_desc`].
    ///
    /// `column_view` is the view on the column buffer holding the fetched values. A
    /// [`MappingError`] is returned if the values can not be converted into the Arrow array.
    fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError>;
}
50
51pub struct NonNullableBoolean;
52
53impl ReadStrategy for NonNullableBoolean {
54    fn buffer_desc(&self) -> BufferDesc {
55        BufferDesc::Bit { nullable: false }
56    }
57
58    fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError> {
59        let values = Bit::as_slice(column_view).unwrap();
60        let mut builder = BooleanBuilder::new();
61        for bit in values {
62            builder.append_value(bit.as_bool());
63        }
64        Ok(Arc::new(builder.finish()))
65    }
66}
67
68pub struct NullableBoolean;
69
70impl ReadStrategy for NullableBoolean {
71    fn buffer_desc(&self) -> BufferDesc {
72        BufferDesc::Bit { nullable: true }
73    }
74
75    fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError> {
76        let values = Bit::as_nullable_slice(column_view).unwrap();
77        let mut builder = BooleanBuilder::new();
78        for bit in values {
79            builder.append_option(bit.copied().map(Bit::as_bool))
80        }
81        Ok(Arc::new(builder.finish()))
82    }
83}
84
/// Allows setting limits for buffers bound to the ODBC data source. Check this out if you find that
/// you get memory allocation, or zero sized column errors. Used when constructing a reader using
/// [`crate::OdbcReaderBuilder`].
#[derive(Default, Debug, Clone, Copy)]
pub struct BufferAllocationOptions {
    /// An upper limit for the size of buffers bound to variadic text columns of the data source.
    /// This limit does not (directly) apply to the size of the created arrow buffers, but rather
    /// applies to the buffers used for the data in transit. Use this option if you have e.g.
    /// `VARCHAR(MAX)` fields in your database schema. In such a case without an upper limit, the
    /// ODBC driver of your data source is asked for the maximum size of an element, and is likely
    /// to answer with either `0` or a value which is way larger than any actual entry in the
    /// column. If you can not adapt your database schema, this limit might be what you are looking
    /// for. On windows systems the size is double words (16 bit), as windows utilizes an UTF-16
    /// encoding. So this translates to roughly the size in letters. On non windows systems this is
    /// the size in bytes and the datasource is assumed to utilize an UTF-8 encoding. `None` means
    /// no upper limit is set and the maximum element size reported by ODBC is used to determine
    /// buffer sizes.
    pub max_text_size: Option<usize>,
    /// An upper limit for the size of buffers bound to variadic binary columns of the data source.
    /// This limit does not (directly) apply to the size of the created arrow buffers, but rather
    /// applies to the buffers used for the data in transit. Use this option if you have e.g.
    /// `VARBINARY(MAX)` fields in your database schema. In such a case without an upper limit, the
    /// ODBC driver of your data source is asked for the maximum size of an element, and is likely
    /// to answer with either `0` or a value which is way larger than any actual entry in the
    /// column. If you can not adapt your database schema, this limit might be what you are looking
    /// for. This is the maximum size in bytes of the binary column.
    pub max_binary_size: Option<usize>,
    /// Set to `true` in order to trigger a [`ColumnFailure::TooLarge`] instead of a panic in case
    /// the buffers can not be allocated due to their size. This might have a performance cost for
    /// constructing the reader. `false` by default.
    pub fallibale_allocations: bool,
}
117
118pub fn choose_column_strategy(
119    field: &Field,
120    query_metadata: &mut impl ResultSetMetadata,
121    col_index: u16,
122    buffer_allocation_options: BufferAllocationOptions,
123    map_value_errors_to_null: bool,
124    trim_fixed_sized_character_strings: bool,
125    text_encoding: TextEncoding,
126) -> Result<Box<dyn ReadStrategy + Send>, ColumnFailure> {
127    let strat: Box<dyn ReadStrategy + Send> = match field.data_type() {
128        ArrowDataType::Boolean => {
129            if field.is_nullable() {
130                Box::new(NullableBoolean)
131            } else {
132                Box::new(NonNullableBoolean)
133            }
134        }
135        ArrowDataType::Int8 => Int8Type::identical(field.is_nullable()),
136        ArrowDataType::Int16 => Int16Type::identical(field.is_nullable()),
137        ArrowDataType::Int32 => Int32Type::identical(field.is_nullable()),
138        ArrowDataType::Int64 => Int64Type::identical(field.is_nullable()),
139        ArrowDataType::UInt8 => UInt8Type::identical(field.is_nullable()),
140        ArrowDataType::Float32 => Float32Type::identical(field.is_nullable()),
141        ArrowDataType::Float64 => Float64Type::identical(field.is_nullable()),
142        ArrowDataType::Date32 => Date32Type::map_infalliable(field.is_nullable(), days_since_epoch),
143        ArrowDataType::Time32(TimeUnit::Second) => {
144            Time32SecondType::map_infalliable(field.is_nullable(), seconds_since_midnight)
145        }
146        ArrowDataType::Time32(TimeUnit::Millisecond) => Box::new(TimeMsI32),
147        ArrowDataType::Time64(TimeUnit::Microsecond) => Box::new(TimeUsI64),
148        ArrowDataType::Time64(TimeUnit::Nanosecond) => Box::new(TimeNsI64),
149        ArrowDataType::Utf8 => {
150            let sql_type = query_metadata
151                .col_data_type(col_index)
152                .map_err(ColumnFailure::FailedToDescribeColumn)?;
153            // Use a zero based index here, because we use it everywhere else there we communicate
154            // with users.
155            debug!("Relational type of column {}: {sql_type:?}", col_index - 1);
156            let lazy_display_size = || query_metadata.col_display_size(col_index);
157            // Use the SQL type first to determine buffer length.
158            choose_text_strategy(
159                sql_type,
160                lazy_display_size,
161                buffer_allocation_options.max_text_size,
162                trim_fixed_sized_character_strings,
163                text_encoding,
164            )?
165        }
166        ArrowDataType::Decimal128(precision, scale @ 0..) => {
167            Box::new(Decimal::new(*precision, *scale))
168        }
169        ArrowDataType::Binary => {
170            let sql_type = query_metadata
171                .col_data_type(col_index)
172                .map_err(ColumnFailure::FailedToDescribeColumn)?;
173            let length = sql_type.column_size();
174            let length = match (length, buffer_allocation_options.max_binary_size) {
175                (None, None) => return Err(ColumnFailure::ZeroSizedColumn { sql_type }),
176                (None, Some(limit)) => limit,
177                (Some(len), None) => len.get(),
178                (Some(len), Some(limit)) => {
179                    if len.get() < limit {
180                        len.get()
181                    } else {
182                        limit
183                    }
184                }
185            };
186            Box::new(Binary::new(length))
187        }
188        ArrowDataType::Timestamp(TimeUnit::Second, _) => {
189            TimestampSecondType::map_infalliable(field.is_nullable(), seconds_since_epoch)
190        }
191        ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => {
192            TimestampMillisecondType::map_infalliable(field.is_nullable(), ms_since_epoch)
193        }
194        ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => {
195            TimestampMicrosecondType::map_infalliable(field.is_nullable(), us_since_epoch)
196        }
197        ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => {
198            TimestampNanosecondType::map_falliable(
199                field.is_nullable(),
200                map_value_errors_to_null,
201                ns_since_epoch,
202            )
203        }
204        ArrowDataType::FixedSizeBinary(length) => {
205            Box::new(FixedSizedBinary::new((*length).try_into().unwrap()))
206        }
207        unsupported_arrow_type => {
208            return Err(ColumnFailure::UnsupportedArrowType(
209                unsupported_arrow_type.clone(),
210            ));
211        }
212    };
213    Ok(strat)
214}
215
216/// Read error related to a specific column
217#[derive(Error, Debug)]
218pub enum ColumnFailure {
219    /// We are getting a display or column size from ODBC but it is not larger than 0.
220    #[error(
221        "The ODBC driver did not specify a sensible upper bound for the column. This usually \
222        happens for large variadic types (E.g. VARCHAR(max)). In other cases it can be a \
223        shortcoming of the ODBC driver. Try casting the column into a type with a sensible upper \
224        bound. `arrow-odbc` also allows the application to specify a generic upper bound, which it \
225        would automatically apply. The type of the column causing this error is {:?}.",
226        sql_type
227    )]
228    ZeroSizedColumn { sql_type: OdbcDataType },
229    /// Unable to retrieve the column display size for the column.
230    #[error(
231        "Unable to deduce the maximum string length for the SQL Data Type reported by the ODBC \
232        driver. Reported SQL data type is: {:?}.\n Error fetching column display or octet size: \
233        {source}",
234        sql_type
235    )]
236    UnknownStringLength {
237        sql_type: OdbcDataType,
238        source: odbc_api::Error,
239    },
240    /// The type specified in the arrow schema is not supported to be fetched from the database.
241    #[error(
242        "Unsupported arrow type: `{0}`. This type can currently not be fetched from an ODBC data \
243        source by an instance of OdbcReader."
244    )]
245    UnsupportedArrowType(ArrowDataType),
246    /// At ODBC api calls gaining information about the columns did fail.
247    #[error(
248        "An error occurred fetching the column description or data type from the metainformation \
249        attached to the ODBC result set:\n{0}"
250    )]
251    FailedToDescribeColumn(#[source] odbc_api::Error),
252    #[error(
253        "Column buffer is too large to be allocated. Tried to alloacte {num_elements} elements \
254        with {element_size} bytes in size each."
255    )]
256    TooLarge {
257        num_elements: usize,
258        element_size: usize,
259    },
260}
261
262impl ColumnFailure {
263    /// Provides the error with additional context of Error with column name and index.
264    pub fn into_crate_error(self, name: String, index: usize) -> crate::Error {
265        crate::Error::ColumnFailure {
266            name,
267            index,
268            source: self,
269        }
270    }
271}