pub struct ConcurrentOdbcReader<C: Cursor> { /* private fields */ }Expand description
Arrow ODBC reader. Implements the arrow::record_batch::RecordBatchReader trait so it can be
used to fill Arrow arrays from an ODBC data source. Similar to crate::OdbcReader, yet
ConcurrentOdbcReader fetches ODBC batches in a second transit buffer eagerly from the
database in a dedicated system thread. This allows the allocation of the Arrow arrays and your
application logic to run on the main thread, while fetching the batches from the source happens
concurrently. You need twice the memory for the transit buffer for this strategy, since one is
may be in use by the main thread in order to copy values into arrow arrays, while the other is
used to write values from the database.
§Example
use arrow_odbc::{odbc_api::{Environment, ConnectionOptions}, OdbcReaderBuilder};
use std::sync::OnceLock;
// In order to fetch in a dedicated system thread we need a cursor with static lifetime,
// this implies a static ODBC environment.
static ENV: OnceLock<Environment> = OnceLock::new();
const CONNECTION_STRING: &str = "\
Driver={ODBC Driver 18 for SQL Server};\
Server=localhost;\
UID=SA;\
PWD=My@Test@Password1;\
";
fn main() -> Result<(), anyhow::Error> {
let odbc_environment = ENV.get_or_init(|| {Environment::new().unwrap() });
// Connect with database.
let connection = odbc_environment.connect_with_connection_string(
CONNECTION_STRING,
ConnectionOptions::default()
)?;
// This SQL statement does not require any arguments.
let parameters = ();
// Do not apply any timout.
let timeout_sec = None;
// Execute query and create result set
let cursor = connection
// Using `into_cursor` instead of `execute` takes ownership of the connection and
// allows for a cursor with static lifetime.
.into_cursor("SELECT * FROM MyTable", parameters, timeout_sec)
.map_err(|e| e.error)?
.expect("SELECT statement must produce a cursor");
// Construct ODBC reader and make it concurrent
let arrow_record_batches = OdbcReaderBuilder::new().build(cursor)?.into_concurrent()?;
for batch in arrow_record_batches {
// ... process batch ...
}
Ok(())
}Implementations§
Source§impl<C: Cursor + Send + 'static> ConcurrentOdbcReader<C>
impl<C: Cursor + Send + 'static> ConcurrentOdbcReader<C>
Sourcepub fn into_cursor(self) -> Result<C, Error>
pub fn into_cursor(self) -> Result<C, Error>
Destroy the ODBC arrow reader and yield the underlyinng cursor object.
One application of this is to process more than one result set in case you executed a stored procedure.
Due to the concurrent fetching of row groups you can not know how many row groups have been
extracted once the cursor is returned. Unless that is that the entire cursor has been
consumed i.e. Self::next returned None.
Trait Implementations§
Source§impl<C> Iterator for ConcurrentOdbcReader<C>where
C: Cursor,
impl<C> Iterator for ConcurrentOdbcReader<C>where
C: Cursor,
Source§type Item = Result<RecordBatch, ArrowError>
type Item = Result<RecordBatch, ArrowError>
Source§fn next(&mut self) -> Option<Self::Item>
fn next(&mut self) -> Option<Self::Item>
Source§fn next_chunk<const N: usize>(
&mut self,
) -> Result<[Self::Item; N], IntoIter<Self::Item, N>>where
Self: Sized,
fn next_chunk<const N: usize>(
&mut self,
) -> Result<[Self::Item; N], IntoIter<Self::Item, N>>where
Self: Sized,
iter_next_chunk)N values. Read more1.0.0 · Source§fn size_hint(&self) -> (usize, Option<usize>)
fn size_hint(&self) -> (usize, Option<usize>)
1.0.0 · Source§fn count(self) -> usizewhere
Self: Sized,
fn count(self) -> usizewhere
Self: Sized,
1.0.0 · Source§fn last(self) -> Option<Self::Item>where
Self: Sized,
fn last(self) -> Option<Self::Item>where
Self: Sized,
Source§fn advance_by(&mut self, n: usize) -> Result<(), NonZero<usize>>
fn advance_by(&mut self, n: usize) -> Result<(), NonZero<usize>>
iter_advance_by)n elements. Read more1.0.0 · Source§fn nth(&mut self, n: usize) -> Option<Self::Item>
fn nth(&mut self, n: usize) -> Option<Self::Item>
nth element of the iterator. Read more1.28.0 · Source§fn step_by(self, step: usize) -> StepBy<Self>where
Self: Sized,
fn step_by(self, step: usize) -> StepBy<Self>where
Self: Sized,
1.0.0 · Source§fn chain<U>(self, other: U) -> Chain<Self, <U as IntoIterator>::IntoIter>
fn chain<U>(self, other: U) -> Chain<Self, <U as IntoIterator>::IntoIter>
1.0.0 · Source§fn zip<U>(self, other: U) -> Zip<Self, <U as IntoIterator>::IntoIter>where
Self: Sized,
U: IntoIterator,
fn zip<U>(self, other: U) -> Zip<Self, <U as IntoIterator>::IntoIter>where
Self: Sized,
U: IntoIterator,
Source§fn intersperse(self, separator: Self::Item) -> Intersperse<Self>
fn intersperse(self, separator: Self::Item) -> Intersperse<Self>
iter_intersperse)separator between adjacent
items of the original iterator. Read moreSource§fn intersperse_with<G>(self, separator: G) -> IntersperseWith<Self, G>
fn intersperse_with<G>(self, separator: G) -> IntersperseWith<Self, G>
iter_intersperse)separator
between adjacent items of the original iterator. Read more1.0.0 · Source§fn map<B, F>(self, f: F) -> Map<Self, F>
fn map<B, F>(self, f: F) -> Map<Self, F>
1.0.0 · Source§fn filter<P>(self, predicate: P) -> Filter<Self, P>
fn filter<P>(self, predicate: P) -> Filter<Self, P>
1.0.0 · Source§fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F>
fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F>
1.0.0 · Source§fn enumerate(self) -> Enumerate<Self>where
Self: Sized,
fn enumerate(self) -> Enumerate<Self>where
Self: Sized,
1.0.0 · Source§fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
1.0.0 · Source§fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
1.57.0 · Source§fn map_while<B, P>(self, predicate: P) -> MapWhile<Self, P>
fn map_while<B, P>(self, predicate: P) -> MapWhile<Self, P>
1.0.0 · Source§fn skip(self, n: usize) -> Skip<Self>where
Self: Sized,
fn skip(self, n: usize) -> Skip<Self>where
Self: Sized,
n elements. Read more1.0.0 · Source§fn take(self, n: usize) -> Take<Self>where
Self: Sized,
fn take(self, n: usize) -> Take<Self>where
Self: Sized,
n elements, or fewer
if the underlying iterator ends sooner. Read more1.0.0 · Source§fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
1.29.0 · Source§fn flatten(self) -> Flatten<Self>
fn flatten(self) -> Flatten<Self>
Source§fn map_windows<F, R, const N: usize>(self, f: F) -> MapWindows<Self, F, N>
fn map_windows<F, R, const N: usize>(self, f: F) -> MapWindows<Self, F, N>
iter_map_windows)f for each contiguous window of size N over
self and returns an iterator over the outputs of f. Like slice::windows(),
the windows during mapping overlap as well. Read more1.0.0 · Source§fn inspect<F>(self, f: F) -> Inspect<Self, F>
fn inspect<F>(self, f: F) -> Inspect<Self, F>
1.0.0 · Source§fn by_ref(&mut self) -> &mut Selfwhere
Self: Sized,
fn by_ref(&mut self) -> &mut Selfwhere
Self: Sized,
Iterator. Read moreSource§fn try_collect<B>(
&mut self,
) -> <<Self::Item as Try>::Residual as Residual<B>>::TryType
fn try_collect<B>( &mut self, ) -> <<Self::Item as Try>::Residual as Residual<B>>::TryType
iterator_try_collect)Source§fn collect_into<E>(self, collection: &mut E) -> &mut E
fn collect_into<E>(self, collection: &mut E) -> &mut E
iter_collect_into)1.0.0 · Source§fn partition<B, F>(self, f: F) -> (B, B)
fn partition<B, F>(self, f: F) -> (B, B)
Source§fn is_partitioned<P>(self, predicate: P) -> bool
fn is_partitioned<P>(self, predicate: P) -> bool
iter_is_partitioned)true precede all those that return false. Read more1.27.0 · Source§fn try_fold<B, F, R>(&mut self, init: B, f: F) -> R
fn try_fold<B, F, R>(&mut self, init: B, f: F) -> R
1.27.0 · Source§fn try_for_each<F, R>(&mut self, f: F) -> R
fn try_for_each<F, R>(&mut self, f: F) -> R
1.0.0 · Source§fn fold<B, F>(self, init: B, f: F) -> B
fn fold<B, F>(self, init: B, f: F) -> B
1.51.0 · Source§fn reduce<F>(self, f: F) -> Option<Self::Item>
fn reduce<F>(self, f: F) -> Option<Self::Item>
Source§fn try_reduce<R>(
&mut self,
f: impl FnMut(Self::Item, Self::Item) -> R,
) -> <<R as Try>::Residual as Residual<Option<<R as Try>::Output>>>::TryType
fn try_reduce<R>( &mut self, f: impl FnMut(Self::Item, Self::Item) -> R, ) -> <<R as Try>::Residual as Residual<Option<<R as Try>::Output>>>::TryType
iterator_try_reduce)1.0.0 · Source§fn all<F>(&mut self, f: F) -> bool
fn all<F>(&mut self, f: F) -> bool
1.0.0 · Source§fn any<F>(&mut self, f: F) -> bool
fn any<F>(&mut self, f: F) -> bool
1.0.0 · Source§fn find<P>(&mut self, predicate: P) -> Option<Self::Item>
fn find<P>(&mut self, predicate: P) -> Option<Self::Item>
1.30.0 · Source§fn find_map<B, F>(&mut self, f: F) -> Option<B>
fn find_map<B, F>(&mut self, f: F) -> Option<B>
Source§fn try_find<R>(
&mut self,
f: impl FnMut(&Self::Item) -> R,
) -> <<R as Try>::Residual as Residual<Option<Self::Item>>>::TryType
fn try_find<R>( &mut self, f: impl FnMut(&Self::Item) -> R, ) -> <<R as Try>::Residual as Residual<Option<Self::Item>>>::TryType
try_find)1.0.0 · Source§fn position<P>(&mut self, predicate: P) -> Option<usize>
fn position<P>(&mut self, predicate: P) -> Option<usize>
1.0.0 · Source§fn max(self) -> Option<Self::Item>
fn max(self) -> Option<Self::Item>
1.0.0 · Source§fn min(self) -> Option<Self::Item>
fn min(self) -> Option<Self::Item>
1.6.0 · Source§fn max_by_key<B, F>(self, f: F) -> Option<Self::Item>
fn max_by_key<B, F>(self, f: F) -> Option<Self::Item>
1.15.0 · Source§fn max_by<F>(self, compare: F) -> Option<Self::Item>
fn max_by<F>(self, compare: F) -> Option<Self::Item>
1.6.0 · Source§fn min_by_key<B, F>(self, f: F) -> Option<Self::Item>
fn min_by_key<B, F>(self, f: F) -> Option<Self::Item>
1.15.0 · Source§fn min_by<F>(self, compare: F) -> Option<Self::Item>
fn min_by<F>(self, compare: F) -> Option<Self::Item>
1.0.0 · Source§fn unzip<A, B, FromA, FromB>(self) -> (FromA, FromB)
fn unzip<A, B, FromA, FromB>(self) -> (FromA, FromB)
1.36.0 · Source§fn copied<'a, T>(self) -> Copied<Self>
fn copied<'a, T>(self) -> Copied<Self>
Source§fn array_chunks<const N: usize>(self) -> ArrayChunks<Self, N>where
Self: Sized,
fn array_chunks<const N: usize>(self) -> ArrayChunks<Self, N>where
Self: Sized,
iter_array_chunks)N elements of the iterator at a time. Read more1.11.0 · Source§fn product<P>(self) -> P
fn product<P>(self) -> P
Source§fn cmp_by<I, F>(self, other: I, cmp: F) -> Ordering
fn cmp_by<I, F>(self, other: I, cmp: F) -> Ordering
iter_order_by)Iterator with those
of another with respect to the specified comparison function. Read more1.5.0 · Source§fn partial_cmp<I>(self, other: I) -> Option<Ordering>
fn partial_cmp<I>(self, other: I) -> Option<Ordering>
PartialOrd elements of
this Iterator with those of another. The comparison works like short-circuit
evaluation, returning a result without comparing the remaining elements.
As soon as an order can be determined, the evaluation stops and a result is returned. Read moreSource§fn partial_cmp_by<I, F>(self, other: I, partial_cmp: F) -> Option<Ordering>where
Self: Sized,
I: IntoIterator,
F: FnMut(Self::Item, <I as IntoIterator>::Item) -> Option<Ordering>,
fn partial_cmp_by<I, F>(self, other: I, partial_cmp: F) -> Option<Ordering>where
Self: Sized,
I: IntoIterator,
F: FnMut(Self::Item, <I as IntoIterator>::Item) -> Option<Ordering>,
iter_order_by)Iterator with those
of another with respect to the specified comparison function. Read moreSource§fn eq_by<I, F>(self, other: I, eq: F) -> bool
fn eq_by<I, F>(self, other: I, eq: F) -> bool
iter_order_by)1.5.0 · Source§fn lt<I>(self, other: I) -> bool
fn lt<I>(self, other: I) -> bool
Iterator are lexicographically
less than those of another. Read more1.5.0 · Source§fn le<I>(self, other: I) -> bool
fn le<I>(self, other: I) -> bool
Iterator are lexicographically
less or equal to those of another. Read more1.5.0 · Source§fn gt<I>(self, other: I) -> bool
fn gt<I>(self, other: I) -> bool
Iterator are lexicographically
greater than those of another. Read more1.5.0 · Source§fn ge<I>(self, other: I) -> bool
fn ge<I>(self, other: I) -> bool
Iterator are lexicographically
greater than or equal to those of another. Read more