1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
mod compression;
mod options;
mod trim;
mod writer;
use std::{
fmt::Debug,
io::Result as IoResult,
pin::Pin,
task::{ready, Context, Poll},
};
use compression::ExaImportWriter;
use futures_io::AsyncWrite;
use futures_util::FutureExt;
pub use options::ImportBuilder;
use pin_project::pin_project;
pub use trim::Trim;
use super::SocketFuture;
/// An ETL IMPORT worker.
///
/// The type implements [`AsyncWrite`] and is [`Send`] and [`Sync`], so it can be used (almost)
/// freely in any data pipeline.
///
/// The only caveat is that you *MUST* call [`futures_util::AsyncWriteExt::close`] on each worker to
/// finalize the import. Otherwise, Exasol keeps waiting for more data.
///
/// # Atomicity
///
/// `IMPORT` jobs are not atomic by themselves. If an error occurs during data ingestion, some of
/// the data might already have been sent and written to the database. However, since `IMPORT` is
/// fundamentally just a query, it *can* be transactional. Therefore, beginning a transaction and
/// passing it to the [`ImportBuilder::build`] method means the import job has to be explicitly
/// committed:
///
/// ```rust,no_run
/// use std::env;
///
/// use sqlx_exasol::{etl::*, *};
///
/// # async {
/// #
/// let pool = ExaPool::connect(&env::var("DATABASE_URL").unwrap()).await?;
/// let mut con = pool.acquire().await?;
/// let mut tx = con.begin().await?;
///
/// let (query_fut, writers) = ImportBuilder::new("SOME_TABLE").build(&mut *tx).await?;
///
/// // concurrently use the writers and await the query future
///
/// tx.commit().await?;
/// #
/// # let res: anyhow::Result<()> = Ok(());
/// # res
/// # };
/// ```
///
/// # IMPORTANT
///
/// In multi-node environments, closing a writer without writing any data to it can, and most
/// likely will, cause issues. Exasol does not immediately start reading data from all workers;
/// rather, it seems to connect to them sequentially, after each of them provides some data.
///
/// From what I could gather from the logs, providing no data (although the request is
/// responded to gracefully) makes Exasol retry the connection. Since these workers are
/// implemented as one-shot HTTP servers, there is nothing left to connect to. Even if there
/// were, the connection would just be re-attempted over and over, since we would still be
/// sending no data.
///
/// Since leaving one or more import workers unused seems to be treated as an error on Exasol's
/// side, it's best not to create excess writers that you don't plan on using.
// NOTE(review): the lint is silenced rather than boxing a variant; presumably `Writing` is much
// larger than `Setup` and the extra allocation was deemed not worth it — confirm this is intended.
#[allow(clippy::large_enum_variant)]
#[pin_project(project = ExaImportProj)]
pub enum ExaImport {
    /// The worker socket is still being established.
    ///
    /// Holds the future resolving to the socket, the buffer size, and the compression flag; the
    /// latter two are passed to `ExaImportWriter::new` once the socket future completes.
    Setup(SocketFuture, usize, bool),
    /// The socket is ready; all writes, flushes, and closes are forwarded to the pinned writer.
    Writing(#[pin] ExaImportWriter),
}
impl Debug for ExaImport {
    /// Formats the worker as `Setup` (the pending socket future is not printed) or as
    /// `Writing(<writer>)` once the writer exists.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (variant, inner) = match self {
            Self::Setup(..) => ("Setup", None),
            Self::Writing(writer) => ("Writing", Some(writer)),
        };

        let mut tuple = f.debug_tuple(variant);
        if let Some(writer) = inner {
            tuple.field(writer);
        }
        tuple.finish()
    }
}
impl AsyncWrite for ExaImport {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<IoResult<usize>> {
loop {
let (socket, buffer_size, with_compression) = match self.as_mut().project() {
ExaImportProj::Writing(s) => return s.poll_write(cx, buf),
ExaImportProj::Setup(f, s, c) => (ready!(f.poll_unpin(cx))?, *s, *c),
};
let writer = ExaImportWriter::new(socket, buffer_size, with_compression);
self.set(Self::Writing(writer));
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
loop {
let (socket, buffer_size, with_compression) = match self.as_mut().project() {
ExaImportProj::Writing(s) => return s.poll_flush(cx),
ExaImportProj::Setup(f, s, c) => (ready!(f.poll_unpin(cx))?, *s, *c),
};
let writer = ExaImportWriter::new(socket, buffer_size, with_compression);
self.set(Self::Writing(writer));
}
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
loop {
let (socket, buffer_size, with_compression) = match self.as_mut().project() {
ExaImportProj::Writing(s) => return s.poll_close(cx),
ExaImportProj::Setup(f, s, c) => (ready!(f.poll_unpin(cx))?, *s, *c),
};
let writer = ExaImportWriter::new(socket, buffer_size, with_compression);
self.set(Self::Writing(writer));
}
}
}