arrow_odbc/reader/odbc_reader.rs

use std::cmp::min;

use arrow::{
    datatypes::SchemaRef,
    error::ArrowError,
    record_batch::{RecordBatch, RecordBatchReader},
};
use odbc_api::{BlockCursor, Cursor, buffers::ColumnarAnyBuffer};

use crate::{BufferAllocationOptions, ConcurrentOdbcReader, Error};

use super::{TextEncoding, to_record_batch::ToRecordBatch};

/// Arrow ODBC reader. Implements the [`arrow::record_batch::RecordBatchReader`] trait so it can be
/// used to fill Arrow arrays from an ODBC data source.
///
/// This reader is generic over the cursor type so it can be used in cases where the cursor only
/// borrows a statement handle (most likely the case when using prepared queries), or owns the
/// statement handle (recommended when using one-shot queries, to have an easier life with the
/// borrow checker).
///
/// # Example
///
/// ```no_run
/// use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
///
/// const CONNECTION_STRING: &str = "\
///     Driver={ODBC Driver 18 for SQL Server};\
///     Server=localhost;\
///     UID=SA;\
///     PWD=My@Test@Password1;\
/// ";
///
/// fn main() -> Result<(), anyhow::Error> {
///
///     let odbc_environment = Environment::new()?;
///
///     // Connect with database.
///     let connection = odbc_environment.connect_with_connection_string(
///         CONNECTION_STRING,
///         ConnectionOptions::default()
///     )?;
///
///     // This SQL statement does not require any arguments.
///     let parameters = ();
///
///     // Do not apply any timeout.
///     let timeout_sec = None;
///
///     // Execute query and create result set
///     let cursor = connection
///         .execute("SELECT * FROM MyTable", parameters, timeout_sec)?
///         .expect("SELECT statement must produce a cursor");
///
///     // Read result set as arrow batches. Infer Arrow types automatically using the meta
///     // information of `cursor`.
///     let arrow_record_batches = OdbcReaderBuilder::new()
///         .build(cursor)?;
///
///     for batch in arrow_record_batches {
///         // ... process batch ...
///     }
///     Ok(())
/// }
/// ```
pub struct OdbcReader<C: Cursor> {
    /// Converts the content of ODBC buffers into Arrow record batches
    converter: ToRecordBatch,
    /// Fetches values from the ODBC datasource using columnar batches. Values are streamed batch
    /// by batch in order to avoid reallocation of the buffers used for transit.
    batch_stream: BlockCursor<C, ColumnarAnyBuffer>,
    /// We remember whether the user decided to use fallible allocations or not in case we need to
    /// allocate another buffer due to a state transition towards [`ConcurrentOdbcReader`].
    fallibale_allocations: bool,
}

impl<C: Cursor> OdbcReader<C> {
    /// Consume this instance to create a similar ODBC reader which fetches batches asynchronously.
    ///
    /// Steals all resources from this [`OdbcReader`] instance, and allocates another buffer for
    /// transiting data from the ODBC data source to the application. This way one buffer can be
    /// written to by a dedicated system thread, while the other is read by the application. Use
    /// this if you want to trade memory for speed.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
    /// use std::sync::OnceLock;
    ///
    /// // In order to fetch in a dedicated system thread we need a cursor with static lifetime,
    /// // this implies a static ODBC environment.
    /// static ENV: OnceLock<Environment> = OnceLock::new();
    ///
    /// const CONNECTION_STRING: &str = "\
    ///     Driver={ODBC Driver 18 for SQL Server};\
    ///     Server=localhost;\
    ///     UID=SA;\
    ///     PWD=My@Test@Password1;\
    /// ";
    ///
    /// fn main() -> Result<(), anyhow::Error> {
    ///
    ///     let odbc_environment = ENV.get_or_init(|| Environment::new().unwrap());
    ///
    ///     // Connect with database.
    ///     let connection = odbc_environment.connect_with_connection_string(
    ///         CONNECTION_STRING,
    ///         ConnectionOptions::default()
    ///     )?;
    ///
    ///     // This SQL statement does not require any arguments.
    ///     let parameters = ();
    ///
    ///     // Do not apply any timeout.
    ///     let timeout_sec = None;
    ///
    ///     // Execute query and create result set
    ///     let cursor = connection
    ///         // Using `into_cursor` instead of `execute` takes ownership of the connection and
    ///         // allows for a cursor with static lifetime.
    ///         .into_cursor("SELECT * FROM MyTable", parameters, timeout_sec)
    ///         .map_err(|e| e.error)?
    ///         .expect("SELECT statement must produce a cursor");
    ///
    ///     // Construct ODBC reader ...
    ///     let arrow_record_batches = OdbcReaderBuilder::new()
    ///         .build(cursor)?
    ///         // ... and make it concurrent
    ///         .into_concurrent()?;
    ///
    ///     for batch in arrow_record_batches {
    ///         // ... process batch ...
    ///     }
    ///     Ok(())
    /// }
    /// ```
    pub fn into_concurrent(self) -> Result<ConcurrentOdbcReader<C>, Error>
    where
        C: Send + 'static,
    {
        ConcurrentOdbcReader::from_block_cursor(
            self.batch_stream,
            self.converter,
            self.fallibale_allocations,
        )
    }

    /// Destroy the ODBC arrow reader and yield the underlying cursor object.
    ///
    /// One application of this is to process more than one result set in case you executed a stored
    /// procedure.
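    ///
    /// A minimal sketch of that use case (it assumes `reader` is an exhausted [`OdbcReader`] and
    /// that the underlying `odbc_api` cursor offers `more_results` to advance to the next result
    /// set):
    ///
    /// ```ignore
    /// // Yield the cursor back once the first result set has been consumed.
    /// let cursor = reader.into_cursor()?;
    /// // Advance to the second result set of the stored procedure, if there is one.
    /// if let Some(next_cursor) = cursor.more_results()? {
    ///     let second_result_set = arrow_odbc::OdbcReaderBuilder::new().build(next_cursor)?;
    ///     for batch in second_result_set {
    ///         // ... process batch of the second result set ...
    ///     }
    /// }
    /// ```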
    pub fn into_cursor(self) -> Result<C, odbc_api::Error> {
        let (cursor, _buffer) = self.batch_stream.unbind()?;
        Ok(cursor)
    }

    /// Size in rows of the internal preallocated buffer which is bound to the cursor and filled by
    /// your ODBC driver. Each record batch will have at most this many rows. Only the last one may
    /// have fewer.
    pub fn max_rows_per_batch(&self) -> usize {
        self.batch_stream.row_array_size()
    }
}

impl<C> Iterator for OdbcReader<C>
where
    C: Cursor,
{
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.batch_stream.fetch_with_truncation_check(true) {
            // We successfully fetched a batch from the database. Try to copy it into a record batch
            // and forward errors if any.
            Ok(Some(batch)) => {
                let result_record_batch = self
                    .converter
                    .buffer_to_record_batch(batch)
                    .map_err(|mapping_error| ArrowError::ExternalError(Box::new(mapping_error)));
                Some(result_record_batch)
            }
            // We ran out of batches in the result set. End the iterator.
            Ok(None) => None,
            // We had an error fetching the next batch from the database, let's report it as an
            // external error.
            Err(odbc_error) => Some(Err(odbc_to_arrow_error(odbc_error))),
        }
    }
}

impl<C> RecordBatchReader for OdbcReader<C>
where
    C: Cursor,
{
    fn schema(&self) -> SchemaRef {
        self.converter.schema().clone()
    }
}

/// Creates instances of [`OdbcReader`] based on [`odbc_api::Cursor`].
///
/// Using a builder pattern instead of passing structs with all required arguments to the
/// constructors of [`OdbcReader`] allows `arrow_odbc` to introduce new parameters to fine-tune the
/// creation and behavior of the readers without breaking the code of downstream applications.
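///
/// # Example
///
/// A sketch of typical usage. Connection setup mirrors the example on [`OdbcReader`]; the concrete
/// limits passed to the builder are illustrative only.
///
/// ```no_run
/// use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
///
/// const CONNECTION_STRING: &str = "\
///     Driver={ODBC Driver 18 for SQL Server};\
///     Server=localhost;\
///     UID=SA;\
///     PWD=My@Test@Password1;\
/// ";
///
/// fn main() -> Result<(), anyhow::Error> {
///     let odbc_environment = Environment::new()?;
///
///     let connection = odbc_environment.connect_with_connection_string(
///         CONNECTION_STRING,
///         ConnectionOptions::default()
///     )?;
///
///     let cursor = connection
///         .execute("SELECT * FROM MyTable", (), None)?
///         .expect("SELECT statement must produce a cursor");
///
///     let arrow_record_batches = OdbcReaderBuilder::new()
///         // Fetch at most 10_000 rows in a single roundtrip ...
///         .with_max_num_rows_per_batch(10_000)
///         // ... and never allocate more than 256 MiB for the transit buffer.
///         .with_max_bytes_per_batch(256 * 1024 * 1024)
///         .build(cursor)?;
///
///     for batch in arrow_record_batches {
///         // ... process batch ...
///     }
///     Ok(())
/// }
/// ```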
#[derive(Default, Clone)]
pub struct OdbcReaderBuilder {
    /// Maximum number of rows fetched per batch. Either set explicitly by the user via
    /// [`OdbcReaderBuilder::with_max_num_rows_per_batch`], or left at the default chosen in
    /// [`OdbcReaderBuilder::new`].
    max_num_rows_per_batch: usize,
    max_bytes_per_batch: usize,
    schema: Option<SchemaRef>,
    max_text_size: Option<usize>,
    max_binary_size: Option<usize>,
    map_value_errors_to_null: bool,
    dbms_name: Option<String>,
    fallibale_allocations: bool,
    trim_fixed_sized_character_strings: bool,
    text_encoding: TextEncoding,
}

impl OdbcReaderBuilder {
    pub fn new() -> Self {
        // In the absence of an explicit row limit set by the user we choose u16 MAX (65535). This
        // is a reasonably high value which already allows for significantly reducing IO overhead
        // compared to row by row fetching. Likely for many database schemas a memory limitation
        // will kick in before this limit. If not, it can still be dangerous to go beyond this
        // number. Some drivers use a 16-bit integer to count rows and you can run into overflow
        // errors if you use one of them. One such issue occurred with SAP SQL Anywhere.
        const DEFAULT_MAX_ROWS_PER_BATCH: usize = u16::MAX as usize;
        const DEFAULT_MAX_BYTES_PER_BATCH: usize = 512 * 1024 * 1024;

        OdbcReaderBuilder {
            max_num_rows_per_batch: DEFAULT_MAX_ROWS_PER_BATCH,
            max_bytes_per_batch: DEFAULT_MAX_BYTES_PER_BATCH,
            schema: None,
            max_text_size: None,
            max_binary_size: None,
            fallibale_allocations: false,
            map_value_errors_to_null: false,
            dbms_name: None,
            trim_fixed_sized_character_strings: false,
            text_encoding: TextEncoding::Auto,
        }
    }

    /// Limits the maximum number of rows which are fetched in a single roundtrip to the datasource.
    /// Higher numbers lower the IO overhead and may speed up your runtime, but also require larger
    /// preallocated buffers and use more memory. This value defaults to `65535`, which is the
    /// maximum of `u16`. Some ODBC drivers use a 16-bit integer to count rows, so staying at or
    /// below this value avoids overflows. The improvement in saved IO overhead from going above
    /// that number is estimated to be small. Your mileage may vary, of course.
    pub fn with_max_num_rows_per_batch(&mut self, max_num_rows_per_batch: usize) -> &mut Self {
        self.max_num_rows_per_batch = max_num_rows_per_batch;
        self
    }

    /// In addition to a row size limit you may specify an upper bound in bytes for allocating the
    /// transit buffer. This is useful if you do not know the database schema, or your code has to
    /// work with different ones, but you know the amount of memory in your machine. This limit is
    /// applied in addition to [`OdbcReaderBuilder::with_max_num_rows_per_batch`]. Whichever of
    /// these leads to a smaller buffer is used. This defaults to 512 MiB.
    pub fn with_max_bytes_per_batch(&mut self, max_bytes_per_batch: usize) -> &mut Self {
        self.max_bytes_per_batch = max_bytes_per_batch;
        self
    }

    /// Describes the types of the Arrow arrays in the record batches. It is also used to determine
    /// the CData type requested from the data source. If this is not explicitly set the type is
    /// inferred from the schema information provided by the ODBC driver. A reason for setting this
    /// explicitly could be that you have superior knowledge about your data compared to the ODBC
    /// driver. E.g. a type for an unsigned byte (`u8`) is not part of the ODBC standard. Therefore
    /// the driver might at best be able to tell you that this is an `i8`. If you still want
    /// `u8`s in the resulting array you need to specify the schema manually. Also many drivers
    /// struggle with reporting nullability correctly and just report every column as nullable.
    /// Explicitly specifying a schema can also compensate for such shortcomings if it turns out to
    /// be relevant.
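    ///
    /// A minimal sketch of supplying a schema manually (it assumes the crate's `arrow` re-export
    /// and a hypothetical column named `id` which should be read as `u8`):
    ///
    /// ```no_run
    /// use std::sync::Arc;
    /// use arrow_odbc::{arrow::datatypes::{DataType, Field, Schema}, OdbcReaderBuilder};
    ///
    /// let schema = Arc::new(Schema::new(vec![
    ///     // Request unsigned bytes even though ODBC itself only reports a signed type.
    ///     Field::new("id", DataType::UInt8, false),
    /// ]));
    ///
    /// let mut builder = OdbcReaderBuilder::new();
    /// builder.with_schema(schema);
    /// ```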
    pub fn with_schema(&mut self, schema: SchemaRef) -> &mut Self {
        self.schema = Some(schema);
        self
    }

    /// In order for fast bulk fetching to work, `arrow-odbc` needs to know the size of the largest
    /// possible field in each column. It will do so automatically by considering the schema
    /// information. However, trouble arises if the schema contains unbounded variadic fields like
    /// `VARCHAR(MAX)` which can hold really large values. These have a very high upper element
    /// size, if any. In order to work with such schemas we need a limit on what an upper bound of
    /// the actual values in the column is, as opposed to the largest value the column could
    /// theoretically store. There is no need for this to be very precise, but just knowing that a
    /// value will never exceed 4KiB rather than 2GiB is enough to allow for tremendous efficiency
    /// gains. The size of the text is specified in UTF-8 encoded bytes if using a narrow encoding
    /// (typically all non-Windows systems) and in UTF-16 encoded pairs of bytes on systems using a
    /// wide encoding (typically Windows). This roughly corresponds to the size in letters, yet if
    /// you are using a lot of emojis or other special characters this number might need to be
    /// larger.
    pub fn with_max_text_size(&mut self, max_text_size: usize) -> &mut Self {
        self.max_text_size = Some(max_text_size);
        self
    }

    /// An upper limit for the size of buffers bound to variadic binary columns of the data source.
    /// This limit does not (directly) apply to the size of the created arrow buffers, but rather
    /// applies to the buffers used for the data in transit. Use this option if you have e.g.
    /// `VARBINARY(MAX)` fields in your database schema. In such a case without an upper limit, the
    /// ODBC driver of your data source is asked for the maximum size of an element, and is likely
    /// to answer with either `0` or a value which is way larger than any actual entry in the
    /// column. If you can not adapt your database schema, this limit might be what you are looking
    /// for. This is the maximum size in bytes of the binary column. If this method is not called no
    /// upper limit is set and the maximum element size, reported by ODBC, is used to determine
    /// buffer sizes.
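    ///
    /// For example, to cap the transit buffers for `VARCHAR(MAX)` and `VARBINARY(MAX)` columns
    /// (the concrete limits below are illustrative assumptions, not recommendations):
    ///
    /// ```no_run
    /// use arrow_odbc::OdbcReaderBuilder;
    ///
    /// let mut builder = OdbcReaderBuilder::new();
    /// builder
    ///     // No text value is expected to exceed 4 KiB.
    ///     .with_max_text_size(4096)
    ///     // No binary value is expected to exceed 1 MiB.
    ///     .with_max_binary_size(1024 * 1024);
    /// ```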
    pub fn with_max_binary_size(&mut self, max_binary_size: usize) -> &mut Self {
        self.max_binary_size = Some(max_binary_size);
        self
    }

    /// Set to `true` in order to trigger a [`crate::ColumnFailure::TooLarge`] error instead of a
    /// panic in case the buffers can not be allocated due to their size. This might have a
    /// performance cost for constructing the reader. `false` by default.
    pub fn with_fallibale_allocations(&mut self, fallibale_allocations: bool) -> &mut Self {
        self.fallibale_allocations = fallibale_allocations;
        self
    }

    /// Set to `true` in order to map a value in the database which can not be successfully
    /// converted into its target type to NULL, rather than emitting an external Arrow error.
    /// E.g. currently mapping errors can happen if a datetime value is not in the range
    /// representable by Arrow. Default is `false`.
    pub fn value_errors_as_null(&mut self, map_value_errors_to_null: bool) -> &mut Self {
        self.map_value_errors_to_null = map_value_errors_to_null;
        self
    }

    /// If set to `true`, text in fixed sized character columns like e.g. CHAR is trimmed of
    /// whitespace before being converted into Arrow UTF-8 arrays. Default is `false`.
    pub fn trim_fixed_sized_characters(
        &mut self,
        fixed_sized_character_strings_are_trimmed: bool,
    ) -> &mut Self {
        self.trim_fixed_sized_character_strings = fixed_sized_character_strings_are_trimmed;
        self
    }

    /// Controls the encoding used for transferring text data from the ODBC data source to the
    /// application. The resulting Arrow arrays will still be UTF-8 encoded. If you get garbage
    /// characters or invalid UTF-8 errors on non-Windows systems, you may want to set the encoding
    /// to [`TextEncoding::Utf16`]. On Windows systems you may want to set this to
    /// [`TextEncoding::Utf8`] to gain performance benefits, after you have verified that your
    /// system locale is set to UTF-8. The default is [`TextEncoding::Auto`].
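    ///
    /// A minimal sketch (it assumes `TextEncoding` is exported at the crate root, like the other
    /// builder options used here):
    ///
    /// ```no_run
    /// use arrow_odbc::{OdbcReaderBuilder, TextEncoding};
    ///
    /// let mut builder = OdbcReaderBuilder::new();
    /// // Request UTF-16 transfer, e.g. to work around garbage characters with a narrow encoding.
    /// builder.with_payload_text_encoding(TextEncoding::Utf16);
    /// ```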
    pub fn with_payload_text_encoding(&mut self, text_encoding: TextEncoding) -> &mut Self {
        self.text_encoding = text_encoding;
        self
    }

    /// If provided, the name of the database management system (DBMS) is used to account for
    /// database specific behavior when determining the arrow schema.
    ///
    /// To determine the name of the DBMS you can call
    /// [`odbc_api::Connection::database_management_system_name`].
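    ///
    /// A minimal sketch (it assumes `connection` is an open [`odbc_api::Connection`]):
    ///
    /// ```ignore
    /// // Ask the data source for its DBMS name and forward it to the builder.
    /// let dbms_name = connection.database_management_system_name()?;
    ///
    /// let mut builder = arrow_odbc::OdbcReaderBuilder::new();
    /// builder.with_dbms_name(dbms_name);
    /// ```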
    pub fn with_dbms_name(&mut self, dbms_name: String) -> &mut Self {
        self.dbms_name = Some(dbms_name);
        self
    }

    /// No matter whether the user explicitly specified a limit in row size, a memory limit, both,
    /// or neither: in order to construct a reader we need to decide on the buffer size in rows.
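    ///
    /// For instance, with the default limits (512 MiB per batch, 65535 rows) and a transit row size
    /// of 1 KiB, the byte limit alone would allow `536_870_912 / 1_024 = 524_288` rows, so the row
    /// limit of 65535 wins and determines the buffer size.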
    fn buffer_size_in_rows(&self, bytes_per_row: usize) -> Result<usize, Error> {
        // If schema is empty, return before division by zero error.
        if bytes_per_row == 0 {
            return Ok(self.max_bytes_per_batch);
        }
        let rows_per_batch = self.max_bytes_per_batch / bytes_per_row;
        if rows_per_batch == 0 {
            Err(Error::OdbcBufferTooSmall {
                max_bytes_per_batch: self.max_bytes_per_batch,
                bytes_per_row,
            })
        } else {
            Ok(min(self.max_num_rows_per_batch, rows_per_batch))
        }
    }

    /// Constructs an [`OdbcReader`] which consumes the given cursor. The cursor will also be used
    /// to infer the Arrow schema if it has not been supplied explicitly.
    ///
    /// # Parameters
    ///
    /// * `cursor`: ODBC cursor used to fetch batches from the data source. The constructor will
    ///   bind buffers to this cursor in order to perform bulk fetches from the source. This is
    ///   usually faster than fetching results row by row as it saves roundtrips to the database.
    ///   The type of these buffers will be inferred from the arrow schema. Not every arrow type is
    ///   supported though.
    pub fn build<C>(&self, mut cursor: C) -> Result<OdbcReader<C>, Error>
    where
        C: Cursor,
    {
        let buffer_allocation_options = BufferAllocationOptions {
            max_text_size: self.max_text_size,
            max_binary_size: self.max_binary_size,
            fallibale_allocations: self.fallibale_allocations,
        };
        let converter = ToRecordBatch::new(
            &mut cursor,
            self.schema.clone(),
            buffer_allocation_options,
            self.map_value_errors_to_null,
            self.dbms_name.as_deref(),
            self.trim_fixed_sized_character_strings,
            self.text_encoding,
        )?;
        let bytes_per_row = converter.row_size_in_bytes();
        let buffer_size_in_rows = self.buffer_size_in_rows(bytes_per_row)?;
        let row_set_buffer =
            converter.allocate_buffer(buffer_size_in_rows, self.fallibale_allocations)?;
        let batch_stream = cursor.bind_buffer(row_set_buffer).unwrap();

        Ok(OdbcReader {
            converter,
            batch_stream,
            fallibale_allocations: self.fallibale_allocations,
        })
    }
}

pub fn odbc_to_arrow_error(odbc_error: odbc_api::Error) -> ArrowError {
    ArrowError::from_external_error(Box::new(odbc_error))
}