arrow_odbc/reader/concurrent_odbc_reader.rs
use arrow::{
    datatypes::SchemaRef,
    error::ArrowError,
    record_batch::{RecordBatch, RecordBatchReader},
};
use odbc_api::{BlockCursor, ConcurrentBlockCursor, Cursor, buffers::ColumnarAnyBuffer};

use crate::Error;

use super::{odbc_reader::odbc_to_arrow_error, to_record_batch::ToRecordBatch};

/// Arrow ODBC reader. Implements the [`arrow::record_batch::RecordBatchReader`] trait so it can be
/// used to fill Arrow arrays from an ODBC data source. Similar to [`crate::OdbcReader`], yet
/// [`ConcurrentOdbcReader`] eagerly fetches ODBC batches into a second transit buffer from the
/// database in a dedicated system thread. This allows the allocation of the Arrow arrays and your
/// application logic to run on the main thread, while fetching the batches from the source happens
/// concurrently. This strategy requires twice the memory for transit buffers, since one may be in
/// use by the main thread to copy values into arrow arrays, while the other is used to write
/// values from the database.
///
/// # Example
///
/// ```no_run
/// use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
/// use std::sync::OnceLock;
///
/// // In order to fetch in a dedicated system thread we need a cursor with static lifetime. This
/// // implies a static ODBC environment.
/// static ENV: OnceLock<Environment> = OnceLock::new();
///
/// const CONNECTION_STRING: &str = "\
///     Driver={ODBC Driver 18 for SQL Server};\
///     Server=localhost;\
///     UID=SA;\
///     PWD=My@Test@Password1;\
/// ";
///
/// fn main() -> Result<(), anyhow::Error> {
///     let odbc_environment = ENV.get_or_init(|| Environment::new().unwrap());
///
///     // Connect with database.
///     let connection = odbc_environment.connect_with_connection_string(
///         CONNECTION_STRING,
///         ConnectionOptions::default(),
///     )?;
///
///     // This SQL statement does not require any arguments.
///     let parameters = ();
///
///     // Do not apply any timeout.
///     let timeout_sec = None;
///
///     // Execute query and create result set.
///     let cursor = connection
///         // Using `into_cursor` instead of `execute` takes ownership of the connection and
///         // allows for a cursor with static lifetime.
///         .into_cursor("SELECT * FROM MyTable", parameters, timeout_sec)
///         .map_err(|e| e.error)?
///         .expect("SELECT statement must produce a cursor");
///
///     // Construct ODBC reader and make it concurrent.
///     let arrow_record_batches = OdbcReaderBuilder::new().build(cursor)?.into_concurrent()?;
///
///     for batch in arrow_record_batches {
///         // ... process batch ...
///     }
///     Ok(())
/// }
/// ```
pub struct ConcurrentOdbcReader<C: Cursor> {
    /// We fill the buffers using ODBC concurrently. The buffer currently being filled is bound to
    /// the cursor; this field holds the buffer which is currently unbound and read by the
    /// application to fill the arrow arrays. After being read, the buffer is reused and bound to
    /// the cursor again in order to save allocations.
    buffer: ColumnarAnyBuffer,
    /// Converts the content of ODBC buffers into Arrow record batches.
    converter: ToRecordBatch,
    /// Fetches values from the ODBC data source using columnar batches. Values are streamed batch
    /// by batch in order to avoid reallocation of the transit buffers.
    batch_stream: ConcurrentBlockCursor<C, ColumnarAnyBuffer>,
}

impl<C: Cursor + Send + 'static> ConcurrentOdbcReader<C> {
    /// The schema implied by `block_cursor` and `converter` must match. This invariant is hard to
    /// check in the type system, so we keep this constructor private to the crate. Users should
    /// use [`crate::OdbcReader::into_concurrent`] instead.
    pub(crate) fn from_block_cursor(
        block_cursor: BlockCursor<C, ColumnarAnyBuffer>,
        converter: ToRecordBatch,
        fallible_allocations: bool,
    ) -> Result<Self, Error> {
        let max_batch_size = block_cursor.row_array_size();
        let batch_stream = ConcurrentBlockCursor::from_block_cursor(block_cursor);
        // Note that we delay buffer allocation until after the fetch thread has started, so that
        // fetching the first row group begins as early as possible, without waiting for the
        // buffer allocation to go through.
        let buffer = converter.allocate_buffer(max_batch_size, fallible_allocations)?;

        Ok(Self {
            buffer,
            converter,
            batch_stream,
        })
    }

    /// Destroy the ODBC arrow reader and yield the underlying cursor object.
    ///
    /// One application of this is to process more than one result set, in case you executed a
    /// stored procedure.
    ///
    /// Due to the concurrent fetching of row groups you can not know how many row groups have
    /// been extracted once the cursor is returned, unless the entire cursor has been consumed,
    /// i.e. [`Self::next`] returned `None`.
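    ///
    /// A minimal sketch: fully drain the reader before taking back the cursor, so that it is at a
    /// known position (`drain_and_reclaim` is just an illustrative helper). How to advance to a
    /// further result set afterwards depends on the means your `odbc_api` version offers for
    /// that.
    ///
    /// ```no_run
    /// use arrow_odbc::{ConcurrentOdbcReader, odbc_api::Cursor};
    ///
    /// fn drain_and_reclaim<C: Cursor + Send + 'static>(
    ///     mut reader: ConcurrentOdbcReader<C>,
    /// ) -> Result<C, anyhow::Error> {
    ///     for batch in &mut reader {
    ///         let _record_batch = batch?; // ... process each batch ...
    ///     }
    ///     // `next` returned `None`, so the entire result set has been consumed and the
    ///     // position of the returned cursor is known.
    ///     Ok(reader.into_cursor()?)
    /// }
    /// ```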
    pub fn into_cursor(self) -> Result<C, odbc_api::Error> {
        self.batch_stream.into_cursor()
    }
}

impl<C> Iterator for ConcurrentOdbcReader<C>
where
    C: Cursor,
{
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.batch_stream.fetch_into(&mut self.buffer) {
            // We successfully fetched a batch from the database. Try to copy it into a record
            // batch and forward errors if any.
            Ok(true) => {
                let result_record_batch = self
                    .converter
                    .buffer_to_record_batch(&self.buffer)
                    .map_err(|mapping_error| ArrowError::ExternalError(Box::new(mapping_error)));
                Some(result_record_batch)
            }
            // We ran out of batches in the result set. End the iterator.
            Ok(false) => None,
            // We had an error fetching the next batch from the database, let's report it as an
            // external error.
            Err(odbc_error) => Some(Err(odbc_to_arrow_error(odbc_error))),
        }
    }
}

impl<C> RecordBatchReader for ConcurrentOdbcReader<C>
where
    C: Cursor,
{
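    /// The schema is known before the first batch has been fetched. A minimal sketch of
    /// inspecting it up front (the column name is a made-up placeholder):
    ///
    /// ```no_run
    /// # use arrow::record_batch::RecordBatchReader;
    /// # fn doc(reader: impl RecordBatchReader) {
    /// let schema = reader.schema();
    /// assert_eq!("my_column", schema.field(0).name());
    /// # }
    /// ```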
    fn schema(&self) -> SchemaRef {
        self.converter.schema().clone()
    }
}