arrow_odbc/reader/odbc_reader.rs
use std::cmp::min;

use arrow::{
    datatypes::SchemaRef,
    error::ArrowError,
    record_batch::{RecordBatch, RecordBatchReader},
};
use odbc_api::{BlockCursor, Cursor, buffers::ColumnarAnyBuffer};

use crate::{BufferAllocationOptions, ConcurrentOdbcReader, Error};

use super::{TextEncoding, to_record_batch::ToRecordBatch};

/// Arrow ODBC reader. Implements the [`arrow::record_batch::RecordBatchReader`] trait so it can be
/// used to fill Arrow arrays from an ODBC data source.
///
/// This reader is generic over the cursor type so it can be used in cases where the cursor only
/// borrows a statement handle (most likely the case when using prepared queries), or owns the
/// statement handle (recommended when using one-shot queries, to have an easier life with the
/// borrow checker).
///
/// # Example
///
/// ```no_run
/// use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
///
/// const CONNECTION_STRING: &str = "\
///     Driver={ODBC Driver 18 for SQL Server};\
///     Server=localhost;\
///     UID=SA;\
///     PWD=My@Test@Password1;\
/// ";
///
/// fn main() -> Result<(), anyhow::Error> {
///
///     let odbc_environment = Environment::new()?;
///
///     // Connect with database.
///     let connection = odbc_environment.connect_with_connection_string(
///         CONNECTION_STRING,
///         ConnectionOptions::default()
///     )?;
///
///     // This SQL statement does not require any arguments.
///     let parameters = ();
///
///     // Do not apply any timeout.
///     let timeout_sec = None;
///
///     // Execute query and create result set
///     let cursor = connection
///         .execute("SELECT * FROM MyTable", parameters, timeout_sec)?
///         .expect("SELECT statement must produce a cursor");
///
///     // Read result set as arrow batches. Infer Arrow types automatically using the meta
///     // information of `cursor`.
///     let arrow_record_batches = OdbcReaderBuilder::new()
///         .build(cursor)?;
///
///     for batch in arrow_record_batches {
///         // ... process batch ...
///     }
///     Ok(())
/// }
/// ```
pub struct OdbcReader<C: Cursor> {
    /// Converts the content of ODBC buffers into Arrow record batches
    converter: ToRecordBatch,
    /// Fetches values from the ODBC datasource using columnar batches. Values are streamed batch
    /// by batch in order to avoid reallocation of the buffers used for transit.
    batch_stream: BlockCursor<C, ColumnarAnyBuffer>,
    /// We remember if the user decided to use fallible allocations or not in case we need to
    /// allocate another buffer due to a state transition towards [`ConcurrentOdbcReader`].
    fallibale_allocations: bool,
}

impl<C: Cursor> OdbcReader<C> {
    /// Consume this instance to create a similar ODBC reader which fetches batches asynchronously.
    ///
    /// Steals all resources from this [`OdbcReader`] instance, and allocates another buffer for
    /// transiting data from the ODBC data source to the application. This way one buffer can be
    /// written to by a dedicated system thread, while the other is read by the application. Use
    /// this if you want to trade memory for speed.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
    /// use std::sync::OnceLock;
    ///
    /// // In order to fetch in a dedicated system thread we need a cursor with static lifetime,
    /// // this implies a static ODBC environment.
    /// static ENV: OnceLock<Environment> = OnceLock::new();
    ///
    /// const CONNECTION_STRING: &str = "\
    ///     Driver={ODBC Driver 18 for SQL Server};\
    ///     Server=localhost;\
    ///     UID=SA;\
    ///     PWD=My@Test@Password1;\
    /// ";
    ///
    /// fn main() -> Result<(), anyhow::Error> {
    ///
    ///     let odbc_environment = ENV.get_or_init(|| Environment::new().unwrap());
    ///
    ///     // Connect with database.
    ///     let connection = odbc_environment.connect_with_connection_string(
    ///         CONNECTION_STRING,
    ///         ConnectionOptions::default()
    ///     )?;
    ///
    ///     // This SQL statement does not require any arguments.
    ///     let parameters = ();
    ///
    ///     // Do not apply any timeout.
    ///     let timeout_sec = None;
    ///
    ///     // Execute query and create result set
    ///     let cursor = connection
    ///         // Using `into_cursor` instead of `execute` takes ownership of the connection and
    ///         // allows for a cursor with static lifetime.
    ///         .into_cursor("SELECT * FROM MyTable", parameters, timeout_sec)
    ///         .map_err(|e| e.error)?
    ///         .expect("SELECT statement must produce a cursor");
    ///
    ///     // Construct ODBC reader ...
    ///     let arrow_record_batches = OdbcReaderBuilder::new()
    ///         .build(cursor)?
    ///         // ... and make it concurrent
    ///         .into_concurrent()?;
    ///
    ///     for batch in arrow_record_batches {
    ///         // ... process batch ...
    ///     }
    ///     Ok(())
    /// }
    /// ```
    pub fn into_concurrent(self) -> Result<ConcurrentOdbcReader<C>, Error>
    where
        C: Send + 'static,
    {
        ConcurrentOdbcReader::from_block_cursor(
            self.batch_stream,
            self.converter,
            self.fallibale_allocations,
        )
    }

    /// Destroy the ODBC arrow reader and yield the underlying cursor object.
    ///
    /// One application of this is to process more than one result set in case you executed a
    /// stored procedure.
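    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a `cursor` over the first result set is already available. The
    /// function name is made up for illustration.
    ///
    /// ```no_run
    /// use arrow_odbc::{odbc_api::Cursor, OdbcReaderBuilder};
    ///
    /// fn read_first_result_set(cursor: impl Cursor) -> Result<(), anyhow::Error> {
    ///     let mut reader = OdbcReaderBuilder::new().build(cursor)?;
    ///     // Drain all batches of the first result set.
    ///     for batch in &mut reader {
    ///         let _batch = batch?;
    ///         // ... process batch ...
    ///     }
    ///     // Recover the cursor, e.g. in order to advance it to the next result set.
    ///     let _cursor = reader.into_cursor()?;
    ///     Ok(())
    /// }
    /// ```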
    pub fn into_cursor(self) -> Result<C, odbc_api::Error> {
        let (cursor, _buffer) = self.batch_stream.unbind()?;
        Ok(cursor)
    }

    /// Size in rows of the internal preallocated buffer which is bound to the cursor and filled
    /// by your ODBC driver. Each record batch will have at most this many rows. Only the last one
    /// may have fewer.
    pub fn max_rows_per_batch(&self) -> usize {
        self.batch_stream.row_array_size()
    }
}

impl<C> Iterator for OdbcReader<C>
where
    C: Cursor,
{
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.batch_stream.fetch_with_truncation_check(true) {
            // We successfully fetched a batch from the database. Try to copy it into a record
            // batch and forward errors if any.
            Ok(Some(batch)) => {
                let result_record_batch = self
                    .converter
                    .buffer_to_record_batch(batch)
                    .map_err(|mapping_error| ArrowError::ExternalError(Box::new(mapping_error)));
                Some(result_record_batch)
            }
            // We ran out of batches in the result set. End the iterator.
            Ok(None) => None,
            // We had an error fetching the next batch from the database, let's report it as an
            // external error.
            Err(odbc_error) => Some(Err(odbc_to_arrow_error(odbc_error))),
        }
    }
}

impl<C> RecordBatchReader for OdbcReader<C>
where
    C: Cursor,
{
    fn schema(&self) -> SchemaRef {
        self.converter.schema().clone()
    }
}

/// Creates instances of [`OdbcReader`] based on [`odbc_api::Cursor`].
///
/// Using a builder pattern instead of passing structs with all required arguments to the
/// constructors of [`OdbcReader`] allows `arrow_odbc` to introduce new parameters to fine tune the
/// creation and behavior of the readers without breaking the code of downstream applications.
#[derive(Default, Clone)]
pub struct OdbcReaderBuilder {
    /// Maximum number of rows fetched per batch. The default is chosen in
    /// [`OdbcReaderBuilder::new`] and can be overridden by the user via
    /// [`OdbcReaderBuilder::with_max_num_rows_per_batch`].
    max_num_rows_per_batch: usize,
    max_bytes_per_batch: usize,
    schema: Option<SchemaRef>,
    max_text_size: Option<usize>,
    max_binary_size: Option<usize>,
    map_value_errors_to_null: bool,
    fallibale_allocations: bool,
    trim_fixed_sized_character_strings: bool,
    text_encoding: TextEncoding,
}

impl OdbcReaderBuilder {
    pub fn new() -> Self {
        // In the absence of an explicit row limit set by the user we choose u16 MAX (65535).
        // This is a reasonably high value which already significantly reduces IO overhead
        // compared to row by row fetching. Likely for many database schemas a memory limitation
        // will kick in before this limit. If not, it can still be dangerous to go beyond this
        // number. Some drivers use a 16-bit integer to count rows and you can run into overflow
        // errors if you use one of them. One such issue occurred with SAP Anywhere.
        const DEFAULT_MAX_ROWS_PER_BATCH: usize = u16::MAX as usize;
        const DEFAULT_MAX_BYTES_PER_BATCH: usize = 512 * 1024 * 1024;

        OdbcReaderBuilder {
            max_num_rows_per_batch: DEFAULT_MAX_ROWS_PER_BATCH,
            max_bytes_per_batch: DEFAULT_MAX_BYTES_PER_BATCH,
            schema: None,
            max_text_size: None,
            max_binary_size: None,
            fallibale_allocations: false,
            map_value_errors_to_null: false,
            trim_fixed_sized_character_strings: false,
            text_encoding: TextEncoding::Auto,
        }
    }

    /// Limits the maximum number of rows which are fetched in a single roundtrip to the data
    /// source. Higher numbers lower the IO overhead and may speed up your runtime, but also
    /// require larger preallocated buffers and use more memory. This value defaults to `65535`,
    /// which is `u16::MAX`. Some ODBC drivers use a 16-bit integer to count rows, so this default
    /// avoids overflows. The savings in IO overhead from going above that number are estimated to
    /// be small. Your mileage may vary, of course.
    pub fn with_max_num_rows_per_batch(&mut self, max_num_rows_per_batch: usize) -> &mut Self {
        self.max_num_rows_per_batch = max_num_rows_per_batch;
        self
    }

    /// In addition to a row size limit you may specify an upper bound in bytes for allocating the
    /// transit buffer. This is useful if you do not know the database schema, or your code has to
    /// work with different ones, but you know the amount of memory in your machine. This limit is
    /// applied in addition to [`OdbcReaderBuilder::with_max_num_rows_per_batch`]. Whichever of
    /// these leads to a smaller buffer is used. This defaults to 512 MiB.
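    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a `cursor` has already been obtained from a query. The function
    /// name and the concrete limits are made up for illustration.
    ///
    /// ```no_run
    /// use arrow_odbc::{odbc_api::Cursor, OdbcReaderBuilder};
    ///
    /// fn read_with_limits(cursor: impl Cursor) -> Result<(), anyhow::Error> {
    ///     let reader = OdbcReaderBuilder::new()
    ///         // At most 10,000 rows per record batch ...
    ///         .with_max_num_rows_per_batch(10_000)
    ///         // ... and at most 256 MiB for the transit buffer. Whichever limit results in the
    ///         // smaller buffer wins.
    ///         .with_max_bytes_per_batch(256 * 1024 * 1024)
    ///         .build(cursor)?;
    ///     for batch in reader {
    ///         let _batch = batch?;
    ///         // ... process batch ...
    ///     }
    ///     Ok(())
    /// }
    /// ```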
    pub fn with_max_bytes_per_batch(&mut self, max_bytes_per_batch: usize) -> &mut Self {
        self.max_bytes_per_batch = max_bytes_per_batch;
        self
    }

    /// Describes the types of the Arrow arrays in the record batches. It is also used to
    /// determine the CData type requested from the data source. If this is not set explicitly the
    /// type is inferred from the schema information provided by the ODBC driver. A reason for
    /// setting this explicitly could be that you have superior knowledge about your data compared
    /// to the ODBC driver. E.g. a type for an unsigned byte (`u8`) is not part of the ODBC
    /// standard. Therefore the driver might at best be able to tell you that this is an `i8`. If
    /// you still want `u8`s in the resulting array you need to specify the schema manually. Also
    /// many drivers struggle with reporting nullability correctly and just report every column as
    /// nullable. Explicitly specifying a schema can also compensate for such shortcomings if it
    /// turns out to be relevant.
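    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a `cursor` over a single column which we want to read as
    /// unsigned bytes (`u8`). The function name and the column name `a` are made up for
    /// illustration.
    ///
    /// ```no_run
    /// use std::sync::Arc;
    /// use arrow_odbc::{
    ///     arrow::datatypes::{DataType, Field, Schema},
    ///     odbc_api::Cursor,
    ///     OdbcReaderBuilder,
    /// };
    ///
    /// fn read_as_u8(cursor: impl Cursor) -> Result<(), anyhow::Error> {
    ///     // Describe the column as non-nullable `UInt8`, overriding whatever the ODBC driver
    ///     // reports.
    ///     let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::UInt8, false)]));
    ///     let reader = OdbcReaderBuilder::new().with_schema(schema).build(cursor)?;
    ///     for batch in reader {
    ///         let _batch = batch?;
    ///         // ... process batch ...
    ///     }
    ///     Ok(())
    /// }
    /// ```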
    pub fn with_schema(&mut self, schema: SchemaRef) -> &mut Self {
        self.schema = Some(schema);
        self
    }

    /// In order for fast bulk fetching to work, `arrow-odbc` needs to know the size of the
    /// largest possible field in each column. It determines this automatically from the schema
    /// information. However, trouble arises if the schema contains unbounded variadic fields like
    /// `VARCHAR(MAX)` which can hold really large values. These report a very high upper element
    /// size, if any at all. In order to work with such schemas we need a limit on the actual
    /// values in the column, as opposed to the largest value the column could theoretically
    /// store. There is no need for this limit to be very precise; just knowing that a value will
    /// never exceed 4 KiB rather than 2 GiB is enough to allow for tremendous efficiency gains.
    /// The size of the text is specified in UTF-8 encoded bytes if using a narrow encoding
    /// (typically all non-Windows systems) and in UTF-16 encoded pairs of bytes on systems using
    /// a wide encoding (typically Windows). This roughly corresponds to the number of letters,
    /// yet if you are using a lot of emojis or other special characters this number might need to
    /// be larger.
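    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a `cursor` over a result set containing a `VARCHAR(MAX)` column
    /// whose values we know never to exceed 4 KiB. The function name and the limit are made up
    /// for illustration.
    ///
    /// ```no_run
    /// use arrow_odbc::{odbc_api::Cursor, OdbcReaderBuilder};
    ///
    /// fn read_large_text(cursor: impl Cursor) -> Result<(), anyhow::Error> {
    ///     let reader = OdbcReaderBuilder::new()
    ///         // Reserve 4 KiB per text element in the transit buffer instead of asking the
    ///         // driver for the (unbounded) maximum element size.
    ///         .with_max_text_size(4096)
    ///         .build(cursor)?;
    ///     for batch in reader {
    ///         let _batch = batch?;
    ///         // ... process batch ...
    ///     }
    ///     Ok(())
    /// }
    /// ```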
    pub fn with_max_text_size(&mut self, max_text_size: usize) -> &mut Self {
        self.max_text_size = Some(max_text_size);
        self
    }

    /// An upper limit for the size of buffers bound to variadic binary columns of the data
    /// source. This limit does not (directly) apply to the size of the created Arrow buffers, but
    /// rather applies to the buffers used for the data in transit. Use this option if you have
    /// e.g. `VARBINARY(MAX)` fields in your database schema. In such a case, without an upper
    /// limit, the ODBC driver of your data source is asked for the maximum size of an element,
    /// and is likely to answer with either `0` or a value which is way larger than any actual
    /// entry in the column. If you cannot adapt your database schema, this limit might be what
    /// you are looking for. This is the maximum size in bytes of the binary column. If this
    /// method is not called, no upper limit is set and the maximum element size reported by ODBC
    /// is used to determine buffer sizes.
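    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a `cursor` over a result set with a `VARBINARY(MAX)` column
    /// whose values we know to be at most 1 MiB each. The function name and the limit are made up
    /// for illustration.
    ///
    /// ```no_run
    /// use arrow_odbc::{odbc_api::Cursor, OdbcReaderBuilder};
    ///
    /// fn read_large_blobs(cursor: impl Cursor) -> Result<(), anyhow::Error> {
    ///     let reader = OdbcReaderBuilder::new()
    ///         // Reserve at most 1 MiB per binary element in the transit buffer.
    ///         .with_max_binary_size(1024 * 1024)
    ///         .build(cursor)?;
    ///     for batch in reader {
    ///         let _batch = batch?;
    ///         // ... process batch ...
    ///     }
    ///     Ok(())
    /// }
    /// ```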
    pub fn with_max_binary_size(&mut self, max_binary_size: usize) -> &mut Self {
        self.max_binary_size = Some(max_binary_size);
        self
    }

    /// Set to `true` in order to trigger a [`crate::ColumnFailure::TooLarge`] error instead of a
    /// panic in case the buffers cannot be allocated due to their size. This might have a
    /// performance cost for constructing the reader. `false` by default.
    pub fn with_fallibale_allocations(&mut self, fallibale_allocations: bool) -> &mut Self {
        self.fallibale_allocations = fallibale_allocations;
        self
    }

    /// Set to `true` in order to map a value in the database which cannot be successfully
    /// converted into its target type to NULL, rather than emitting an external Arrow error.
    /// E.g. currently mapping errors can happen if a datetime value is not in the range
    /// representable by Arrow. Default is `false`.
    pub fn value_errors_as_null(&mut self, map_value_errors_to_null: bool) -> &mut Self {
        self.map_value_errors_to_null = map_value_errors_to_null;
        self
    }

    /// If set to `true`, text in fixed sized character columns (e.g. `CHAR`) is trimmed of
    /// whitespace before being converted into Arrow UTF-8 arrays. Default is `false`.
    pub fn trim_fixed_sized_characters(
        &mut self,
        fixed_sized_character_strings_are_trimmed: bool,
    ) -> &mut Self {
        self.trim_fixed_sized_character_strings = fixed_sized_character_strings_are_trimmed;
        self
    }

    /// Controls the encoding used for transferring text data from the ODBC data source to the
    /// application. The resulting Arrow arrays will still be UTF-8 encoded. If you get garbage
    /// characters or invalid UTF-8 errors on non-Windows systems, you may want to set the
    /// encoding to [`TextEncoding::Wide`]. On Windows systems you may want to set this to
    /// [`TextEncoding::Narrow`] to gain performance benefits, after you have verified that your
    /// system locale is set to UTF-8. The default is [`TextEncoding::Auto`].
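    ///
    /// # Example
    ///
    /// A minimal sketch, assuming you are on a non-Windows system and observe invalid UTF-8
    /// errors with the default encoding. The function name is made up for illustration.
    ///
    /// ```no_run
    /// use arrow_odbc::{odbc_api::Cursor, OdbcReaderBuilder, TextEncoding};
    ///
    /// fn read_as_wide_text(cursor: impl Cursor) -> Result<(), anyhow::Error> {
    ///     let reader = OdbcReaderBuilder::new()
    ///         // Transfer text as UTF-16; the resulting Arrow arrays remain UTF-8 encoded.
    ///         .with_payload_text_encoding(TextEncoding::Wide)
    ///         .build(cursor)?;
    ///     for batch in reader {
    ///         let _batch = batch?;
    ///         // ... process batch ...
    ///     }
    ///     Ok(())
    /// }
    /// ```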
    pub fn with_payload_text_encoding(&mut self, text_encoding: TextEncoding) -> &mut Self {
        self.text_encoding = text_encoding;
        self
    }

    /// No matter whether the user explicitly specified a limit in row size, a memory limit, both
    /// or neither: in order to construct a reader we need to decide on the buffer size in rows.
    fn buffer_size_in_rows(&self, bytes_per_row: usize) -> Result<usize, Error> {
        // If the schema is empty, return before a division by zero occurs.
        if bytes_per_row == 0 {
            return Ok(self.max_bytes_per_batch);
        }
        let rows_per_batch = self.max_bytes_per_batch / bytes_per_row;
        if rows_per_batch == 0 {
            Err(Error::OdbcBufferTooSmall {
                max_bytes_per_batch: self.max_bytes_per_batch,
                bytes_per_row,
            })
        } else {
            Ok(min(self.max_num_rows_per_batch, rows_per_batch))
        }
    }

    /// Constructs an [`OdbcReader`] which consumes the given cursor. The cursor will also be used
    /// to infer the Arrow schema if it has not been supplied explicitly.
    ///
    /// # Parameters
    ///
    /// * `cursor`: ODBC cursor used to fetch batches from the data source. The constructor will
    ///   bind buffers to this cursor in order to perform bulk fetches from the source. This is
    ///   usually faster than fetching results row by row as it saves roundtrips to the database.
    ///   The type of these buffers will be inferred from the Arrow schema. Not every Arrow type
    ///   is supported though.
    pub fn build<C>(&self, mut cursor: C) -> Result<OdbcReader<C>, Error>
    where
        C: Cursor,
    {
        let buffer_allocation_options = BufferAllocationOptions {
            max_text_size: self.max_text_size,
            max_binary_size: self.max_binary_size,
            fallibale_allocations: self.fallibale_allocations,
        };
        let converter = ToRecordBatch::new(
            &mut cursor,
            self.schema.clone(),
            buffer_allocation_options,
            self.map_value_errors_to_null,
            self.trim_fixed_sized_character_strings,
            self.text_encoding,
        )?;
        let bytes_per_row = converter.row_size_in_bytes();
        let buffer_size_in_rows = self.buffer_size_in_rows(bytes_per_row)?;
        let row_set_buffer =
            converter.allocate_buffer(buffer_size_in_rows, self.fallibale_allocations)?;
        let batch_stream = cursor.bind_buffer(row_set_buffer).unwrap();

        Ok(OdbcReader {
            converter,
            batch_stream,
            fallibale_allocations: self.fallibale_allocations,
        })
    }
}

pub fn odbc_to_arrow_error(odbc_error: odbc_api::Error) -> ArrowError {
    ArrowError::from_external_error(Box::new(odbc_error))
}