pub enum ExaImport {
    Setup(BoxFuture<'static, Result<ExaSocket>>, usize, bool),
    Writing(ExaImportWriter),
}
An ETL IMPORT worker.
The type implements AsyncWrite and is Send, so it can be (almost) freely used for any data pipeline. Note that it is not Sync (see the auto trait implementations below).
The only caveat is that you MUST call futures_util::AsyncWriteExt::close on each worker to finalize the import. Otherwise, Exasol keeps on expecting data.
§Atomicity
IMPORT jobs are not atomic by themselves. If an error occurs during data ingestion, some of the data might already have been sent and written to the database. However, since IMPORT is fundamentally just a query, it can be transactional. Therefore, beginning a transaction and passing it to the ImportBuilder::build method means the import job must be explicitly committed:
use std::env;
use sqlx_exasol::{etl::*, *};
let pool = ExaPool::connect(&env::var("DATABASE_URL").unwrap()).await?;
let mut con = pool.acquire().await?;
let mut tx = con.begin().await?;
let (query_fut, writers) = ImportBuilder::new("SOME_TABLE").build(&mut *tx).await?;
// concurrently use the writers and await the query future
tx.commit().await?;
§IMPORTANT
In multi-node environments closing a writer without writing any data to it can, and most likely will, cause issues; Exasol does not immediately start reading data from all workers but rather seems to connect to them sequentially after each of them provides some data.
From what I could gather from the logs, providing no data (although the request is responded to gracefully) makes Exasol retry the connection. With these workers being implemented as one-shot HTTP servers, there’s nothing to connect to anymore. Even if there were, the connection would just be re-attempted over and over since we’d still be sending no data.
Since not using one or more import workers seems to be treated as an error on Exasol’s side, it’s best not to create excess writers that you don’t plan on using to avoid such issues.
See https://github.com/exasol/websocket-api/issues/33 for more details.
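One way to follow this advice is to size the number of writers from the data itself. The sketch below is a std-only illustration, not part of this crate: `plan_batches` is a hypothetical helper that splits rows round-robin into at most `max_writers` batches and never produces an empty one. How the resulting batch count is then configured on the import builder is outside this sketch.

```rust
/// Splits `rows` into at most `max_writers` non-empty batches, round-robin.
///
/// The number of batches is `min(rows.len(), max_writers)`, so every batch
/// (and therefore every writer fed from it) receives at least one row.
/// Hypothetical helper for illustration; not an API of this crate.
pub fn plan_batches<T>(rows: Vec<T>, max_writers: usize) -> Vec<Vec<T>> {
    // Nothing to import: create no batches, hence no writers.
    if rows.is_empty() {
        return Vec::new();
    }
    assert!(max_writers > 0, "at least one writer is needed for non-empty input");

    // Never plan more writers than there are rows to hand out.
    let writers = rows.len().min(max_writers);
    let mut batches: Vec<Vec<T>> = (0..writers).map(|_| Vec::new()).collect();

    // Round-robin assignment keeps the batches balanced.
    for (i, row) in rows.into_iter().enumerate() {
        batches[i % writers].push(row);
    }
    batches
}
```

With five rows and three writers this yields batches of sizes 2, 2 and 1; with two rows and eight available writers it yields only two batches, so the six surplus writers are never created.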
Variants§
Setup(BoxFuture<'static, Result<ExaSocket>>, usize, bool)
Setup state of the worker. This typically means waiting on the TLS handshake.
This approach is needed because Exasol will issue connections sequentially and thus perform TLS handshakes the same way.
Therefore we accommodate the worker state until the query gets executed and data gets sent through the workers, which happens within consumer code.
Writing(ExaImportWriter)
The worker is fully connected and ready for I/O.
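The two-state design can be illustrated with a simplified, synchronous analogue. This is only a sketch under stated assumptions: the real type is asynchronous and stores a boxed future in its Setup variant, whereas the hypothetical `LazyWorker` below stores a closure and uses std::io::Write with an in-memory Vec<u8> as the connected sink. It shows the same idea of deferring the expensive setup until the consumer first performs I/O.

```rust
use std::io::{self, Write};

/// Hypothetical stand-in for a connection that is expensive to establish.
type Connect = Box<dyn FnOnce() -> io::Result<Vec<u8>> + Send>;

/// Synchronous analogue of a worker that is either still being set up
/// or ready for I/O. Illustration only; not the crate's implementation.
pub enum LazyWorker {
    /// Setup has not run yet; the closure produces the connected sink.
    Setup(Option<Connect>),
    /// The sink is connected and accepts data.
    Writing(Vec<u8>),
}

impl LazyWorker {
    pub fn new(connect: Connect) -> Self {
        LazyWorker::Setup(Some(connect))
    }

    /// Runs the deferred setup on first use and returns the connected sink.
    fn sink(&mut self) -> io::Result<&mut Vec<u8>> {
        if let LazyWorker::Setup(connect) = self {
            // A missing closure means an earlier setup attempt failed.
            let connect = connect
                .take()
                .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "setup already failed"))?;
            // Transition to the Writing state once setup succeeds.
            *self = LazyWorker::Writing(connect()?);
        }
        match self {
            LazyWorker::Writing(sink) => Ok(sink),
            LazyWorker::Setup(_) => unreachable!("setup either succeeded or returned early"),
        }
    }
}

impl Write for LazyWorker {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // The first write drives the state transition, then delegates.
        self.sink()?.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.sink()?.flush()
    }
}
```

A worker built with `LazyWorker::new` stays in the Setup state until the first write, at which point the closure runs once and all subsequent I/O goes straight to the connected sink.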
Trait Implementations§
impl AsyncWrite for ExaImport
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>>
Attempt to write bytes from buf into the object.
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>
Auto Trait Implementations§
impl !Freeze for ExaImport
impl !RefUnwindSafe for ExaImport
impl Send for ExaImport
impl !Sync for ExaImport
impl Unpin for ExaImport
impl !UnwindSafe for ExaImport
Blanket Implementations§
impl<W> AsyncWriteExt for W where W: AsyncWrite + ?Sized
fn flush(&mut self) -> Flush<'_, Self> where Self: Unpin
Creates a future which will entirely flush this AsyncWrite.
fn close(&mut self) -> Close<'_, Self> where Self: Unpin
Creates a future which will entirely close this AsyncWrite.
fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self> where Self: Unpin
Creates a future which will write bytes from buf into the object.
fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self> where Self: Unpin
Creates a future which will write bytes from bufs into the object using vectored IO operations.
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.