arrow_odbc/reader/odbc_reader.rs

use std::cmp::min;

use arrow::{
    datatypes::SchemaRef,
    error::ArrowError,
    record_batch::{RecordBatch, RecordBatchReader},
};
use odbc_api::{buffers::ColumnarAnyBuffer, BlockCursor, Cursor};

use crate::{BufferAllocationOptions, ConcurrentOdbcReader, Error};

use super::to_record_batch::ToRecordBatch;

/// Arrow ODBC reader. Implements the [`arrow::record_batch::RecordBatchReader`] trait so it can be
/// used to fill Arrow arrays from an ODBC data source.
///
/// This reader is generic over the cursor type so it can be used in cases where the cursor only
/// borrows a statement handle (most likely the case when using prepared queries) or owns the
/// statement handle (recommended when using one-shot queries, to have an easier life with the
/// borrow checker).
///
/// # Example
///
/// ```no_run
/// use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
///
/// const CONNECTION_STRING: &str = "\
///     Driver={ODBC Driver 18 for SQL Server};\
///     Server=localhost;\
///     UID=SA;\
///     PWD=My@Test@Password1;\
/// ";
///
/// fn main() -> Result<(), anyhow::Error> {
///
///     let odbc_environment = Environment::new()?;
///
///     // Connect to the database.
///     let connection = odbc_environment.connect_with_connection_string(
///         CONNECTION_STRING,
///         ConnectionOptions::default()
///     )?;
///
///     // This SQL statement does not require any arguments.
///     let parameters = ();
///
///     // Do not apply any timeout.
///     let timeout_sec = None;
///
///     // Execute query and create result set
///     let cursor = connection
///         .execute("SELECT * FROM MyTable", parameters, timeout_sec)?
///         .expect("SELECT statement must produce a cursor");
///
///     // Read result set as arrow batches. Infer Arrow types automatically using the meta
///     // information of `cursor`.
///     let arrow_record_batches = OdbcReaderBuilder::new()
///         .build(cursor)?;
///
///     for batch in arrow_record_batches {
///         // ... process batch ...
///     }
///     Ok(())
/// }
/// ```
pub struct OdbcReader<C: Cursor> {
    /// Converts the content of ODBC buffers into Arrow record batches.
    converter: ToRecordBatch,
    /// Fetches values from the ODBC datasource using columnar batches. Values are streamed batch
    /// by batch in order to avoid reallocation of the buffers used for the data in transit.
    batch_stream: BlockCursor<C, ColumnarAnyBuffer>,
    /// We remember if the user decided to use fallible allocations or not in case we need to
    /// allocate another buffer due to a state transition towards [`ConcurrentOdbcReader`].
    fallibale_allocations: bool,
}

impl<C: Cursor> OdbcReader<C> {
    /// Consume this instance to create a similar ODBC reader which fetches batches concurrently.
    ///
    /// Steals all resources from this [`OdbcReader`] instance, and allocates another buffer for
    /// transiting data from the ODBC data source to the application. This way one buffer can be
    /// written to by a dedicated system thread, while the other is read by the application. Use
    /// this if you want to trade memory for speed.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
    /// use std::sync::OnceLock;
    ///
    /// // In order to fetch in a dedicated system thread we need a cursor with static lifetime,
    /// // which implies a static ODBC environment.
    /// static ENV: OnceLock<Environment> = OnceLock::new();
    ///
    /// const CONNECTION_STRING: &str = "\
    ///     Driver={ODBC Driver 18 for SQL Server};\
    ///     Server=localhost;\
    ///     UID=SA;\
    ///     PWD=My@Test@Password1;\
    /// ";
    ///
    /// fn main() -> Result<(), anyhow::Error> {
    ///
    ///     let odbc_environment = ENV.get_or_init(|| Environment::new().unwrap());
    ///
    ///     // Connect to the database.
    ///     let connection = odbc_environment.connect_with_connection_string(
    ///         CONNECTION_STRING,
    ///         ConnectionOptions::default()
    ///     )?;
    ///
    ///     // This SQL statement does not require any arguments.
    ///     let parameters = ();
    ///
    ///     // Do not apply any timeout.
    ///     let timeout_sec = None;
    ///
    ///     // Execute query and create result set
    ///     let cursor = connection
    ///         // Using `into_cursor` instead of `execute` takes ownership of the connection and
    ///         // allows for a cursor with static lifetime.
    ///         .into_cursor("SELECT * FROM MyTable", parameters, timeout_sec)
    ///         .map_err(|e| e.error)?
    ///         .expect("SELECT statement must produce a cursor");
    ///
    ///     // Construct ODBC reader ...
    ///     let arrow_record_batches = OdbcReaderBuilder::new()
    ///         .build(cursor)?
    ///         // ... and make it concurrent
    ///         .into_concurrent()?;
    ///
    ///     for batch in arrow_record_batches {
    ///         // ... process batch ...
    ///     }
    ///     Ok(())
    /// }
    /// ```
    pub fn into_concurrent(self) -> Result<ConcurrentOdbcReader<C>, Error>
    where
        C: Send + 'static,
    {
        ConcurrentOdbcReader::from_block_cursor(
            self.batch_stream,
            self.converter,
            self.fallibale_allocations,
        )
    }

    /// Destroy the ODBC arrow reader and yield the underlying cursor object.
    ///
    /// One application of this is to process more than one result set in case you executed a stored
    /// procedure.
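    ///
    /// # Example
    ///
    /// A minimal sketch: drain the batches of the first result set, then reclaim the cursor. How
    /// you advance to further result sets afterwards depends on `odbc_api` and your driver; the
    /// stored procedure name below is only a placeholder.
    ///
    /// ```no_run
    /// use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
    ///
    /// fn main() -> Result<(), anyhow::Error> {
    ///     let odbc_environment = Environment::new()?;
    ///     let connection = odbc_environment.connect_with_connection_string(
    ///         "Driver={ODBC Driver 18 for SQL Server};Server=localhost;UID=SA;PWD=My@Test@Password1;",
    ///         ConnectionOptions::default(),
    ///     )?;
    ///
    ///     let cursor = connection
    ///         .execute("EXEC MyStoredProcedure", (), None)?
    ///         .expect("Statement must produce a result set");
    ///
    ///     let mut reader = OdbcReaderBuilder::new().build(cursor)?;
    ///     for batch in &mut reader {
    ///         let _batch = batch?;
    ///     }
    ///
    ///     // Reclaim the cursor once the first result set is drained.
    ///     let _cursor = reader.into_cursor()?;
    ///     Ok(())
    /// }
    /// ```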
    pub fn into_cursor(self) -> Result<C, odbc_api::Error> {
        let (cursor, _buffer) = self.batch_stream.unbind()?;
        Ok(cursor)
    }

    /// Size, in rows, of the internal preallocated buffer bound to the cursor and filled by your
    /// ODBC driver. Each record batch will have at most this many rows. Only the last one may have
    /// fewer.
    pub fn max_rows_per_batch(&self) -> usize {
        self.batch_stream.row_array_size()
    }
}

impl<C> Iterator for OdbcReader<C>
where
    C: Cursor,
{
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.batch_stream.fetch_with_truncation_check(true) {
            // We successfully fetched a batch from the database. Try to copy it into a record batch
            // and forward errors if any.
            Ok(Some(batch)) => {
                let result_record_batch = self
                    .converter
                    .buffer_to_record_batch(batch)
                    .map_err(|mapping_error| ArrowError::ExternalError(Box::new(mapping_error)));
                Some(result_record_batch)
            }
            // We ran out of batches in the result set. End the iterator.
            Ok(None) => None,
            // We had an error fetching the next batch from the database, let's report it as an
            // external error.
            Err(odbc_error) => Some(Err(odbc_to_arrow_error(odbc_error))),
        }
    }
}

impl<C> RecordBatchReader for OdbcReader<C>
where
    C: Cursor,
{
    fn schema(&self) -> SchemaRef {
        self.converter.schema().clone()
    }
}

/// Creates instances of [`OdbcReader`] based on [`odbc_api::Cursor`].
///
/// Using a builder pattern instead of passing structs with all required arguments to the
/// constructors of [`OdbcReader`] allows `arrow_odbc` to introduce new parameters to fine-tune the
/// creation and behavior of the readers without breaking the code of downstream applications.
#[derive(Default, Clone)]
pub struct OdbcReaderBuilder {
    /// Maximum number of rows fetched in a single batch. Set explicitly via
    /// [`OdbcReaderBuilder::with_max_num_rows_per_batch`]; otherwise a default is chosen in
    /// [`OdbcReaderBuilder::new`].
    max_num_rows_per_batch: usize,
    max_bytes_per_batch: usize,
    schema: Option<SchemaRef>,
    max_text_size: Option<usize>,
    max_binary_size: Option<usize>,
    map_value_errors_to_null: bool,
    fallibale_allocations: bool,
    trim_fixed_sized_character_strings: bool,
}

impl OdbcReaderBuilder {
    pub fn new() -> Self {
        // In the absence of an explicit row limit set by the user we choose u16 MAX (65535). This
        // is a reasonably high value which already reduces IO overhead significantly compared to
        // row by row fetching. Likely for many database schemas a memory limitation will kick in
        // before this limit. If not, it can still be dangerous to go beyond this number. Some
        // drivers use a 16-bit integer to count rows and you can run into overflow errors if you
        // use one of them. One such issue occurred with SAP Anywhere.
        const DEFAULT_MAX_ROWS_PER_BATCH: usize = u16::MAX as usize;
        const DEFAULT_MAX_BYTES_PER_BATCH: usize = 512 * 1024 * 1024;

        OdbcReaderBuilder {
            max_num_rows_per_batch: DEFAULT_MAX_ROWS_PER_BATCH,
            max_bytes_per_batch: DEFAULT_MAX_BYTES_PER_BATCH,
            schema: None,
            max_text_size: None,
            max_binary_size: None,
            fallibale_allocations: false,
            map_value_errors_to_null: false,
            trim_fixed_sized_character_strings: false,
        }
    }

    /// Limits the maximum number of rows which are fetched in a single roundtrip to the datasource.
    /// Higher numbers lower the IO overhead and may speed up your runtime, but also require larger
    /// preallocated buffers and use more memory. This value defaults to `65535` which is `u16` max.
    /// Some ODBC drivers use a 16-bit integer to count rows, so staying at or below this default
    /// avoids overflows. The savings in IO overhead from going above that number are estimated to
    /// be small. Your mileage may vary of course.
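    ///
    /// # Example
    ///
    /// A minimal sketch of lowering the row limit, e.g. for very wide rows; the value `1000` is
    /// only an illustration.
    ///
    /// ```no_run
    /// use arrow_odbc::OdbcReaderBuilder;
    ///
    /// let mut builder = OdbcReaderBuilder::new();
    /// builder.with_max_num_rows_per_batch(1000);
    /// ```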
    pub fn with_max_num_rows_per_batch(&mut self, max_num_rows_per_batch: usize) -> &mut Self {
        self.max_num_rows_per_batch = max_num_rows_per_batch;
        self
    }

    /// In addition to a limit on the number of rows you may specify an upper bound in bytes for
    /// allocating the transit buffer. This is useful if you do not know the database schema, or your
    /// code has to work with different ones, but you know the amount of memory in your machine. This
    /// limit is applied in addition to [`OdbcReaderBuilder::with_max_num_rows_per_batch`]. Whichever
    /// of these leads to a smaller buffer is used. This defaults to 512 MiB.
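    ///
    /// # Example
    ///
    /// A minimal sketch capping the transit buffer at 256 MiB; the value is only an illustration.
    ///
    /// ```no_run
    /// use arrow_odbc::OdbcReaderBuilder;
    ///
    /// let mut builder = OdbcReaderBuilder::new();
    /// builder.with_max_bytes_per_batch(256 * 1024 * 1024);
    /// ```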
    pub fn with_max_bytes_per_batch(&mut self, max_bytes_per_batch: usize) -> &mut Self {
        self.max_bytes_per_batch = max_bytes_per_batch;
        self
    }

    /// Describes the types of the Arrow arrays in the record batches. It is also used to determine
    /// the C data type requested from the data source. If this is not explicitly set the type is
    /// inferred from the schema information provided by the ODBC driver. A reason for setting this
    /// explicitly could be that you have superior knowledge about your data compared to the ODBC
    /// driver. E.g. a type for an unsigned byte (`u8`) is not part of the ODBC standard. Therefore
    /// the driver might at best be able to tell you that this is an `i8`. If you still want `u8`s
    /// in the resulting array you need to specify the schema manually. Also many drivers struggle
    /// with reporting nullability correctly and just report every column as nullable. Explicitly
    /// specifying a schema can also compensate for such shortcomings if it turns out to be
    /// relevant.
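    ///
    /// # Example
    ///
    /// A minimal sketch of overriding the inferred schema, using the re-exported `arrow` crate. The
    /// column name `tiny_int_col` and its mapping to `UInt8` are placeholders for illustration;
    /// your result set must actually contain a matching column.
    ///
    /// ```no_run
    /// use std::sync::Arc;
    /// use arrow_odbc::{arrow::datatypes::{DataType, Field, Schema}, OdbcReaderBuilder};
    ///
    /// let schema = Arc::new(Schema::new(vec![
    ///     // Request unsigned bytes even though ODBC itself only knows signed ones.
    ///     Field::new("tiny_int_col", DataType::UInt8, false),
    /// ]));
    ///
    /// let mut builder = OdbcReaderBuilder::new();
    /// builder.with_schema(schema);
    /// ```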
    pub fn with_schema(&mut self, schema: SchemaRef) -> &mut Self {
        self.schema = Some(schema);
        self
    }

    /// An upper limit for the size of buffers bound to variadic text columns of the data source.
    /// This limit does not (directly) apply to the size of the created arrow buffers, but rather
    /// applies to the buffers used for the data in transit. Use this option if you have e.g.
    /// `VARCHAR(MAX)` fields in your database schema. In such a case without an upper limit, the
    /// ODBC driver of your data source is asked for the maximum size of an element, and is likely
    /// to answer with either `0` or a value which is way larger than any actual entry in the
    /// column. If you can not adapt your database schema, this limit might be what you are looking
    /// for. On Windows systems the size is in 2-byte units (UTF-16 code units), as Windows utilizes
    /// a UTF-16 encoding, so this translates roughly to the size in letters. On non-Windows systems
    /// this is the size in bytes and the datasource is assumed to utilize a UTF-8 encoding. If this
    /// method is not called no upper limit is set and the maximum element size reported by ODBC is
    /// used to determine buffer sizes.
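    ///
    /// # Example
    ///
    /// A minimal sketch limiting text elements to 4096 code units / bytes, e.g. for a
    /// `VARCHAR(MAX)` column; the value is only an illustration.
    ///
    /// ```no_run
    /// use arrow_odbc::OdbcReaderBuilder;
    ///
    /// let mut builder = OdbcReaderBuilder::new();
    /// builder.with_max_text_size(4096);
    /// ```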
    pub fn with_max_text_size(&mut self, max_text_size: usize) -> &mut Self {
        self.max_text_size = Some(max_text_size);
        self
    }

    /// An upper limit for the size of buffers bound to variadic binary columns of the data source.
    /// This limit does not (directly) apply to the size of the created arrow buffers, but rather
    /// applies to the buffers used for the data in transit. Use this option if you have e.g.
    /// `VARBINARY(MAX)` fields in your database schema. In such a case without an upper limit, the
    /// ODBC driver of your data source is asked for the maximum size of an element, and is likely
    /// to answer with either `0` or a value which is way larger than any actual entry in the
    /// column. If you can not adapt your database schema, this limit might be what you are looking
    /// for. This is the maximum size in bytes of the binary column. If this method is not called no
    /// upper limit is set and the maximum element size reported by ODBC is used to determine
    /// buffer sizes.
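    ///
    /// # Example
    ///
    /// A minimal sketch limiting binary elements to 1 MiB, e.g. for a `VARBINARY(MAX)` column; the
    /// value is only an illustration.
    ///
    /// ```no_run
    /// use arrow_odbc::OdbcReaderBuilder;
    ///
    /// let mut builder = OdbcReaderBuilder::new();
    /// builder.with_max_binary_size(1024 * 1024);
    /// ```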
    pub fn with_max_binary_size(&mut self, max_binary_size: usize) -> &mut Self {
        self.max_binary_size = Some(max_binary_size);
        self
    }

    /// Set to `true` in order to trigger a [`crate::ColumnFailure::TooLarge`] error instead of a
    /// panic in case the buffers can not be allocated due to their size. This might have a
    /// performance cost for constructing the reader. `false` by default.
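    ///
    /// # Example
    ///
    /// A minimal sketch opting into fallible buffer allocations.
    ///
    /// ```no_run
    /// use arrow_odbc::OdbcReaderBuilder;
    ///
    /// let mut builder = OdbcReaderBuilder::new();
    /// builder.with_fallibale_allocations(true);
    /// ```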
    pub fn with_fallibale_allocations(&mut self, fallibale_allocations: bool) -> &mut Self {
        self.fallibale_allocations = fallibale_allocations;
        self
    }

    /// Set to `true` in order to map a value in the database which can not be successfully
    /// converted into its target type to NULL, rather than emitting an external Arrow error.
    /// E.g. currently mapping errors can happen if a datetime value is not in the range
    /// representable by arrow. Default is `false`.
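    ///
    /// # Example
    ///
    /// A minimal sketch mapping unrepresentable values to NULL instead of failing the batch.
    ///
    /// ```no_run
    /// use arrow_odbc::OdbcReaderBuilder;
    ///
    /// let mut builder = OdbcReaderBuilder::new();
    /// builder.value_errors_as_null(true);
    /// ```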
    pub fn value_errors_as_null(&mut self, map_value_errors_to_null: bool) -> &mut Self {
        self.map_value_errors_to_null = map_value_errors_to_null;
        self
    }

    /// If set to `true`, text in fixed sized character columns like e.g. CHAR is trimmed of
    /// whitespace before being converted into Arrow UTF-8 arrays. Default is `false`.
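    ///
    /// # Example
    ///
    /// A minimal sketch enabling trimming for `CHAR(n)` style columns.
    ///
    /// ```no_run
    /// use arrow_odbc::OdbcReaderBuilder;
    ///
    /// let mut builder = OdbcReaderBuilder::new();
    /// builder.trim_fixed_sized_characters(true);
    /// ```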
    pub fn trim_fixed_sized_characters(
        &mut self,
        fixed_sized_character_strings_are_trimmed: bool,
    ) -> &mut Self {
        self.trim_fixed_sized_character_strings = fixed_sized_character_strings_are_trimmed;
        self
    }

    /// No matter whether the user explicitly specified a row limit, a memory limit, both or
    /// neither: in order to construct a reader we need to decide on a buffer size in rows.
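    ///
    /// E.g. with the defaults (512 MiB and 65535 rows), a schema with 1 KiB per row would allow
    /// 536,870,912 / 1,024 = 524,288 rows by memory, so the row limit of 65,535 applies; with
    /// 64 KiB per row the memory limit caps the buffer at 536,870,912 / 65,536 = 8,192 rows
    /// instead.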
    fn buffer_size_in_rows(&self, bytes_per_row: usize) -> Result<usize, Error> {
        // If schema is empty, return before division by zero error.
        if bytes_per_row == 0 {
            return Ok(self.max_bytes_per_batch);
        }
        let rows_per_batch = self.max_bytes_per_batch / bytes_per_row;
        if rows_per_batch == 0 {
            Err(Error::OdbcBufferTooSmall {
                max_bytes_per_batch: self.max_bytes_per_batch,
                bytes_per_row,
            })
        } else {
            Ok(min(self.max_num_rows_per_batch, rows_per_batch))
        }
    }

    /// Constructs an [`OdbcReader`] which consumes the given cursor. The cursor will also be used
    /// to infer the Arrow schema if it has not been supplied explicitly.
    ///
    /// # Parameters
    ///
    /// * `cursor`: ODBC cursor used to fetch batches from the data source. The constructor will
    ///   bind buffers to this cursor in order to perform bulk fetches from the source. This is
    ///   usually faster than fetching results row by row as it saves roundtrips to the database.
    ///   The type of these buffers will be inferred from the arrow schema. Not every arrow type is
    ///   supported though.
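    ///
    /// # Example
    ///
    /// A minimal sketch combining a few builder options before `build`; the connection string,
    /// query and chosen limits are placeholders.
    ///
    /// ```no_run
    /// use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
    ///
    /// fn main() -> Result<(), anyhow::Error> {
    ///     let odbc_environment = Environment::new()?;
    ///     let connection = odbc_environment.connect_with_connection_string(
    ///         "Driver={ODBC Driver 18 for SQL Server};Server=localhost;UID=SA;PWD=My@Test@Password1;",
    ///         ConnectionOptions::default(),
    ///     )?;
    ///     let cursor = connection
    ///         .execute("SELECT * FROM MyTable", (), None)?
    ///         .expect("SELECT statement must produce a cursor");
    ///
    ///     let reader = OdbcReaderBuilder::new()
    ///         .with_max_num_rows_per_batch(10_000)
    ///         .with_max_bytes_per_batch(256 * 1024 * 1024)
    ///         .with_max_text_size(4096)
    ///         .build(cursor)?;
    ///
    ///     for batch in reader {
    ///         let _batch = batch?;
    ///     }
    ///     Ok(())
    /// }
    /// ```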
    pub fn build<C>(&self, mut cursor: C) -> Result<OdbcReader<C>, Error>
    where
        C: Cursor,
    {
        let buffer_allocation_options = BufferAllocationOptions {
            max_text_size: self.max_text_size,
            max_binary_size: self.max_binary_size,
            fallibale_allocations: self.fallibale_allocations,
        };
        // Infer the Arrow schema from the cursor metadata if it has not been supplied explicitly.
        let converter = ToRecordBatch::new(
            &mut cursor,
            self.schema.clone(),
            buffer_allocation_options,
            self.map_value_errors_to_null,
            self.trim_fixed_sized_character_strings,
        )?;
        // Decide on the transit buffer size in rows from the row size and the configured limits.
        let bytes_per_row = converter.row_size_in_bytes();
        let buffer_size_in_rows = self.buffer_size_in_rows(bytes_per_row)?;
        let row_set_buffer =
            converter.allocate_buffer(buffer_size_in_rows, self.fallibale_allocations)?;
        let batch_stream = cursor.bind_buffer(row_set_buffer).unwrap();

        Ok(OdbcReader {
            converter,
            batch_stream,
            fallibale_allocations: self.fallibale_allocations,
        })
    }
}

pub fn odbc_to_arrow_error(odbc_error: odbc_api::Error) -> ArrowError {
    ArrowError::from_external_error(Box::new(odbc_error))
}