arrow_odbc/
reader.rs

1use std::{convert::TryInto, sync::Arc};
2
3use arrow::{
4    array::{ArrayRef, BooleanBuilder},
5    datatypes::{
6        DataType as ArrowDataType, Date32Type, Field, Float32Type, Float64Type, Int16Type,
7        Int32Type, Int64Type, Int8Type, TimeUnit, TimestampMicrosecondType,
8        TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type,
9    },
10};
11
12use log::debug;
13use odbc_api::{
14    buffers::{AnySlice, BufferDesc, Item},
15    Bit, DataType as OdbcDataType, ResultSetMetadata,
16};
17use thiserror::Error;
18
19mod binary;
20mod concurrent_odbc_reader;
21mod decimal;
22mod map_odbc_to_arrow;
23mod odbc_reader;
24mod text;
25mod to_record_batch;
26
27use crate::date_time::{
28    days_since_epoch, ms_since_epoch, ns_since_epoch, seconds_since_epoch, us_since_epoch,
29};
30
31pub use self::{
32    binary::{Binary, FixedSizedBinary},
33    concurrent_odbc_reader::ConcurrentOdbcReader,
34    decimal::Decimal,
35    map_odbc_to_arrow::{MapOdbcToArrow, MappingError},
36    odbc_reader::{OdbcReader, OdbcReaderBuilder},
37    text::choose_text_strategy,
38};
39
/// All decisions needed to copy data from an ODBC buffer to an Arrow Array
pub trait ReadStrategy {
    /// Describes the buffer which is bound to the ODBC cursor.
    fn buffer_desc(&self) -> BufferDesc;

    /// Create an arrow array from an ODBC buffer described in [`Self::buffer_desc`].
    fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError>;
}
48
49pub struct NonNullableBoolean;
50
51impl ReadStrategy for NonNullableBoolean {
52    fn buffer_desc(&self) -> BufferDesc {
53        BufferDesc::Bit { nullable: false }
54    }
55
56    fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError> {
57        let values = Bit::as_slice(column_view).unwrap();
58        let mut builder = BooleanBuilder::new();
59        for bit in values {
60            builder.append_value(bit.as_bool());
61        }
62        Ok(Arc::new(builder.finish()))
63    }
64}
65
66pub struct NullableBoolean;
67
68impl ReadStrategy for NullableBoolean {
69    fn buffer_desc(&self) -> BufferDesc {
70        BufferDesc::Bit { nullable: true }
71    }
72
73    fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError> {
74        let values = Bit::as_nullable_slice(column_view).unwrap();
75        let mut builder = BooleanBuilder::new();
76        for bit in values {
77            builder.append_option(bit.copied().map(Bit::as_bool))
78        }
79        Ok(Arc::new(builder.finish()))
80    }
81}
82
/// Allows setting limits for buffers bound to the ODBC data source. Check this out if you find that
/// you get memory allocation, or zero sized column errors. Used when constructing a reader using
/// [`crate::OdbcReaderBuilder`].
#[derive(Default, Debug, Clone, Copy)]
pub struct BufferAllocationOptions {
    /// An upper limit for the size of buffers bound to variadic text columns of the data source.
    /// This limit does not (directly) apply to the size of the created arrow buffers, but rather
    /// applies to the buffers used for the data in transit. Use this option if you have e.g.
    /// `VARCHAR(MAX)` fields in your database schema. In such a case without an upper limit, the
    /// ODBC driver of your data source is asked for the maximum size of an element, and is likely
    /// to answer with either `0` or a value which is way larger than any actual entry in the
    /// column. If you can not adapt your database schema, this limit might be what you are looking
    /// for. On windows systems the size is double words (16Bit), as windows utilizes an UTF-16
    /// encoding. So this translates to roughly the size in letters. On non windows systems this is
    /// the size in bytes and the datasource is assumed to utilize an UTF-8 encoding. `None` means
    /// no upper limit is set and the maximum element size, reported by ODBC is used to determine
    /// buffer sizes.
    pub max_text_size: Option<usize>,
    /// An upper limit for the size of buffers bound to variadic binary columns of the data source.
    /// This limit does not (directly) apply to the size of the created arrow buffers, but rather
    /// applies to the buffers used for the data in transit. Use this option if you have e.g.
    /// `VARBINARY(MAX)` fields in your database schema. In such a case without an upper limit, the
    /// ODBC driver of your data source is asked for the maximum size of an element, and is likely
    /// to answer with either `0` or a value which is way larger than any actual entry in the
    /// column. If you can not adapt your database schema, this limit might be what you are looking
    /// for. This is the maximum size in bytes of the binary column.
    pub max_binary_size: Option<usize>,
    /// Set to `true` in order to trigger a [`ColumnFailure::TooLarge`] instead of a panic in case
    /// the buffers can not be allocated due to their size. This might have a performance cost for
    /// constructing the reader. `false` by default.
    // NOTE(review): the field name misspells "fallible", but it is public API — renaming it would
    // be a breaking change for existing callers.
    pub fallibale_allocations: bool,
}
115
116pub fn choose_column_strategy(
117    field: &Field,
118    query_metadata: &mut impl ResultSetMetadata,
119    col_index: u16,
120    buffer_allocation_options: BufferAllocationOptions,
121    map_value_errors_to_null: bool,
122    trim_fixed_sized_character_strings: bool,
123) -> Result<Box<dyn ReadStrategy + Send>, ColumnFailure> {
124    let strat: Box<dyn ReadStrategy + Send> = match field.data_type() {
125        ArrowDataType::Boolean => {
126            if field.is_nullable() {
127                Box::new(NullableBoolean)
128            } else {
129                Box::new(NonNullableBoolean)
130            }
131        }
132        ArrowDataType::Int8 => Int8Type::identical(field.is_nullable()),
133        ArrowDataType::Int16 => Int16Type::identical(field.is_nullable()),
134        ArrowDataType::Int32 => Int32Type::identical(field.is_nullable()),
135        ArrowDataType::Int64 => Int64Type::identical(field.is_nullable()),
136        ArrowDataType::UInt8 => UInt8Type::identical(field.is_nullable()),
137        ArrowDataType::Float32 => Float32Type::identical(field.is_nullable()),
138        ArrowDataType::Float64 => Float64Type::identical(field.is_nullable()),
139        ArrowDataType::Date32 => Date32Type::map_infalliable(field.is_nullable(), days_since_epoch),
140        ArrowDataType::Utf8 => {
141            let sql_type = query_metadata
142                .col_data_type(col_index)
143                .map_err(ColumnFailure::FailedToDescribeColumn)?;
144            // Use a zero based index here, because we use it everywhere else there we communicate
145            // with users.
146            debug!("Relational type of column {}: {sql_type:?}", col_index - 1);
147            let lazy_display_size = || query_metadata.col_display_size(col_index);
148            // Use the SQL type first to determine buffer length.
149            choose_text_strategy(
150                sql_type,
151                lazy_display_size,
152                buffer_allocation_options.max_text_size,
153                trim_fixed_sized_character_strings,
154            )?
155        }
156        ArrowDataType::Decimal128(precision, scale @ 0..) => {
157            Box::new(Decimal::new(*precision, *scale))
158        }
159        ArrowDataType::Binary => {
160            let sql_type = query_metadata
161                .col_data_type(col_index)
162                .map_err(ColumnFailure::FailedToDescribeColumn)?;
163            let length = sql_type.column_size();
164            let length = match (length, buffer_allocation_options.max_binary_size) {
165                (None, None) => return Err(ColumnFailure::ZeroSizedColumn { sql_type }),
166                (None, Some(limit)) => limit,
167                (Some(len), None) => len.get(),
168                (Some(len), Some(limit)) => {
169                    if len.get() < limit {
170                        len.get()
171                    } else {
172                        limit
173                    }
174                }
175            };
176            Box::new(Binary::new(length))
177        }
178        ArrowDataType::Timestamp(TimeUnit::Second, _) => {
179            TimestampSecondType::map_infalliable(field.is_nullable(), seconds_since_epoch)
180        }
181        ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => {
182            TimestampMillisecondType::map_infalliable(field.is_nullable(), ms_since_epoch)
183        }
184        ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => {
185            TimestampMicrosecondType::map_infalliable(field.is_nullable(), us_since_epoch)
186        }
187        ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => {
188            TimestampNanosecondType::map_falliable(
189                field.is_nullable(),
190                map_value_errors_to_null,
191                ns_since_epoch,
192            )
193        }
194        ArrowDataType::FixedSizeBinary(length) => {
195            Box::new(FixedSizedBinary::new((*length).try_into().unwrap()))
196        }
197        unsupported_arrow_type => {
198            return Err(ColumnFailure::UnsupportedArrowType(
199                unsupported_arrow_type.clone(),
200            ))
201        }
202    };
203    Ok(strat)
204}
205
206/// Read error related to a specific column
207#[derive(Error, Debug)]
208pub enum ColumnFailure {
209    /// We are getting a display or column size from ODBC but it is not larger than 0.
210    #[error(
211        "The ODBC driver did not specify a sensible upper bound for the column. This usually \
212        happens for large variadic types (E.g. VARCHAR(max)). In other cases it can be a \
213        shortcoming of the ODBC driver. Try casting the column into a type with a sensible upper \
214        bound. `arrow-odbc` also allows the application to specify a generic upper bound, which it \
215        would automatically apply. The type of the column causing this error is {:?}.",
216        sql_type
217    )]
218    ZeroSizedColumn { sql_type: OdbcDataType },
219    /// Unable to retrieve the column display size for the column.
220    #[error(
221        "Unable to deduce the maximum string length for the SQL Data Type reported by the ODBC \
222        driver. Reported SQL data type is: {:?}.\n Error fetching column display or octet size: \
223        {source}",
224        sql_type
225    )]
226    UnknownStringLength {
227        sql_type: OdbcDataType,
228        source: odbc_api::Error,
229    },
230    /// The type specified in the arrow schema is not supported to be fetched from the database.
231    #[error(
232        "Unsupported arrow type: `{0}`. This type can currently not be fetched from an ODBC data \
233        source by an instance of OdbcReader."
234    )]
235    UnsupportedArrowType(ArrowDataType),
236    /// At ODBC api calls gaining information about the columns did fail.
237    #[error(
238        "An error occurred fetching the column description or data type from the metainformation \
239        attached to the ODBC result set:\n{0}"
240    )]
241    FailedToDescribeColumn(#[source] odbc_api::Error),
242    #[error(
243        "Column buffer is too large to be allocated. Tried to alloacte {num_elements} elements \
244        with {element_size} bytes in size each."
245    )]
246    TooLarge {
247        num_elements: usize,
248        element_size: usize,
249    },
250}
251
impl ColumnFailure {
    /// Converts this column level failure into a crate level [`crate::Error`], enriching it with
    /// the name and index of the offending column as additional context.
    pub fn into_crate_error(self, name: String, index: usize) -> crate::Error {
        crate::Error::ColumnFailure {
            name,
            index,
            source: self,
        }
    }
}
261}