arrow_odbc/reader/odbc_reader.rs
use std::cmp::min;

use arrow::{
    datatypes::SchemaRef,
    error::ArrowError,
    record_batch::{RecordBatch, RecordBatchReader},
};
use odbc_api::{BlockCursor, Cursor, buffers::ColumnarAnyBuffer};

use crate::{BufferAllocationOptions, ConcurrentOdbcReader, Error};

use super::{TextEncoding, to_record_batch::ToRecordBatch};

/// Arrow ODBC reader. Implements the [`arrow::record_batch::RecordBatchReader`] trait so it can be
/// used to fill Arrow arrays from an ODBC data source.
///
/// This reader is generic over the cursor type so it can be used in cases where the cursor only
/// borrows a statement handle (most likely the case when using prepared queries), or owns the
/// statement handle (recommended when using one-shot queries, to have an easier life with the
/// borrow checker).
///
/// # Example
///
/// ```no_run
/// use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
///
/// const CONNECTION_STRING: &str = "\
///     Driver={ODBC Driver 18 for SQL Server};\
///     Server=localhost;\
///     UID=SA;\
///     PWD=My@Test@Password1;\
/// ";
///
/// fn main() -> Result<(), anyhow::Error> {
///
///     let odbc_environment = Environment::new()?;
///
///     // Connect with database.
///     let connection = odbc_environment.connect_with_connection_string(
///         CONNECTION_STRING,
///         ConnectionOptions::default()
///     )?;
///
///     // This SQL statement does not require any arguments.
///     let parameters = ();
///
///     // Do not apply any timeout.
///     let timeout_sec = None;
///
///     // Execute query and create result set
///     let cursor = connection
///         .execute("SELECT * FROM MyTable", parameters, timeout_sec)?
///         .expect("SELECT statement must produce a cursor");
///
///     // Read result set as arrow batches. Infer Arrow types automatically using the meta
///     // information of `cursor`.
///     let arrow_record_batches = OdbcReaderBuilder::new()
///         .build(cursor)?;
///
///     for batch in arrow_record_batches {
///         // ... process batch ...
///     }
///     Ok(())
/// }
/// ```
pub struct OdbcReader<C: Cursor> {
    /// Converts the content of ODBC buffers into Arrow record batches
    converter: ToRecordBatch,
    /// Fetches values from the ODBC datasource using columnar batches. Values are streamed batch
    /// by batch in order to avoid reallocation of the buffers used for transit.
    batch_stream: BlockCursor<C, ColumnarAnyBuffer>,
    /// We remember if the user decided to use fallible allocations or not in case we need to
    /// allocate another buffer due to a state transition towards [`ConcurrentOdbcReader`].
    fallibale_allocations: bool,
}

impl<C: Cursor> OdbcReader<C> {
    /// Consume this instance to create a similar ODBC reader which fetches batches asynchronously.
    ///
    /// Steals all resources from this [`OdbcReader`] instance, and allocates another buffer for
    /// transiting data from the ODBC data source to the application. This way one buffer can be
    /// written to by a dedicated system thread, while the other is read by the application. Use
    /// this if you want to trade memory for speed.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
    /// use std::sync::OnceLock;
    ///
    /// // In order to fetch in a dedicated system thread we need a cursor with static lifetime,
    /// // this implies a static ODBC environment.
    /// static ENV: OnceLock<Environment> = OnceLock::new();
    ///
    /// const CONNECTION_STRING: &str = "\
    ///     Driver={ODBC Driver 18 for SQL Server};\
    ///     Server=localhost;\
    ///     UID=SA;\
    ///     PWD=My@Test@Password1;\
    /// ";
    ///
    /// fn main() -> Result<(), anyhow::Error> {
    ///
    ///     let odbc_environment = ENV.get_or_init(|| Environment::new().unwrap());
    ///
    ///     // Connect with database.
    ///     let connection = odbc_environment.connect_with_connection_string(
    ///         CONNECTION_STRING,
    ///         ConnectionOptions::default()
    ///     )?;
    ///
    ///     // This SQL statement does not require any arguments.
    ///     let parameters = ();
    ///
    ///     // Do not apply any timeout.
    ///     let timeout_sec = None;
    ///
    ///     // Execute query and create result set
    ///     let cursor = connection
    ///         // Using `into_cursor` instead of `execute` takes ownership of the connection and
    ///         // allows for a cursor with static lifetime.
    ///         .into_cursor("SELECT * FROM MyTable", parameters, timeout_sec)
    ///         .map_err(|e| e.error)?
    ///         .expect("SELECT statement must produce a cursor");
    ///
    ///     // Construct ODBC reader ...
    ///     let arrow_record_batches = OdbcReaderBuilder::new()
    ///         .build(cursor)?
    ///         // ... and make it concurrent
    ///         .into_concurrent()?;
    ///
    ///     for batch in arrow_record_batches {
    ///         // ... process batch ...
    ///     }
    ///     Ok(())
    /// }
    /// ```
    pub fn into_concurrent(self) -> Result<ConcurrentOdbcReader<C>, Error>
    where
        C: Send + 'static,
    {
        ConcurrentOdbcReader::from_block_cursor(
            self.batch_stream,
            self.converter,
            self.fallibale_allocations,
        )
    }

    /// Destroy the ODBC arrow reader and yield the underlying cursor object.
    ///
    /// One application of this is to process more than one result set in case you executed a
    /// stored procedure.
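    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `reader` is an [`OdbcReader`] you are done reading batches from:
    ///
    /// ```no_run
    /// # fn doc(
    /// #     mut reader: arrow_odbc::OdbcReader<impl arrow_odbc::odbc_api::Cursor>,
    /// # ) -> Result<(), anyhow::Error> {
    /// // Drain any remaining batches ...
    /// for batch in &mut reader {
    ///     let _batch = batch?;
    /// }
    /// // ... then take the cursor back, e.g. to look at further result sets of a stored procedure.
    /// let cursor = reader.into_cursor()?;
    /// # Ok(())
    /// # }
    /// ```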
    pub fn into_cursor(self) -> Result<C, odbc_api::Error> {
        let (cursor, _buffer) = self.batch_stream.unbind()?;
        Ok(cursor)
    }

    /// Size of the internal preallocated buffer bound to the cursor and filled by your ODBC
    /// driver, in rows. Each record batch will have at most this many rows. Only the last one may
    /// have fewer.
    pub fn max_rows_per_batch(&self) -> usize {
        self.batch_stream.row_array_size()
    }
}

impl<C> Iterator for OdbcReader<C>
where
    C: Cursor,
{
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.batch_stream.fetch_with_truncation_check(true) {
            // We successfully fetched a batch from the database. Try to copy it into a record batch
            // and forward errors if any.
            Ok(Some(batch)) => {
                let result_record_batch = self
                    .converter
                    .buffer_to_record_batch(batch)
                    .map_err(|mapping_error| ArrowError::ExternalError(Box::new(mapping_error)));
                Some(result_record_batch)
            }
            // We ran out of batches in the result set. End the iterator.
            Ok(None) => None,
            // We had an error fetching the next batch from the database, let's report it as an
            // external error.
            Err(odbc_error) => Some(Err(odbc_to_arrow_error(odbc_error))),
        }
    }
}

impl<C> RecordBatchReader for OdbcReader<C>
where
    C: Cursor,
{
    fn schema(&self) -> SchemaRef {
        self.converter.schema().clone()
    }
}

/// Creates instances of [`OdbcReader`] based on [`odbc_api::Cursor`].
///
/// Using a builder pattern instead of passing structs with all required arguments to the
/// constructors of [`OdbcReader`] allows `arrow_odbc` to introduce new parameters to fine tune the
/// creation and behavior of the readers without breaking the code of downstream applications.
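///
/// # Example
///
/// A minimal sketch of combining a row limit with a memory limit before binding the reader to a
/// result set. `cursor` stands in for any [`odbc_api::Cursor`], e.g. obtained as in the
/// [`OdbcReader`] example; the concrete limits are arbitrary and only illustrate the API.
///
/// ```no_run
/// use arrow_odbc::OdbcReaderBuilder;
/// # fn doc(cursor: impl arrow_odbc::odbc_api::Cursor) -> Result<(), arrow_odbc::Error> {
/// let reader = OdbcReaderBuilder::new()
///     // Fetch at most 10,000 rows per roundtrip to the database ...
///     .with_max_num_rows_per_batch(10_000)
///     // ... and never allocate more than 256 MiB for the transit buffer.
///     .with_max_bytes_per_batch(256 * 1024 * 1024)
///     .build(cursor)?;
/// # Ok(())
/// # }
/// ```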
#[derive(Default, Clone)]
pub struct OdbcReaderBuilder {
    /// Upper limit for the number of rows fetched in a single batch. Set explicitly using
    /// [`OdbcReaderBuilder::with_max_num_rows_per_batch`]; otherwise the default chosen in
    /// [`OdbcReaderBuilder::new`] applies.
    max_num_rows_per_batch: usize,
    max_bytes_per_batch: usize,
    schema: Option<SchemaRef>,
    max_text_size: Option<usize>,
    max_binary_size: Option<usize>,
    map_value_errors_to_null: bool,
    dbms_name: Option<String>,
    fallibale_allocations: bool,
    trim_fixed_sized_character_strings: bool,
    text_encoding: TextEncoding,
}

impl OdbcReaderBuilder {
    pub fn new() -> Self {
        // In the absence of an explicit row limit set by the user we choose u16 MAX (65535). This
        // is a reasonably high value which already significantly reduces IO overhead compared to
        // row by row fetching. Likely for many database schemas a memory limitation will kick in
        // before this limit. If not, it can still be dangerous to go beyond this number. Some
        // drivers use a 16-bit integer to count rows and you can run into overflow errors if you
        // use one of them. One such issue occurred with SAP Anywhere.
        const DEFAULT_MAX_ROWS_PER_BATCH: usize = u16::MAX as usize;
        const DEFAULT_MAX_BYTES_PER_BATCH: usize = 512 * 1024 * 1024;

        OdbcReaderBuilder {
            max_num_rows_per_batch: DEFAULT_MAX_ROWS_PER_BATCH,
            max_bytes_per_batch: DEFAULT_MAX_BYTES_PER_BATCH,
            schema: None,
            max_text_size: None,
            max_binary_size: None,
            fallibale_allocations: false,
            map_value_errors_to_null: false,
            dbms_name: None,
            trim_fixed_sized_character_strings: false,
            text_encoding: TextEncoding::Auto,
        }
    }

    /// Limits the maximum number of rows which are fetched in a single roundtrip to the data
    /// source. Higher numbers lower the IO overhead and may speed up your runtime, but also
    /// require larger preallocated buffers and use more memory. This value defaults to `65535`,
    /// which is the maximum value of a `u16`. Some ODBC drivers use a 16-bit integer to count
    /// rows, so this default avoids overflows. The improvements in saving IO overhead going above
    /// that number are estimated to be small. Your mileage may vary of course.
    pub fn with_max_num_rows_per_batch(&mut self, max_num_rows_per_batch: usize) -> &mut Self {
        self.max_num_rows_per_batch = max_num_rows_per_batch;
        self
    }

    /// In addition to a row size limit you may specify an upper bound in bytes for allocating the
    /// transit buffer. This is useful if you do not know the database schema, or your code has to
    /// work with different ones, but you know the amount of memory in your machine. This limit is
    /// applied in addition to [`OdbcReaderBuilder::with_max_num_rows_per_batch`]. Whichever of
    /// these leads to a smaller buffer is used. This defaults to 512 MiB.
    pub fn with_max_bytes_per_batch(&mut self, max_bytes_per_batch: usize) -> &mut Self {
        self.max_bytes_per_batch = max_bytes_per_batch;
        self
    }

    /// Describes the types of the Arrow arrays in the record batches. It is also used to determine
    /// the CData type requested from the data source. If this is not explicitly set, the type is
    /// inferred from the schema information provided by the ODBC driver. A reason for setting this
    /// explicitly could be that you have superior knowledge about your data compared to the ODBC
    /// driver. E.g. a type for an unsigned byte (`u8`) is not part of the ODBC standard. Therefore
    /// the driver might at best be able to tell you that this is an `i8`. If you still want `u8`s
    /// in the resulting array you need to specify the schema manually. Also, many drivers struggle
    /// with reporting nullability correctly and just report every column as nullable. Explicitly
    /// specifying a schema can also compensate for such shortcomings if it turns out to be
    /// relevant.
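    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a result set with a single column `tiny` (a hypothetical column
    /// name) which you want as unsigned bytes even though the driver would report a signed type:
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow::datatypes::{DataType, Field, Schema};
    /// use arrow_odbc::OdbcReaderBuilder;
    ///
    /// // Explicit schema: one non-nullable `UInt8` column named `tiny`.
    /// let schema = Arc::new(Schema::new(vec![Field::new("tiny", DataType::UInt8, false)]));
    /// let mut builder = OdbcReaderBuilder::new();
    /// builder.with_schema(schema);
    /// ```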
    pub fn with_schema(&mut self, schema: SchemaRef) -> &mut Self {
        self.schema = Some(schema);
        self
    }

    /// In order for fast bulk fetching to work, `arrow-odbc` needs to know the size of the largest
    /// possible field in each column. It will do so itself automatically by considering the schema
    /// information. However, trouble arises if the schema contains unbounded variadic fields like
    /// `VARCHAR(MAX)` which can hold really large values. These report a very high upper element
    /// size, if any. In order to work with such schemas we need a limit on what an upper bound for
    /// the actual values in the column is, as opposed to the largest value the column could
    /// theoretically store. There is no need for this to be very precise, but just knowing that a
    /// value would never exceed 4KiB rather than 2GiB is enough to allow for tremendous efficiency
    /// gains. The size of the text is specified in UTF-8 encoded bytes if using a narrow encoding
    /// (typically all non-Windows systems) and in UTF-16 encoded pairs of bytes on systems using a
    /// wide encoding (typically Windows). Roughly speaking this is the size in letters, yet if you
    /// are using a lot of emojis or other special characters this number might need to be larger.
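    ///
    /// # Example
    ///
    /// A sketch matching the 4KiB figure above, for a schema where e.g. `VARCHAR(MAX)` columns are
    /// known to only hold small values:
    ///
    /// ```
    /// use arrow_odbc::OdbcReaderBuilder;
    ///
    /// let mut builder = OdbcReaderBuilder::new();
    /// // Reserve at most 4KiB per text element in the transit buffer.
    /// builder.with_max_text_size(4096);
    /// ```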
    pub fn with_max_text_size(&mut self, max_text_size: usize) -> &mut Self {
        self.max_text_size = Some(max_text_size);
        self
    }

    /// An upper limit for the size of buffers bound to variadic binary columns of the data source.
    /// This limit does not (directly) apply to the size of the created arrow buffers, but rather
    /// applies to the buffers used for the data in transit. Use this option if you have e.g.
    /// `VARBINARY(MAX)` fields in your database schema. In such a case without an upper limit, the
    /// ODBC driver of your data source is asked for the maximum size of an element, and is likely
    /// to answer with either `0` or a value which is way larger than any actual entry in the
    /// column. If you can not adapt your database schema, this limit might be what you are looking
    /// for. This is the maximum size in bytes of the binary column. If this method is not called,
    /// no upper limit is set and the maximum element size reported by ODBC is used to determine
    /// buffer sizes.
    pub fn with_max_binary_size(&mut self, max_binary_size: usize) -> &mut Self {
        self.max_binary_size = Some(max_binary_size);
        self
    }

    /// Set to `true` in order to trigger a [`crate::ColumnFailure::TooLarge`] error instead of a
    /// panic in case the buffers can not be allocated due to their size. This might have a
    /// performance cost for constructing the reader. `false` by default.
    pub fn with_fallibale_allocations(&mut self, fallibale_allocations: bool) -> &mut Self {
        self.fallibale_allocations = fallibale_allocations;
        self
    }

    /// Set to `true` in order to map a value in the database which can not be successfully
    /// converted into its target type to NULL, rather than emitting an external Arrow error.
    /// E.g. currently mapping errors can happen if a datetime value is not in the range
    /// representable by Arrow. Default is `false`.
    pub fn value_errors_as_null(&mut self, map_value_errors_to_null: bool) -> &mut Self {
        self.map_value_errors_to_null = map_value_errors_to_null;
        self
    }

    /// If set to `true`, text in fixed sized character columns like e.g. CHAR is trimmed of
    /// whitespace before being converted into Arrow UTF-8 arrays. Default is `false`.
    pub fn trim_fixed_sized_characters(
        &mut self,
        fixed_sized_character_strings_are_trimmed: bool,
    ) -> &mut Self {
        self.trim_fixed_sized_character_strings = fixed_sized_character_strings_are_trimmed;
        self
    }

    /// Controls the encoding used for transferring text data from the ODBC data source to the
    /// application. The resulting Arrow arrays will still be UTF-8 encoded. If you get garbage
    /// characters or invalid UTF-8 errors on non-Windows systems, you may want to set the encoding
    /// to [`TextEncoding::Utf16`]. On Windows systems you may want to set this to
    /// [`TextEncoding::Utf8`] to gain performance benefits, after you have verified that your
    /// system locale is set to UTF-8. The default is [`TextEncoding::Auto`].
    pub fn with_payload_text_encoding(&mut self, text_encoding: TextEncoding) -> &mut Self {
        self.text_encoding = text_encoding;
        self
    }

    /// If provided, the name of the database management system (DBMS) is used to account for
    /// database specific behavior when determining the Arrow schema.
    ///
    /// To determine the name of the DBMS you can call
    /// [`odbc_api::Connection::database_management_system_name`].
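    ///
    /// # Example
    ///
    /// A sketch, assuming `connection` is an already opened [`odbc_api::Connection`]:
    ///
    /// ```no_run
    /// use arrow_odbc::OdbcReaderBuilder;
    /// # fn doc(connection: &arrow_odbc::odbc_api::Connection<'_>) -> Result<(), anyhow::Error> {
    /// // Ask the driver which DBMS we are talking to and forward the name to the builder.
    /// let dbms_name = connection.database_management_system_name()?;
    /// let mut builder = OdbcReaderBuilder::new();
    /// builder.with_dbms_name(dbms_name);
    /// # Ok(())
    /// # }
    /// ```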
    pub fn with_dbms_name(&mut self, dbms_name: String) -> &mut Self {
        self.dbms_name = Some(dbms_name);
        self
    }

    /// No matter whether the user explicitly specified a row limit, a memory limit, both or
    /// neither: in order to construct a reader we need to decide on a buffer size in rows.
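    ///
    /// For example (hypothetical numbers): with the default `max_bytes_per_batch` of 512 MiB and a
    /// row size of 1 KiB, the memory limit alone would allow 524,288 rows per batch, so the
    /// default row limit of 65,535 is the smaller of the two and determines the buffer size.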
    fn buffer_size_in_rows(&self, bytes_per_row: usize) -> Result<usize, Error> {
        // If schema is empty, return before division by zero error.
        if bytes_per_row == 0 {
            return Ok(self.max_bytes_per_batch);
        }
        let rows_per_batch = self.max_bytes_per_batch / bytes_per_row;
        if rows_per_batch == 0 {
            Err(Error::OdbcBufferTooSmall {
                max_bytes_per_batch: self.max_bytes_per_batch,
                bytes_per_row,
            })
        } else {
            Ok(min(self.max_num_rows_per_batch, rows_per_batch))
        }
    }

    /// Constructs an [`OdbcReader`] which consumes the given cursor. The cursor will also be used
    /// to infer the Arrow schema if it has not been supplied explicitly.
    ///
    /// # Parameters
    ///
    /// * `cursor`: ODBC cursor used to fetch batches from the data source. The constructor will
    ///   bind buffers to this cursor in order to perform bulk fetches from the source. This is
    ///   usually faster than fetching results row by row as it saves roundtrips to the database.
    ///   The type of these buffers will be inferred from the arrow schema. Not every arrow type is
    ///   supported though.
    pub fn build<C>(&self, mut cursor: C) -> Result<OdbcReader<C>, Error>
    where
        C: Cursor,
    {
        let buffer_allocation_options = BufferAllocationOptions {
            max_text_size: self.max_text_size,
            max_binary_size: self.max_binary_size,
            fallibale_allocations: self.fallibale_allocations,
        };
        let converter = ToRecordBatch::new(
            &mut cursor,
            self.schema.clone(),
            buffer_allocation_options,
            self.map_value_errors_to_null,
            self.dbms_name.as_deref(),
            self.trim_fixed_sized_character_strings,
            self.text_encoding,
        )?;
        let bytes_per_row = converter.row_size_in_bytes();
        let buffer_size_in_rows = self.buffer_size_in_rows(bytes_per_row)?;
        let row_set_buffer =
            converter.allocate_buffer(buffer_size_in_rows, self.fallibale_allocations)?;
        let batch_stream = cursor.bind_buffer(row_set_buffer).unwrap();

        Ok(OdbcReader {
            converter,
            batch_stream,
            fallibale_allocations: self.fallibale_allocations,
        })
    }
}

/// Converts an [`odbc_api::Error`] into an [`ArrowError`] by wrapping it as an external error.
pub fn odbc_to_arrow_error(odbc_error: odbc_api::Error) -> ArrowError {
    ArrowError::from_external_error(Box::new(odbc_error))
}