// arrow_odbc/reader.rs

1use std::{convert::TryInto, sync::Arc};
2
3use arrow::{
4    array::{ArrayRef, BooleanBuilder},
5    datatypes::{
6        DataType as ArrowDataType, Date32Type, Field, Float32Type, Float64Type, Int8Type,
7        Int16Type, Int32Type, Int64Type, TimeUnit, TimestampMicrosecondType,
8        TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type,
9    },
10};
11
12use log::debug;
13use odbc_api::{
14    Bit, DataType as OdbcDataType, ResultSetMetadata,
15    buffers::{AnySlice, BufferDesc, Item},
16};
17use thiserror::Error;
18
19mod binary;
20mod concurrent_odbc_reader;
21mod decimal;
22mod map_odbc_to_arrow;
23mod odbc_reader;
24mod text;
25mod to_record_batch;
26
27use crate::date_time::{
28    days_since_epoch, ms_since_epoch, ns_since_epoch, seconds_since_epoch, us_since_epoch,
29};
30
31pub use self::{
32    binary::{Binary, FixedSizedBinary},
33    concurrent_odbc_reader::ConcurrentOdbcReader,
34    decimal::Decimal,
35    map_odbc_to_arrow::{MapOdbcToArrow, MappingError},
36    odbc_reader::{OdbcReader, OdbcReaderBuilder},
37    text::{TextEncoding, choose_text_strategy},
38};
39
/// All decisions needed to copy data from an ODBC buffer to an Arrow Array
pub trait ReadStrategy {
    /// Describes the buffer which is bound to the ODBC cursor.
    fn buffer_desc(&self) -> BufferDesc;

    /// Create an arrow array from an ODBC buffer described in [`Self::buffer_desc`].
    fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError>;
}
48
49pub struct NonNullableBoolean;
50
51impl ReadStrategy for NonNullableBoolean {
52    fn buffer_desc(&self) -> BufferDesc {
53        BufferDesc::Bit { nullable: false }
54    }
55
56    fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError> {
57        let values = Bit::as_slice(column_view).unwrap();
58        let mut builder = BooleanBuilder::new();
59        for bit in values {
60            builder.append_value(bit.as_bool());
61        }
62        Ok(Arc::new(builder.finish()))
63    }
64}
65
66pub struct NullableBoolean;
67
68impl ReadStrategy for NullableBoolean {
69    fn buffer_desc(&self) -> BufferDesc {
70        BufferDesc::Bit { nullable: true }
71    }
72
73    fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError> {
74        let values = Bit::as_nullable_slice(column_view).unwrap();
75        let mut builder = BooleanBuilder::new();
76        for bit in values {
77            builder.append_option(bit.copied().map(Bit::as_bool))
78        }
79        Ok(Arc::new(builder.finish()))
80    }
81}
82
/// Allows setting limits for buffers bound to the ODBC data source. Check this out if you find that
/// you get memory allocation, or zero sized column errors. Used when constructing a reader using
/// [`crate::OdbcReaderBuilder`].
#[derive(Default, Debug, Clone, Copy)]
pub struct BufferAllocationOptions {
    /// An upper limit for the size of buffers bound to variadic text columns of the data source.
    /// This limit does not (directly) apply to the size of the created arrow buffers, but rather
    /// applies to the buffers used for the data in transit. Use this option if you have e.g.
    /// `VARCHAR(MAX)` fields in your database schema. In such a case without an upper limit, the
    /// ODBC driver of your data source is asked for the maximum size of an element, and is likely
    /// to answer with either `0` or a value which is way larger than any actual entry in the
    /// column. If you can not adapt your database schema, this limit might be what you are looking
    /// for. On windows systems the size is double words (16Bit), as windows utilizes an UTF-16
    /// encoding. So this translates to roughly the size in letters. On non windows systems this is
    /// the size in bytes and the datasource is assumed to utilize an UTF-8 encoding. `None` means
    /// no upper limit is set and the maximum element size, reported by ODBC is used to determine
    /// buffer sizes.
    pub max_text_size: Option<usize>,
    /// An upper limit for the size of buffers bound to variadic binary columns of the data source.
    /// This limit does not (directly) apply to the size of the created arrow buffers, but rather
    /// applies to the buffers used for the data in transit. Use this option if you have e.g.
    /// `VARBINARY(MAX)` fields in your database schema. In such a case without an upper limit, the
    /// ODBC driver of your data source is asked for the maximum size of an element, and is likely
    /// to answer with either `0` or a value which is way larger than any actual entry in the
    /// column. If you can not adapt your database schema, this limit might be what you are looking
    /// for. This is the maximum size in bytes of the binary column.
    pub max_binary_size: Option<usize>,
    /// Set to `true` in order to trigger an [`ColumnFailure::TooLarge`] instead of a panic in case
    /// the buffers can not be allocated due to their size. This might have a performance cost for
    /// constructing the reader. `false` by default.
    // NOTE(review): the field name contains a typo ("fallibale" instead of "fallible"), but it is
    // part of the public API and can not be renamed without breaking callers.
    pub fallibale_allocations: bool,
}
115
116pub fn choose_column_strategy(
117    field: &Field,
118    query_metadata: &mut impl ResultSetMetadata,
119    col_index: u16,
120    buffer_allocation_options: BufferAllocationOptions,
121    map_value_errors_to_null: bool,
122    trim_fixed_sized_character_strings: bool,
123    text_encoding: TextEncoding,
124) -> Result<Box<dyn ReadStrategy + Send>, ColumnFailure> {
125    let strat: Box<dyn ReadStrategy + Send> = match field.data_type() {
126        ArrowDataType::Boolean => {
127            if field.is_nullable() {
128                Box::new(NullableBoolean)
129            } else {
130                Box::new(NonNullableBoolean)
131            }
132        }
133        ArrowDataType::Int8 => Int8Type::identical(field.is_nullable()),
134        ArrowDataType::Int16 => Int16Type::identical(field.is_nullable()),
135        ArrowDataType::Int32 => Int32Type::identical(field.is_nullable()),
136        ArrowDataType::Int64 => Int64Type::identical(field.is_nullable()),
137        ArrowDataType::UInt8 => UInt8Type::identical(field.is_nullable()),
138        ArrowDataType::Float32 => Float32Type::identical(field.is_nullable()),
139        ArrowDataType::Float64 => Float64Type::identical(field.is_nullable()),
140        ArrowDataType::Date32 => Date32Type::map_infalliable(field.is_nullable(), days_since_epoch),
141        ArrowDataType::Utf8 => {
142            let sql_type = query_metadata
143                .col_data_type(col_index)
144                .map_err(ColumnFailure::FailedToDescribeColumn)?;
145            // Use a zero based index here, because we use it everywhere else there we communicate
146            // with users.
147            debug!("Relational type of column {}: {sql_type:?}", col_index - 1);
148            let lazy_display_size = || query_metadata.col_display_size(col_index);
149            // Use the SQL type first to determine buffer length.
150            choose_text_strategy(
151                sql_type,
152                lazy_display_size,
153                buffer_allocation_options.max_text_size,
154                trim_fixed_sized_character_strings,
155                text_encoding,
156            )?
157        }
158        ArrowDataType::Decimal128(precision, scale @ 0..) => {
159            Box::new(Decimal::new(*precision, *scale))
160        }
161        ArrowDataType::Binary => {
162            let sql_type = query_metadata
163                .col_data_type(col_index)
164                .map_err(ColumnFailure::FailedToDescribeColumn)?;
165            let length = sql_type.column_size();
166            let length = match (length, buffer_allocation_options.max_binary_size) {
167                (None, None) => return Err(ColumnFailure::ZeroSizedColumn { sql_type }),
168                (None, Some(limit)) => limit,
169                (Some(len), None) => len.get(),
170                (Some(len), Some(limit)) => {
171                    if len.get() < limit {
172                        len.get()
173                    } else {
174                        limit
175                    }
176                }
177            };
178            Box::new(Binary::new(length))
179        }
180        ArrowDataType::Timestamp(TimeUnit::Second, _) => {
181            TimestampSecondType::map_infalliable(field.is_nullable(), seconds_since_epoch)
182        }
183        ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => {
184            TimestampMillisecondType::map_infalliable(field.is_nullable(), ms_since_epoch)
185        }
186        ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => {
187            TimestampMicrosecondType::map_infalliable(field.is_nullable(), us_since_epoch)
188        }
189        ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => {
190            TimestampNanosecondType::map_falliable(
191                field.is_nullable(),
192                map_value_errors_to_null,
193                ns_since_epoch,
194            )
195        }
196        ArrowDataType::FixedSizeBinary(length) => {
197            Box::new(FixedSizedBinary::new((*length).try_into().unwrap()))
198        }
199        unsupported_arrow_type => {
200            return Err(ColumnFailure::UnsupportedArrowType(
201                unsupported_arrow_type.clone(),
202            ));
203        }
204    };
205    Ok(strat)
206}
207
208/// Read error related to a specific column
209#[derive(Error, Debug)]
210pub enum ColumnFailure {
211    /// We are getting a display or column size from ODBC but it is not larger than 0.
212    #[error(
213        "The ODBC driver did not specify a sensible upper bound for the column. This usually \
214        happens for large variadic types (E.g. VARCHAR(max)). In other cases it can be a \
215        shortcoming of the ODBC driver. Try casting the column into a type with a sensible upper \
216        bound. `arrow-odbc` also allows the application to specify a generic upper bound, which it \
217        would automatically apply. The type of the column causing this error is {:?}.",
218        sql_type
219    )]
220    ZeroSizedColumn { sql_type: OdbcDataType },
221    /// Unable to retrieve the column display size for the column.
222    #[error(
223        "Unable to deduce the maximum string length for the SQL Data Type reported by the ODBC \
224        driver. Reported SQL data type is: {:?}.\n Error fetching column display or octet size: \
225        {source}",
226        sql_type
227    )]
228    UnknownStringLength {
229        sql_type: OdbcDataType,
230        source: odbc_api::Error,
231    },
232    /// The type specified in the arrow schema is not supported to be fetched from the database.
233    #[error(
234        "Unsupported arrow type: `{0}`. This type can currently not be fetched from an ODBC data \
235        source by an instance of OdbcReader."
236    )]
237    UnsupportedArrowType(ArrowDataType),
238    /// At ODBC api calls gaining information about the columns did fail.
239    #[error(
240        "An error occurred fetching the column description or data type from the metainformation \
241        attached to the ODBC result set:\n{0}"
242    )]
243    FailedToDescribeColumn(#[source] odbc_api::Error),
244    #[error(
245        "Column buffer is too large to be allocated. Tried to alloacte {num_elements} elements \
246        with {element_size} bytes in size each."
247    )]
248    TooLarge {
249        num_elements: usize,
250        element_size: usize,
251    },
252}
253
254impl ColumnFailure {
255    /// Provides the error with additional context of Error with column name and index.
256    pub fn into_crate_error(self, name: String, index: usize) -> crate::Error {
257        crate::Error::ColumnFailure {
258            name,
259            index,
260            source: self,
261        }
262    }
263}