use arrow::{
datatypes::SchemaRef,
error::ArrowError,
record_batch::{RecordBatch, RecordBatchReader},
};
use odbc_api::{BlockCursor, ConcurrentBlockCursor, Cursor, buffers::ColumnarAnyBuffer};
use crate::Error;
use super::{odbc_reader::odbc_to_arrow_error, to_record_batch::ToRecordBatch};
/// Arrow `RecordBatch` reader that fetches ODBC row sets on a dedicated system
/// thread, so fetching the next batch from the database overlaps with the
/// consumer processing the current one.
pub struct ConcurrentOdbcReader<C: Cursor> {
// Reusable transport buffer; filled by `batch_stream.fetch_into` and then
// converted into a `RecordBatch` by `converter` on each `next()` call.
buffer: ColumnarAnyBuffer,
// Translates filled columnar buffers into Arrow record batches and owns the
// Arrow schema reported by this reader.
converter: ToRecordBatch,
// Cursor wrapper that performs the actual ODBC fetches concurrently on a
// background thread.
batch_stream: ConcurrentBlockCursor<C, ColumnarAnyBuffer>,
}
impl<C: Cursor + Send + 'static> ConcurrentOdbcReader<C> {
    /// Builds a concurrent reader from an existing block cursor.
    ///
    /// The buffer used to exchange row sets between the fetch thread and the
    /// consumer is allocated *before* the background fetch thread is spawned,
    /// so an allocation failure never starts (and immediately tears down) the
    /// thread.
    ///
    /// # Parameters
    ///
    /// * `block_cursor`: Cursor already bound to a row-set buffer; its row
    ///   array size determines the maximum batch size of this reader.
    /// * `converter`: Translates filled columnar buffers into Arrow record
    ///   batches.
    /// * `fallible_allocations`: If `true`, buffer allocation failures are
    ///   reported as an `Error` instead of aborting the process.
    ///
    /// # Errors
    ///
    /// Returns an error if allocating the exchange buffer fails (and
    /// `fallible_allocations` is `true`).
    pub(crate) fn from_block_cursor(
        block_cursor: BlockCursor<C, ColumnarAnyBuffer>,
        converter: ToRecordBatch,
        fallible_allocations: bool,
    ) -> Result<Self, Error> {
        // Capture the batch size before the cursor is consumed below.
        let max_batch_size = block_cursor.row_array_size();
        // Allocate first: `ConcurrentBlockCursor::from_block_cursor` spawns the
        // background fetch thread, which we should not start if we cannot even
        // hold a batch.
        let buffer = converter.allocate_buffer(max_batch_size, fallible_allocations)?;
        let batch_stream = ConcurrentBlockCursor::from_block_cursor(block_cursor);
        Ok(Self {
            buffer,
            converter,
            batch_stream,
        })
    }

    /// Stops the background fetch thread and hands back the underlying ODBC
    /// cursor, e.g. to consume further result sets of the same statement.
    ///
    /// # Errors
    ///
    /// Forwards any ODBC error raised while shutting down the fetch thread.
    pub fn into_cursor(self) -> Result<C, odbc_api::Error> {
        self.batch_stream.into_cursor()
    }
}
impl<C> Iterator for ConcurrentOdbcReader<C>
where
    C: Cursor,
{
    type Item = Result<RecordBatch, ArrowError>;

    /// Pulls the next filled row set from the fetch thread and converts it
    /// into an Arrow `RecordBatch`.
    ///
    /// Yields `None` once the result set is exhausted; fetch and conversion
    /// failures surface as `Err` items.
    fn next(&mut self) -> Option<Self::Item> {
        // Ask the background thread to swap a filled buffer into `self.buffer`.
        let received_row_set = match self.batch_stream.fetch_into(&mut self.buffer) {
            Ok(flag) => flag,
            // ODBC-level fetch failure: emit it as an Arrow error item.
            Err(odbc_error) => return Some(Err(odbc_to_arrow_error(odbc_error))),
        };
        if !received_row_set {
            // Stream is drained; end iteration.
            return None;
        }
        // Translate the columnar buffer into a record batch, wrapping any
        // mapping failure as an external Arrow error.
        let batch = self
            .converter
            .buffer_to_record_batch(&self.buffer)
            .map_err(|mapping_error| ArrowError::ExternalError(Box::new(mapping_error)));
        Some(batch)
    }
}
impl<C> RecordBatchReader for ConcurrentOdbcReader<C>
where
    C: Cursor,
{
    /// Returns the Arrow schema shared by every batch this reader yields.
    fn schema(&self) -> SchemaRef {
        // `SchemaRef` is reference-counted, so this clone is cheap.
        let shared_schema = self.converter.schema();
        shared_schema.clone()
    }
}