// arrow_odbc/reader/concurrent_odbc_reader.rs

use arrow::{
    datatypes::SchemaRef,
    error::ArrowError,
    record_batch::{RecordBatch, RecordBatchReader},
};
use odbc_api::{buffers::ColumnarAnyBuffer, BlockCursor, ConcurrentBlockCursor, Cursor};

use crate::Error;

use super::{odbc_reader::odbc_to_arrow_error, to_record_batch::ToRecordBatch};

/// Arrow ODBC reader. Implements the [`arrow::record_batch::RecordBatchReader`] trait so it can be
/// used to fill Arrow arrays from an ODBC data source. Similar to [`crate::OdbcReader`], yet
/// [`ConcurrentOdbcReader`] fetches ODBC batches eagerly into a second transit buffer from the
/// database in a dedicated system thread. This allows the allocation of the Arrow arrays and your
/// application logic to run on the main thread, while fetching the batches from the source happens
/// concurrently. This strategy requires twice the memory for transit buffers, since one buffer may
/// be in use by the main thread for copying values into arrow arrays, while the other is used to
/// receive values from the database.
///
/// # Example
///
/// ```no_run
/// use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
/// use std::sync::OnceLock;
///
/// // In order to fetch in a dedicated system thread we need a cursor with static lifetime,
/// // this implies a static ODBC environment.
/// static ENV: OnceLock<Environment> = OnceLock::new();
///
/// const CONNECTION_STRING: &str = "\
///     Driver={ODBC Driver 17 for SQL Server};\
///     Server=localhost;\
///     UID=SA;\
///     PWD=My@Test@Password1;\
/// ";
///
/// fn main() -> Result<(), anyhow::Error> {
///
///     let odbc_environment = ENV.get_or_init(|| Environment::new().unwrap());
///
///     // Connect with database.
///     let connection = odbc_environment.connect_with_connection_string(
///         CONNECTION_STRING,
///         ConnectionOptions::default()
///     )?;
///
///     // This SQL statement does not require any arguments.
///     let parameters = ();
///
///     // Execute query and create result set
///     let cursor = connection
///         // Using `into_cursor` instead of `execute` takes ownership of the connection and
///         // allows for a cursor with static lifetime.
///         .into_cursor("SELECT * FROM MyTable", parameters)
///         .map_err(|e| e.error)?
///         .expect("SELECT statement must produce a cursor");
///
///     // Construct ODBC reader and make it concurrent
///     let arrow_record_batches = OdbcReaderBuilder::new().build(cursor)?.into_concurrent()?;
///
///     for batch in arrow_record_batches {
///         // ... process batch ...
///     }
///     Ok(())
/// }
/// ```
pub struct ConcurrentOdbcReader<C: Cursor> {
    /// We fill the buffers using ODBC concurrently. The buffer currently being filled is bound to
    /// the cursor. This is the buffer which is unbound and read by the application to fill the
    /// arrow arrays. After it has been read we reuse the buffer and bind it to the cursor again
    /// in order to save allocations.
    buffer: ColumnarAnyBuffer,
    /// Converts the content of ODBC buffers into Arrow record batches
    converter: ToRecordBatch,
    /// Fetches values from the ODBC data source using columnar batches. Values are streamed batch
    /// by batch in order to avoid reallocation of the buffers used for transit.
    batch_stream: ConcurrentBlockCursor<C, ColumnarAnyBuffer>,
}

impl<C: Cursor + Send + 'static> ConcurrentOdbcReader<C> {
    /// The schema implied by `block_cursor` and `converter` must match. This invariant is hard to
    /// enforce in the type system, so we keep this constructor private to this crate. Users should
    /// use [`crate::OdbcReader::into_concurrent`] instead.
    pub(crate) fn from_block_cursor(
        block_cursor: BlockCursor<C, ColumnarAnyBuffer>,
        converter: ToRecordBatch,
        fallible_allocations: bool,
    ) -> Result<Self, Error> {
        let max_batch_size = block_cursor.row_array_size();
        let batch_stream = ConcurrentBlockCursor::from_block_cursor(block_cursor);
        // Note that we delay buffer allocation until after the fetch thread has started, so
        // fetching the first row group begins as early as possible, without waiting for the
        // buffer allocation to go through.
        let buffer = converter.allocate_buffer(max_batch_size, fallible_allocations)?;

        Ok(Self {
            buffer,
            converter,
            batch_stream,
        })
    }

    /// Destroy the ODBC arrow reader and yield the underlying cursor object.
    ///
    /// One application of this is to process more than one result set in case you executed a stored
    /// procedure.
    ///
    /// Due to the concurrent fetching of row groups you cannot know how many row groups have been
    /// extracted once the cursor is returned, unless the entire cursor has been consumed, i.e.
    /// [`Self::next`] returned `None`.
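    ///
    /// # Example
    ///
    /// A minimal sketch of moving on to a second result set. `MyStoredProcedure` is a placeholder,
    /// and this assumes the concrete cursor type offers `more_results`, as `odbc_api`'s
    /// `CursorImpl` does.
    ///
    /// ```no_run
    /// # use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
    /// # use std::sync::OnceLock;
    /// # static ENV: OnceLock<Environment> = OnceLock::new();
    /// # fn main() -> Result<(), anyhow::Error> {
    /// # let odbc_environment = ENV.get_or_init(|| Environment::new().unwrap());
    /// # let connection = odbc_environment.connect_with_connection_string(
    /// #     "...",
    /// #     ConnectionOptions::default(),
    /// # )?;
    /// // `MyStoredProcedure` is hypothetical and assumed to yield two result sets.
    /// let cursor = connection
    ///     .into_cursor("EXEC MyStoredProcedure", ())
    ///     .map_err(|e| e.error)?
    ///     .expect("stored procedure must produce a cursor");
    /// let mut reader = OdbcReaderBuilder::new().build(cursor)?.into_concurrent()?;
    /// // Drain the first result set entirely, so that `next` has returned `None`.
    /// for batch in &mut reader {
    ///     let _batch = batch?;
    /// }
    /// // Recover the cursor and advance it to the second result set.
    /// let cursor = reader.into_cursor()?;
    /// if let Some(_second_result_set) = cursor.more_results()? {
    ///     // ... construct a new reader from the second result set ...
    /// }
    /// # Ok(())
    /// # }
    /// ```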
    pub fn into_cursor(self) -> Result<C, odbc_api::Error> {
        self.batch_stream.into_cursor()
    }
}

impl<C> Iterator for ConcurrentOdbcReader<C>
where
    C: Cursor,
{
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.batch_stream.fetch_into(&mut self.buffer) {
            // We successfully fetched a batch from the database. Try to copy it into a record batch
            // and forward errors if any.
            Ok(true) => {
                let result_record_batch = self
                    .converter
                    .buffer_to_record_batch(&self.buffer)
                    .map_err(|mapping_error| ArrowError::ExternalError(Box::new(mapping_error)));
                Some(result_record_batch)
            }
            // We ran out of batches in the result set. End the iterator.
            Ok(false) => None,
            // We had an error fetching the next batch from the database, let's report it as an
            // external error.
            Err(odbc_error) => Some(Err(odbc_to_arrow_error(odbc_error))),
        }
    }
}

impl<C> RecordBatchReader for ConcurrentOdbcReader<C>
where
    C: Cursor,
{
    fn schema(&self) -> SchemaRef {
        self.converter.schema().clone()
    }
}