use std::{
fmt::Debug,
future::Future,
io,
pin::Pin,
task::{ready, Context, Poll},
};
use futures_core::FusedFuture;
use futures_util::{
future::{self, MaybeDone, TryJoinAll},
FutureExt,
};
use crate::{
connection::websocket::{
future::{ExaFuture, ExaRoundtrip, WebSocketFuture},
request::Execute,
ExaWebSocket,
},
etl::{error::ExaEtlError, job::ServerBootstrap},
responses::{QueryResult, SingleResult},
ExaQueryResult, SqlxResult,
};
/// Future that drives an ETL query together with the bootstrap futures that accompany it.
///
/// Both halves are wrapped in [`MaybeDone`] so that each one can finish independently and have
/// its output collected later, regardless of which completes first.
pub struct EtlQuery<'c> {
    /// The future executing the ETL statement; its output is what this future resolves to.
    query: MaybeDone<ExaFuture<'c, ExecuteEtl>>,
    /// All [`ServerBootstrap`] futures joined into one; it fails as soon as any of them fails.
    bootstrap: MaybeDone<TryJoinAll<ServerBootstrap>>,
}
impl<'c> EtlQuery<'c> {
    /// Builds an [`EtlQuery`] from the query future and the bootstrap futures to drive with it.
    ///
    /// The bootstrap futures are joined into a single fallible future. Nothing is polled here;
    /// both futures are only wrapped so their outputs can be taken once they complete.
    pub(crate) fn new(
        query_future: ExaFuture<'c, ExecuteEtl>,
        bootstrap_futures: Vec<ServerBootstrap>,
    ) -> Self {
        let query = future::maybe_done(query_future);
        let joined_bootstrap = future::try_join_all(bootstrap_futures);
        let bootstrap = future::maybe_done(joined_bootstrap);

        Self { query, bootstrap }
    }
}
impl Future for EtlQuery<'_> {
    type Output = SqlxResult<ExaQueryResult>;

    /// Polls the bootstrap futures first and the query second, then collects their outputs.
    ///
    /// Resolves with the query's result once the query has completed. An error from either the
    /// query or the bootstrap futures resolves this future with that error; when both fail in
    /// the same poll, the query's error takes precedence.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        // A `MaybeDone` that already finished must not be driven again: polling it after its
        // output was taken would panic, so terminated halves are skipped.
        if !this.bootstrap.is_terminated() {
            let _ = this.bootstrap.poll_unpin(cx);
        }

        if !this.query.is_terminated() {
            let _ = this.query.poll_unpin(cx);
        }

        // Take whatever has completed so far, bailing out on the first error encountered.
        let finished_query = Pin::new(&mut this.query).take_output().transpose()?;
        // The bootstrap output itself is not needed; only a failure is of interest.
        Pin::new(&mut this.bootstrap).take_output().transpose()?;

        match finished_query {
            Some(result) => Poll::Ready(Ok(result)),
            None => Poll::Pending,
        }
    }
}
/// Newtype over an [`Execute`] request roundtrip that yields a [`SingleResult`].
///
/// Its [`WebSocketFuture`] implementation only accepts a row-count outcome and turns a
/// result set into an [`ExaEtlError::ResultSetFromEtl`] error.
#[derive(Debug)]
pub struct ExecuteEtl(pub ExaRoundtrip<Execute, SingleResult>);
impl WebSocketFuture for ExecuteEtl {
type Output = ExaQueryResult;
fn poll_unpin(
&mut self,
cx: &mut Context<'_>,
ws: &mut ExaWebSocket,
) -> Poll<SqlxResult<Self::Output>> {
match QueryResult::from(ready!(self.0.poll_unpin(cx, ws))?) {
QueryResult::ResultSet { .. } => Err(io::Error::from(ExaEtlError::ResultSetFromEtl))?,
QueryResult::RowCount { row_count } => Poll::Ready(Ok(ExaQueryResult::new(row_count))),
}
}
}
impl Debug for EtlQuery<'_> {
    /// Writes only the type name; the wrapped futures are not included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("EtlQuery")
    }
}