Enum sqlx_exasol::etl::ExaImport
pub enum ExaImport {
Setup(BoxFuture<'static, Result<ExaSocket>>, usize, bool),
Writing(ExaImportWriter),
}
An ETL IMPORT worker.
The type implements AsyncWrite and is Send (but not Sync), so it can be (almost) freely
used in any data pipeline.
The only caveat is that you MUST call futures_util::AsyncWriteExt::close on each worker to
finalize the import. Otherwise, Exasol keeps expecting data.
Atomicity
IMPORT jobs are not atomic by themselves. If an error occurs during data ingestion,
some of the data might already have been sent and written to the database. However, since
IMPORT is fundamentally just a query, it can be transactional. Therefore,
beginning a transaction and passing it to the ImportBuilder::build method means that
the import job must be explicitly committed:
use std::env;
use sqlx_exasol::{etl::*, *};
let pool = ExaPool::connect(&env::var("DATABASE_URL").unwrap()).await?;
let mut con = pool.acquire().await?;
let mut tx = con.begin().await?;
let (query_fut, writers) = ImportBuilder::new("SOME_TABLE").build(&mut *tx).await?;
// concurrently use the writers and await the query future
tx.commit().await?;
IMPORTANT
In multi-node environments, closing a writer without writing any data to it can, and most likely will, cause issues. Exasol does not immediately start reading data from all workers but rather seems to connect to them sequentially, after each of them provides some data.
From what I could gather from the logs, providing no data (although the request is responded to gracefully) makes Exasol retry the connection. With these workers being implemented as one-shot HTTP servers, there’s nothing to connect to anymore. Even if there were, it’s possible the connection would just be re-attempted over and over since we’d still be sending no data (I did not test this, though).
Since not using one or more import workers seems to be treated as an error on Exasol’s side, it’s best not to create writers that you don’t plan on using.
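One way to follow this advice is to derive the number of writers from the amount of data you actually have, so that every writer is guaranteed at least one row. The sketch below is plain Rust and makes no sqlx_exasol calls. plan_chunks and its parameters are hypothetical names introduced for illustration, and it only computes the split. How the resulting writer count is requested from ImportBuilder is not shown here.

```rust
/// Splits `rows` into at most `max_writers` contiguous, non-empty chunks.
///
/// Hypothetical helper (not part of sqlx_exasol): the number of chunks
/// returned is the number of import writers worth creating, so that no
/// writer is ever closed without having written data.
fn plan_chunks<T>(rows: &[T], max_writers: usize) -> Vec<&[T]> {
    // Never plan more writers than there are rows to send.
    let writers = max_writers.min(rows.len());
    if writers == 0 {
        return Vec::new();
    }

    let base = rows.len() / writers; // rows every writer gets (at least 1)
    let extra = rows.len() % writers; // the first `extra` writers get one more

    let mut chunks = Vec::with_capacity(writers);
    let mut start = 0;
    for i in 0..writers {
        let len = base + usize::from(i < extra);
        chunks.push(&rows[start..start + len]);
        start += len;
    }
    chunks
}

fn main() {
    let rows = ["a,1", "b,2", "c,3"];
    // Eight writers could be available, but only three rows exist.
    let chunks = plan_chunks(&rows, 8);
    println!("writers to create: {}", chunks.len());
    for (i, chunk) in chunks.iter().enumerate() {
        println!("writer {i}: {chunk:?}");
    }
}
```

Each chunk would then be written to exactly one writer, followed by a call to close on that writer. If the input is empty, the helper returns no chunks, which suggests skipping the import altogether rather than creating a writer that receives nothing.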
Variants§
Trait Implementations§
impl AsyncWrite for ExaImport
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>>
Attempt to write bytes from buf into the object.
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>>
impl<'pin> Unpin for ExaImport where __ExaImport<'pin>: Unpin,
Auto Trait Implementations§
impl !RefUnwindSafe for ExaImport
impl Send for ExaImport
impl !Sync for ExaImport
impl !UnwindSafe for ExaImport
Blanket Implementations§
impl<W> AsyncWriteExt for W where W: AsyncWrite + ?Sized,
fn flush(&mut self) -> Flush<'_, Self> where Self: Unpin,
Creates a future which will entirely flush this AsyncWrite.
fn close(&mut self) -> Close<'_, Self> where Self: Unpin,
Creates a future which will entirely close this AsyncWrite.
fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self> where Self: Unpin,
Creates a future which will write bytes from buf into the object.
fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self> where Self: Unpin,
Creates a future which will write bytes from bufs into the object using vectored IO operations.