Struct arrow_odbc::ConcurrentOdbcReader
source · pub struct ConcurrentOdbcReader<C: Cursor> { /* private fields */ }
Expand description
Arrow ODBC reader. Implements the arrow::record_batch::RecordBatchReader
trait so it can be
used to fill Arrow arrays from an ODBC data source. Similar to crate::OdbcReader
, yet
ConcurrentOdbcReader
fetches ODBC batches in a second transit buffer eagerly from the
database in a dedicated system thread. This allows the allocation of the Arrow arrays and your
application logic to run on the main thread, while fetching the batches from the source happens
concurrently. You need twice the memory for the transit buffer for this strategy, since one is
may be in use by the main thread in order to copy values into arrow arrays, while the other is
used to write values from the database.
§Example
use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
use std::sync::OnceLock;
// In order to fetch in a dedicated system thread we need a cursor with static lifetime,
// this implies a static ODBC environment.
static ENV: OnceLock<Environment> = OnceLock::new();
const CONNECTION_STRING: &str = "\
Driver={ODBC Driver 17 for SQL Server};\
Server=localhost;\
UID=SA;\
PWD=My@Test@Password1;\
";
fn main() -> Result<(), anyhow::Error> {
let odbc_environment = ENV.get_or_init(|| {Environment::new().unwrap() });
// Connect with database.
let connection = odbc_environment.connect_with_connection_string(
CONNECTION_STRING,
ConnectionOptions::default()
)?;
// This SQL statement does not require any arguments.
let parameters = ();
// Execute query and create result set
let cursor = connection
// Using `into_cursor` instead of `execute` takes ownership of the connection and
// allows for a cursor with static lifetime.
.into_cursor("SELECT * FROM MyTable", parameters)?
.expect("SELECT statement must produce a cursor");
// Construct ODBC reader and make it concurrent
let arrow_record_batches = OdbcReaderBuilder::new().build(cursor)?.into_concurrent()?;
for batch in arrow_record_batches {
// ... process batch ...
}
Ok(())
}
Implementations§
source§impl<C: Cursor + Send + 'static> ConcurrentOdbcReader<C>
impl<C: Cursor + Send + 'static> ConcurrentOdbcReader<C>
sourcepub fn into_cursor(self) -> Result<C, Error>
pub fn into_cursor(self) -> Result<C, Error>
Destroy the ODBC arrow reader and yield the underlyinng cursor object.
One application of this is to process more than one result set in case you executed a stored procedure.
Due to the concurrent fetching of row groups you can not know how many row groups have been
extracted once the cursor is returned. Unless that is that the entire cursor has been
consumed i.e. Self::next
returned None
.
Trait Implementations§
source§impl<C> Iterator for ConcurrentOdbcReader<C>where
C: Cursor,
impl<C> Iterator for ConcurrentOdbcReader<C>where
C: Cursor,
§type Item = Result<RecordBatch, ArrowError>
type Item = Result<RecordBatch, ArrowError>
source§fn next(&mut self) -> Option<Self::Item>
fn next(&mut self) -> Option<Self::Item>
source§fn next_chunk<const N: usize>(
&mut self
) -> Result<[Self::Item; N], IntoIter<Self::Item, N>>where
Self: Sized,
fn next_chunk<const N: usize>(
&mut self
) -> Result<[Self::Item; N], IntoIter<Self::Item, N>>where
Self: Sized,
iter_next_chunk
)N
values. Read more1.0.0 · source§fn size_hint(&self) -> (usize, Option<usize>)
fn size_hint(&self) -> (usize, Option<usize>)
1.0.0 · source§fn count(self) -> usizewhere
Self: Sized,
fn count(self) -> usizewhere
Self: Sized,
1.0.0 · source§fn last(self) -> Option<Self::Item>where
Self: Sized,
fn last(self) -> Option<Self::Item>where
Self: Sized,
source§fn advance_by(&mut self, n: usize) -> Result<(), NonZero<usize>>
fn advance_by(&mut self, n: usize) -> Result<(), NonZero<usize>>
iter_advance_by
)n
elements. Read more1.0.0 · source§fn nth(&mut self, n: usize) -> Option<Self::Item>
fn nth(&mut self, n: usize) -> Option<Self::Item>
n
th element of the iterator. Read more1.28.0 · source§fn step_by(self, step: usize) -> StepBy<Self>where
Self: Sized,
fn step_by(self, step: usize) -> StepBy<Self>where
Self: Sized,
1.0.0 · source§fn chain<U>(self, other: U) -> Chain<Self, <U as IntoIterator>::IntoIter>
fn chain<U>(self, other: U) -> Chain<Self, <U as IntoIterator>::IntoIter>
1.0.0 · source§fn zip<U>(self, other: U) -> Zip<Self, <U as IntoIterator>::IntoIter>where
Self: Sized,
U: IntoIterator,
fn zip<U>(self, other: U) -> Zip<Self, <U as IntoIterator>::IntoIter>where
Self: Sized,
U: IntoIterator,
source§fn intersperse_with<G>(self, separator: G) -> IntersperseWith<Self, G>
fn intersperse_with<G>(self, separator: G) -> IntersperseWith<Self, G>
iter_intersperse
)separator
between adjacent items of the original iterator. Read more1.0.0 · source§fn map<B, F>(self, f: F) -> Map<Self, F>
fn map<B, F>(self, f: F) -> Map<Self, F>
1.0.0 · source§fn filter<P>(self, predicate: P) -> Filter<Self, P>
fn filter<P>(self, predicate: P) -> Filter<Self, P>
1.0.0 · source§fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F>
fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F>
1.0.0 · source§fn enumerate(self) -> Enumerate<Self>where
Self: Sized,
fn enumerate(self) -> Enumerate<Self>where
Self: Sized,
1.0.0 · source§fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
1.0.0 · source§fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
1.57.0 · source§fn map_while<B, P>(self, predicate: P) -> MapWhile<Self, P>
fn map_while<B, P>(self, predicate: P) -> MapWhile<Self, P>
1.0.0 · source§fn skip(self, n: usize) -> Skip<Self>where
Self: Sized,
fn skip(self, n: usize) -> Skip<Self>where
Self: Sized,
n
elements. Read more1.0.0 · source§fn take(self, n: usize) -> Take<Self>where
Self: Sized,
fn take(self, n: usize) -> Take<Self>where
Self: Sized,
n
elements, or fewer
if the underlying iterator ends sooner. Read more1.0.0 · source§fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
source§fn map_windows<F, R, const N: usize>(self, f: F) -> MapWindows<Self, F, N>
fn map_windows<F, R, const N: usize>(self, f: F) -> MapWindows<Self, F, N>
iter_map_windows
)f
for each contiguous window of size N
over
self
and returns an iterator over the outputs of f
. Like slice::windows()
,
the windows during mapping overlap as well. Read more1.0.0 · source§fn inspect<F>(self, f: F) -> Inspect<Self, F>
fn inspect<F>(self, f: F) -> Inspect<Self, F>
1.0.0 · source§fn by_ref(&mut self) -> &mut Selfwhere
Self: Sized,
fn by_ref(&mut self) -> &mut Selfwhere
Self: Sized,
source§fn collect_into<E>(self, collection: &mut E) -> &mut E
fn collect_into<E>(self, collection: &mut E) -> &mut E
iter_collect_into
)1.0.0 · source§fn partition<B, F>(self, f: F) -> (B, B)
fn partition<B, F>(self, f: F) -> (B, B)
source§fn is_partitioned<P>(self, predicate: P) -> bool
fn is_partitioned<P>(self, predicate: P) -> bool
iter_is_partitioned
)true
precede all those that return false
. Read more1.27.0 · source§fn try_fold<B, F, R>(&mut self, init: B, f: F) -> R
fn try_fold<B, F, R>(&mut self, init: B, f: F) -> R
1.27.0 · source§fn try_for_each<F, R>(&mut self, f: F) -> R
fn try_for_each<F, R>(&mut self, f: F) -> R
1.0.0 · source§fn fold<B, F>(self, init: B, f: F) -> B
fn fold<B, F>(self, init: B, f: F) -> B
1.51.0 · source§fn reduce<F>(self, f: F) -> Option<Self::Item>
fn reduce<F>(self, f: F) -> Option<Self::Item>
source§fn try_reduce<F, R>(
&mut self,
f: F
) -> <<R as Try>::Residual as Residual<Option<<R as Try>::Output>>>::TryType
fn try_reduce<F, R>( &mut self, f: F ) -> <<R as Try>::Residual as Residual<Option<<R as Try>::Output>>>::TryType
iterator_try_reduce
)1.0.0 · source§fn all<F>(&mut self, f: F) -> bool
fn all<F>(&mut self, f: F) -> bool
1.0.0 · source§fn any<F>(&mut self, f: F) -> bool
fn any<F>(&mut self, f: F) -> bool
1.0.0 · source§fn find<P>(&mut self, predicate: P) -> Option<Self::Item>
fn find<P>(&mut self, predicate: P) -> Option<Self::Item>
1.30.0 · source§fn find_map<B, F>(&mut self, f: F) -> Option<B>
fn find_map<B, F>(&mut self, f: F) -> Option<B>
source§fn try_find<F, R>(
&mut self,
f: F
) -> <<R as Try>::Residual as Residual<Option<Self::Item>>>::TryType
fn try_find<F, R>( &mut self, f: F ) -> <<R as Try>::Residual as Residual<Option<Self::Item>>>::TryType
try_find
)1.0.0 · source§fn position<P>(&mut self, predicate: P) -> Option<usize>
fn position<P>(&mut self, predicate: P) -> Option<usize>
1.6.0 · source§fn max_by_key<B, F>(self, f: F) -> Option<Self::Item>
fn max_by_key<B, F>(self, f: F) -> Option<Self::Item>
1.15.0 · source§fn max_by<F>(self, compare: F) -> Option<Self::Item>
fn max_by<F>(self, compare: F) -> Option<Self::Item>
1.6.0 · source§fn min_by_key<B, F>(self, f: F) -> Option<Self::Item>
fn min_by_key<B, F>(self, f: F) -> Option<Self::Item>
1.15.0 · source§fn min_by<F>(self, compare: F) -> Option<Self::Item>
fn min_by<F>(self, compare: F) -> Option<Self::Item>
1.0.0 · source§fn unzip<A, B, FromA, FromB>(self) -> (FromA, FromB)
fn unzip<A, B, FromA, FromB>(self) -> (FromA, FromB)
1.36.0 · source§fn copied<'a, T>(self) -> Copied<Self>
fn copied<'a, T>(self) -> Copied<Self>
source§fn array_chunks<const N: usize>(self) -> ArrayChunks<Self, N>where
Self: Sized,
fn array_chunks<const N: usize>(self) -> ArrayChunks<Self, N>where
Self: Sized,
iter_array_chunks
)N
elements of the iterator at a time. Read more1.11.0 · source§fn product<P>(self) -> P
fn product<P>(self) -> P
source§fn cmp_by<I, F>(self, other: I, cmp: F) -> Ordering
fn cmp_by<I, F>(self, other: I, cmp: F) -> Ordering
iter_order_by
)Iterator
with those
of another with respect to the specified comparison function. Read more1.5.0 · source§fn partial_cmp<I>(self, other: I) -> Option<Ordering>
fn partial_cmp<I>(self, other: I) -> Option<Ordering>
PartialOrd
elements of
this Iterator
with those of another. The comparison works like short-circuit
evaluation, returning a result without comparing the remaining elements.
As soon as an order can be determined, the evaluation stops and a result is returned. Read moresource§fn partial_cmp_by<I, F>(self, other: I, partial_cmp: F) -> Option<Ordering>where
Self: Sized,
I: IntoIterator,
F: FnMut(Self::Item, <I as IntoIterator>::Item) -> Option<Ordering>,
fn partial_cmp_by<I, F>(self, other: I, partial_cmp: F) -> Option<Ordering>where
Self: Sized,
I: IntoIterator,
F: FnMut(Self::Item, <I as IntoIterator>::Item) -> Option<Ordering>,
iter_order_by
)Iterator
with those
of another with respect to the specified comparison function. Read moresource§fn eq_by<I, F>(self, other: I, eq: F) -> bool
fn eq_by<I, F>(self, other: I, eq: F) -> bool
iter_order_by
)1.5.0 · source§fn lt<I>(self, other: I) -> bool
fn lt<I>(self, other: I) -> bool
Iterator
are lexicographically
less than those of another. Read more1.5.0 · source§fn le<I>(self, other: I) -> bool
fn le<I>(self, other: I) -> bool
Iterator
are lexicographically
less or equal to those of another. Read more1.5.0 · source§fn gt<I>(self, other: I) -> bool
fn gt<I>(self, other: I) -> bool
Iterator
are lexicographically
greater than those of another. Read more1.5.0 · source§fn ge<I>(self, other: I) -> bool
fn ge<I>(self, other: I) -> bool
Iterator
are lexicographically
greater than or equal to those of another. Read moresource§fn is_sorted_by<F>(self, compare: F) -> bool
fn is_sorted_by<F>(self, compare: F) -> bool
is_sorted
)source§fn is_sorted_by_key<F, K>(self, f: F) -> bool
fn is_sorted_by_key<F, K>(self, f: F) -> bool
is_sorted
)source§impl<C> RecordBatchReader for ConcurrentOdbcReader<C>where
C: Cursor,
impl<C> RecordBatchReader for ConcurrentOdbcReader<C>where
C: Cursor,
source§fn next_batch(&mut self) -> Result<Option<RecordBatch>, ArrowError>
fn next_batch(&mut self) -> Result<Option<RecordBatch>, ArrowError>
next
from the trait Iterator.RecordBatch
.