Skip to main content

arrow_odbc/
reader.rs

1use std::{convert::TryInto, sync::Arc};
2
3use arrow::{
4    array::{ArrayRef, BooleanBuilder},
5    datatypes::{
6        DataType as ArrowDataType, Date32Type, Field, Float32Type, Float64Type, Int8Type,
7        Int16Type, Int32Type, Int64Type, Time32SecondType, TimeUnit, TimestampMicrosecondType,
8        TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type,
9    },
10};
11
12use log::debug;
13use odbc_api::{
14    Bit, DataType as OdbcDataType, ResultSetMetadata,
15    buffers::{AnyColumnBufferSlice, BufferDesc},
16};
17use thiserror::Error;
18use time::{TimeMsI32, TimeNsI64, TimeUsI64, seconds_since_midnight};
19
20mod binary;
21mod concurrent_odbc_reader;
22mod decimal;
23mod map_odbc_to_arrow;
24mod odbc_reader;
25mod text;
26mod time;
27mod to_record_batch;
28
29use crate::date_time::{
30    days_since_epoch, ms_since_epoch, ns_since_epoch, seconds_since_epoch, us_since_epoch,
31};
32
33pub use self::{
34    binary::{Binary, FixedSizedBinary},
35    concurrent_odbc_reader::ConcurrentOdbcReader,
36    decimal::Decimal,
37    map_odbc_to_arrow::{MapOdbcToArrow, MappingError},
38    odbc_reader::{OdbcReader, OdbcReaderBuilder},
39    text::{TextEncoding, choose_text_strategy},
40};
41
42/// All decisions needed to copy data from an ODBC buffer to an Arrow Array
43pub trait ReadStrategy {
44    /// Describes the buffer which is bound to the ODBC cursor.
45    fn buffer_desc(&self) -> BufferDesc;
46
47    /// Create an arrow array from an ODBC buffer described in [`Self::buffer_description`].
48    fn fill_arrow_array(&self, column_view: AnyColumnBufferSlice)
49    -> Result<ArrayRef, MappingError>;
50}
51
52pub struct NonNullableBoolean;
53
54impl ReadStrategy for NonNullableBoolean {
55    fn buffer_desc(&self) -> BufferDesc {
56        BufferDesc::Bit { nullable: false }
57    }
58
59    fn fill_arrow_array(
60        &self,
61        column_view: AnyColumnBufferSlice,
62    ) -> Result<ArrayRef, MappingError> {
63        let values = column_view.as_slice::<Bit>().unwrap();
64        let mut builder = BooleanBuilder::new();
65        for bit in values {
66            builder.append_value(bit.as_bool());
67        }
68        Ok(Arc::new(builder.finish()))
69    }
70}
71
72pub struct NullableBoolean;
73
74impl ReadStrategy for NullableBoolean {
75    fn buffer_desc(&self) -> BufferDesc {
76        BufferDesc::Bit { nullable: true }
77    }
78
79    fn fill_arrow_array(
80        &self,
81        column_view: AnyColumnBufferSlice,
82    ) -> Result<ArrayRef, MappingError> {
83        let values = column_view.as_nullable_slice().unwrap();
84        let mut builder = BooleanBuilder::new();
85        for bit in values {
86            builder.append_option(bit.copied().map(Bit::as_bool))
87        }
88        Ok(Arc::new(builder.finish()))
89    }
90}
91
/// Limits for the buffers bound to the ODBC data source. Have a look at these options if you run
/// into memory allocation errors or zero sized column errors. Used when constructing a reader
/// using [`crate::OdbcReaderBuilder`].
#[derive(Debug, Default, Clone, Copy)]
pub struct BufferAllocationOptions {
    /// An upper limit for the size of buffers bound to variadic text columns of the data source.
    /// The limit does not (directly) apply to the size of the created arrow buffers, but to the
    /// buffers used for the data in transit. Use this option if your database schema contains
    /// e.g. `VARCHAR(MAX)` fields. Without an upper limit, the ODBC driver of your data source is
    /// asked for the maximum size of an element in such a case, and it is likely to answer with
    /// either `0` or a value which is way larger than any actual entry in the column. If you can
    /// not adapt your database schema, this limit might be what you are looking for.
    ///
    /// On Windows systems the size is counted in 16-bit units, as Windows utilizes an UTF-16
    /// encoding, so it translates to roughly the size in letters. On non-Windows systems it is
    /// the size in bytes and the data source is assumed to utilize an UTF-8 encoding.
    ///
    /// `None` means no upper limit is set, and the maximum element size reported by ODBC is used
    /// to determine buffer sizes.
    pub max_text_size: Option<usize>,
    /// An upper limit for the size of buffers bound to variadic binary columns of the data
    /// source. The limit does not (directly) apply to the size of the created arrow buffers, but
    /// to the buffers used for the data in transit. Use this option if your database schema
    /// contains e.g. `VARBINARY(MAX)` fields. Without an upper limit, the ODBC driver of your
    /// data source is asked for the maximum size of an element in such a case, and it is likely
    /// to answer with either `0` or a value which is way larger than any actual entry in the
    /// column. If you can not adapt your database schema, this limit might be what you are
    /// looking for.
    ///
    /// This is the maximum size of an element of the binary column in bytes.
    pub max_binary_size: Option<usize>,
    /// Set to `true` in order to trigger a [`ColumnFailure::TooLarge`] instead of a panic in case
    /// the buffers can not be allocated due to their size. This might have a performance cost
    /// for constructing the reader. `false` by default.
    // NOTE(review): the field name is misspelled ("fallibale"). It is part of the public
    // interface of this struct, so it is left as is.
    pub fallibale_allocations: bool,
}
124
125pub fn choose_column_strategy(
126    field: &Field,
127    query_metadata: &mut impl ResultSetMetadata,
128    col_index: u16,
129    buffer_allocation_options: BufferAllocationOptions,
130    map_value_errors_to_null: bool,
131    trim_fixed_sized_character_strings: bool,
132    text_encoding: TextEncoding,
133) -> Result<Box<dyn ReadStrategy + Send>, ColumnFailure> {
134    let strat: Box<dyn ReadStrategy + Send> = match field.data_type() {
135        ArrowDataType::Boolean => {
136            if field.is_nullable() {
137                Box::new(NullableBoolean)
138            } else {
139                Box::new(NonNullableBoolean)
140            }
141        }
142        ArrowDataType::Int8 => Int8Type::identical(field.is_nullable()),
143        ArrowDataType::Int16 => Int16Type::identical(field.is_nullable()),
144        ArrowDataType::Int32 => Int32Type::identical(field.is_nullable()),
145        ArrowDataType::Int64 => Int64Type::identical(field.is_nullable()),
146        ArrowDataType::UInt8 => UInt8Type::identical(field.is_nullable()),
147        ArrowDataType::Float32 => Float32Type::identical(field.is_nullable()),
148        ArrowDataType::Float64 => Float64Type::identical(field.is_nullable()),
149        ArrowDataType::Date32 => Date32Type::map_infalliable(field.is_nullable(), days_since_epoch),
150        ArrowDataType::Time32(TimeUnit::Second) => {
151            Time32SecondType::map_infalliable(field.is_nullable(), seconds_since_midnight)
152        }
153        ArrowDataType::Time32(TimeUnit::Millisecond) => Box::new(TimeMsI32),
154        ArrowDataType::Time64(TimeUnit::Microsecond) => Box::new(TimeUsI64),
155        ArrowDataType::Time64(TimeUnit::Nanosecond) => Box::new(TimeNsI64),
156        ArrowDataType::Utf8 => {
157            let sql_type = query_metadata
158                .col_data_type(col_index)
159                .map_err(ColumnFailure::FailedToDescribeColumn)?;
160            // Use a zero based index here, because we use it everywhere else there we communicate
161            // with users.
162            debug!("Relational type of column {}: {sql_type:?}", col_index - 1);
163            let lazy_display_size = || query_metadata.col_display_size(col_index);
164            // Use the SQL type first to determine buffer length.
165            choose_text_strategy(
166                sql_type,
167                lazy_display_size,
168                buffer_allocation_options.max_text_size,
169                trim_fixed_sized_character_strings,
170                text_encoding,
171            )?
172        }
173        ArrowDataType::Decimal128(precision, scale @ 0..) => {
174            Box::new(Decimal::new(*precision, *scale))
175        }
176        ArrowDataType::Binary => {
177            let sql_type = query_metadata
178                .col_data_type(col_index)
179                .map_err(ColumnFailure::FailedToDescribeColumn)?;
180            let length = sql_type.column_size();
181            let length = match (length, buffer_allocation_options.max_binary_size) {
182                (None, None) => return Err(ColumnFailure::ZeroSizedColumn { sql_type }),
183                (None, Some(limit)) => limit,
184                (Some(len), None) => len.get(),
185                (Some(len), Some(limit)) => {
186                    if len.get() < limit {
187                        len.get()
188                    } else {
189                        limit
190                    }
191                }
192            };
193            Box::new(Binary::new(length))
194        }
195        ArrowDataType::Timestamp(TimeUnit::Second, _) => {
196            TimestampSecondType::map_infalliable(field.is_nullable(), seconds_since_epoch)
197        }
198        ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => {
199            TimestampMillisecondType::map_infalliable(field.is_nullable(), ms_since_epoch)
200        }
201        ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => {
202            TimestampMicrosecondType::map_infalliable(field.is_nullable(), us_since_epoch)
203        }
204        ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => {
205            TimestampNanosecondType::map_falliable(
206                field.is_nullable(),
207                map_value_errors_to_null,
208                ns_since_epoch,
209            )
210        }
211        ArrowDataType::FixedSizeBinary(length) => {
212            Box::new(FixedSizedBinary::new((*length).try_into().unwrap()))
213        }
214        unsupported_arrow_type => {
215            return Err(ColumnFailure::UnsupportedArrowType(
216                unsupported_arrow_type.clone(),
217            ));
218        }
219    };
220    Ok(strat)
221}
222
223/// Read error related to a specific column
224#[derive(Error, Debug)]
225pub enum ColumnFailure {
226    /// We are getting a display or column size from ODBC but it is not larger than 0.
227    #[error(
228        "The ODBC driver did not specify a sensible upper bound for the column. This usually \
229        happens for large variadic types (E.g. VARCHAR(max)). In other cases it can be a \
230        shortcoming of the ODBC driver. Try casting the column into a type with a sensible upper \
231        bound. `arrow-odbc` also allows the application to specify a generic upper bound, which it \
232        would automatically apply. The type of the column causing this error is {:?}.",
233        sql_type
234    )]
235    ZeroSizedColumn { sql_type: OdbcDataType },
236    /// Unable to retrieve the column display size for the column.
237    #[error(
238        "Unable to deduce the maximum string length for the SQL Data Type reported by the ODBC \
239        driver. Reported SQL data type is: {:?}.\n Error fetching column display or octet size: \
240        {source}",
241        sql_type
242    )]
243    UnknownStringLength {
244        sql_type: OdbcDataType,
245        source: odbc_api::Error,
246    },
247    /// The type specified in the arrow schema is not supported to be fetched from the database.
248    #[error(
249        "Unsupported arrow type: `{0}`. This type can currently not be fetched from an ODBC data \
250        source by an instance of OdbcReader."
251    )]
252    UnsupportedArrowType(ArrowDataType),
253    /// At ODBC api calls gaining information about the columns did fail.
254    #[error(
255        "An error occurred fetching the column description or data type from the metainformation \
256        attached to the ODBC result set:\n{0}"
257    )]
258    FailedToDescribeColumn(#[source] odbc_api::Error),
259    #[error(
260        "Column buffer is too large to be allocated. Tried to alloacte {num_elements} elements \
261        with {element_size} bytes in size each."
262    )]
263    TooLarge {
264        num_elements: usize,
265        element_size: usize,
266    },
267}
268
269impl ColumnFailure {
270    /// Provides the error with additional context of Error with column name and index.
271    pub fn into_crate_error(self, name: String, index: usize) -> crate::Error {
272        crate::Error::ColumnFailure {
273            name,
274            index,
275            source: self,
276        }
277    }
278}