1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
mod compression;
mod export_source;
mod options;
mod reader;

use std::{
    fmt::Debug,
    io::Result as IoResult,
    pin::Pin,
    task::{ready, Context, Poll},
};

use compression::ExaExportReader;
pub use export_source::ExportSource;
use futures_io::AsyncRead;
use futures_util::FutureExt;
pub use options::ExportBuilder;
use pin_project::pin_project;

use super::SocketFuture;

/// An ETL EXPORT worker.
///
/// The type implements [`AsyncRead`] and is [`Send`] and [`Sync`] so it can be freely used
/// in any data pipeline.
///
/// # IMPORTANT
///
/// Dropping a reader before it returned EOF will result in the `EXPORT` query returning an error.
/// While not necessarily a problem if you're not interested in the whole export, there's no way to
/// circumvent that other than handling the error in code.
#[allow(clippy::large_enum_variant)]
#[pin_project(project = ExaExportProj)]
pub enum ExaExport {
    Setup(SocketFuture, bool),
    Reading(#[pin] ExaExportReader),
}

impl Debug for ExaExport {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Setup(..) => f.debug_tuple("Setup").finish(),
            Self::Reading(arg0) => f.debug_tuple("Reading").field(arg0).finish(),
        }
    }
}

impl AsyncRead for ExaExport {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<IoResult<usize>> {
        loop {
            let (socket, with_compression) = match self.as_mut().project() {
                ExaExportProj::Reading(r) => return r.poll_read(cx, buf),
                ExaExportProj::Setup(f, c) => (ready!(f.poll_unpin(cx))?, *c),
            };

            let reader = ExaExportReader::new(socket, with_compression);
            self.set(Self::Reading(reader));
        }
    }
}