arrow_odbc/reader/concurrent_odbc_reader.rs

use arrow::{
    datatypes::SchemaRef,
    error::ArrowError,
    record_batch::{RecordBatch, RecordBatchReader},
};
use odbc_api::{BlockCursor, ConcurrentBlockCursor, Cursor, buffers::ColumnarAnyBuffer};

use crate::Error;

use super::{odbc_reader::odbc_to_arrow_error, to_record_batch::ToRecordBatch};

/// Arrow ODBC reader. Implements the [`arrow::record_batch::RecordBatchReader`] trait so it can be
/// used to fill Arrow arrays from an ODBC data source. Similar to [`crate::OdbcReader`], yet
/// [`ConcurrentOdbcReader`] fetches ODBC batches into a second transit buffer eagerly from the
/// database in a dedicated system thread. This allows the allocation of the Arrow arrays and your
/// application logic to run on the main thread, while fetching the batches from the source happens
/// concurrently. This strategy requires twice the memory for transit buffers, since one buffer may
/// be in use by the main thread to copy values into Arrow arrays, while the other is used to write
/// values from the database.
///
/// # Example
///
/// ```no_run
/// use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
/// use std::sync::OnceLock;
///
/// // In order to fetch in a dedicated system thread we need a cursor with static lifetime,
/// // this implies a static ODBC environment.
/// static ENV: OnceLock<Environment> = OnceLock::new();
///
/// const CONNECTION_STRING: &str = "\
///     Driver={ODBC Driver 18 for SQL Server};\
///     Server=localhost;\
///     UID=SA;\
///     PWD=My@Test@Password1;\
/// ";
///
/// fn main() -> Result<(), anyhow::Error> {
///
///     let odbc_environment = ENV.get_or_init(|| Environment::new().unwrap());
///
///     // Connect with database.
///     let connection = odbc_environment.connect_with_connection_string(
///         CONNECTION_STRING,
///         ConnectionOptions::default()
///     )?;
///
///     // This SQL statement does not require any arguments.
///     let parameters = ();
///
///     // Do not apply any timeout.
///     let timeout_sec = None;
///
///     // Execute query and create result set
///     let cursor = connection
///         // Using `into_cursor` instead of `execute` takes ownership of the connection and
///         // allows for a cursor with static lifetime.
///         .into_cursor("SELECT * FROM MyTable", parameters, timeout_sec)
///         .map_err(|e| e.error)?
///         .expect("SELECT statement must produce a cursor");
///
///     // Construct ODBC reader and make it concurrent
///     let arrow_record_batches = OdbcReaderBuilder::new().build(cursor)?.into_concurrent()?;
///
///     for batch in arrow_record_batches {
///         // ... process batch ...
///     }
///     Ok(())
/// }
/// ```
pub struct ConcurrentOdbcReader<C: Cursor> {
    /// We fill the buffers using ODBC concurrently. The buffer currently being filled is bound to
    /// the cursor. This member holds the buffer which is unbound and read by the application to
    /// fill the Arrow arrays. After being read, the buffer is reused and bound to the cursor again
    /// in order to save allocations.
    buffer: ColumnarAnyBuffer,
    /// Converts the content of ODBC buffers into Arrow record batches
    converter: ToRecordBatch,
    /// Fetches values from the ODBC data source using columnar batches. Values are streamed batch
    /// by batch in order to avoid reallocation of the transit buffers.
    batch_stream: ConcurrentBlockCursor<C, ColumnarAnyBuffer>,
}

impl<C: Cursor + Send + 'static> ConcurrentOdbcReader<C> {
    /// The schema implied by `block_cursor` and `converter` must match. This invariant is hard to
    /// check in the type system, so this constructor is kept private to this crate. Users should
    /// use [`crate::OdbcReader::into_concurrent`] instead.
    pub(crate) fn from_block_cursor(
        block_cursor: BlockCursor<C, ColumnarAnyBuffer>,
        converter: ToRecordBatch,
        fallible_allocations: bool,
    ) -> Result<Self, Error> {
        let max_batch_size = block_cursor.row_array_size();
        let batch_stream = ConcurrentBlockCursor::from_block_cursor(block_cursor);
        // Note that we delay buffer allocation until after the fetch thread has started, so that
        // fetching the first row group begins as early as possible, rather than waiting for the
        // buffer allocation to go through.
        let buffer = converter.allocate_buffer(max_batch_size, fallible_allocations)?;

        Ok(Self {
            buffer,
            converter,
            batch_stream,
        })
    }

    /// Destroy the ODBC Arrow reader and yield the underlying cursor object.
    ///
    /// One application of this is to process more than one result set, in case you executed a
    /// stored procedure.
    ///
    /// Due to the concurrent fetching of row groups you cannot know how many row groups have been
    /// extracted once the cursor is returned, unless the entire cursor has been consumed, i.e.
    /// [`Self::next`] returned `None`.
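    ///
    /// # Example
    ///
    /// A sketch of consuming the first result set of a stored procedure concurrently, then
    /// reclaiming the cursor to advance to the next one. The procedure name is made up for
    /// illustration, and the call to `more_results` assumes the corresponding `odbc_api` cursor
    /// method for advancing to the next result set.
    ///
    /// ```no_run
    /// # use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
    /// # use std::sync::OnceLock;
    /// # static ENV: OnceLock<Environment> = OnceLock::new();
    /// # fn main() -> Result<(), anyhow::Error> {
    /// # let env = ENV.get_or_init(|| Environment::new().unwrap());
    /// # let connection = env.connect_with_connection_string("...", ConnectionOptions::default())?;
    /// let cursor = connection
    ///     .into_cursor("EXEC MyStoredProcedure", (), None)
    ///     .map_err(|e| e.error)?
    ///     .expect("Stored procedure must produce a result set");
    ///
    /// let mut reader = OdbcReaderBuilder::new().build(cursor)?.into_concurrent()?;
    /// for batch in &mut reader {
    ///     let _batch = batch?;
    ///     // ... process first result set ...
    /// }
    ///
    /// // The reader is exhausted. Take back the cursor and move on to the next result set.
    /// let cursor = reader.into_cursor()?;
    /// if let Some(_next_result_set) = cursor.more_results()? {
    ///     // ... construct a new reader for the second result set ...
    /// }
    /// # Ok(())
    /// # }
    /// ```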
    pub fn into_cursor(self) -> Result<C, odbc_api::Error> {
        self.batch_stream.into_cursor()
    }
}

impl<C> Iterator for ConcurrentOdbcReader<C>
where
    C: Cursor,
{
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.batch_stream.fetch_into(&mut self.buffer) {
            // We successfully fetched a batch from the database. Try to copy it into a record batch
            // and forward errors if any.
            Ok(true) => {
                let result_record_batch = self
                    .converter
                    .buffer_to_record_batch(&self.buffer)
                    .map_err(|mapping_error| ArrowError::ExternalError(Box::new(mapping_error)));
                Some(result_record_batch)
            }
            // We ran out of batches in the result set. End the iterator.
            Ok(false) => None,
            // We had an error fetching the next batch from the database, let's report it as an
            // external error.
            Err(odbc_error) => Some(Err(odbc_to_arrow_error(odbc_error))),
        }
    }
}

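/// Implementing [`RecordBatchReader`] allows a [`ConcurrentOdbcReader`] to be handed to any sink
/// consuming a stream of Arrow record batches. A minimal sketch of such a generic sink follows;
/// `consume` is illustrative and not part of this crate. Since the schema is known before the
/// first batch is fetched, a downstream writer can be set up front.
///
/// ```no_run
/// use arrow::{error::ArrowError, record_batch::RecordBatchReader};
///
/// fn consume(reader: impl RecordBatchReader) -> Result<(), ArrowError> {
///     // The schema is available before any batch has been fetched.
///     let _schema = reader.schema();
///     for batch in reader {
///         let _batch = batch?; // ... hand the batch to a writer or other sink ...
///     }
///     Ok(())
/// }
/// ```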
impl<C> RecordBatchReader for ConcurrentOdbcReader<C>
where
    C: Cursor,
{
    fn schema(&self) -> SchemaRef {
        self.converter.schema().clone()
    }
}