use std::{
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures_core::ready;
#[cfg(feature = "tracing")]
use {
std::sync::Arc,
tracing::{debug_span, Span},
};
use crate::{
conn::{
pool::{Pool, QueueId},
Conn,
},
error::*,
};
/// Internal state of a [`GetConn`] future.
///
/// `GetConn::poll` moves between these states; the pool's `poll_new_conn`
/// hands back either `Connecting` or `Checking` to leave the `New` state.
pub(crate) enum GetConnInner {
/// No connection future has been obtained from the pool yet. This is the
/// initial state, and also the state re-entered after a failed check.
New,
/// The future has already produced its final connection result; polling
/// in this state panics.
Done,
/// A connection future is in flight. If it fails, the error is returned to
/// the caller. (Presumably this is a brand-new connection being
/// established — confirm against `Pool::poll_new_conn`.)
Connecting(crate::BoxFuture<'static, Conn>),
/// A connection future is in flight whose failure is not reported to the
/// caller: the state falls back to `New` and the pool is asked again.
/// (Presumably a health check of an idle pooled connection — confirm
/// against `Pool::poll_new_conn`.)
Checking(crate::BoxFuture<'static, Conn>),
}
impl fmt::Debug for GetConnInner {
    /// Formats the state as its qualified variant name.
    ///
    /// Variants that carry a future print the opaque placeholder
    /// `"<future>"` as their single tuple field; the future itself is never
    /// inspected.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Reduce every variant to "which label" and "does it hold a future"
        // so that the tuple is built in exactly one place.
        let (label, carries_future) = match *self {
            GetConnInner::New => ("GetConnInner::New", false),
            GetConnInner::Done => ("GetConnInner::Done", false),
            GetConnInner::Connecting(_) => ("GetConnInner::Connecting", true),
            GetConnInner::Checking(_) => ("GetConnInner::Checking", true),
        };

        let mut out = f.debug_tuple(label);
        if carries_future {
            out.field(&"<future>");
        }
        out.finish()
    }
}
/// Future that resolves to a [`Conn`] obtained through a [`Pool`].
///
/// Created by `GetConn::new`; its `Drop` implementation unregisters the
/// future from the pool if it is dropped before producing a connection result.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct GetConn {
// Identifier obtained from `QueueId::next()`; passed to
// `Pool::poll_new_conn` while polling and to `Pool::unqueue` on drop.
pub(crate) queue_id: QueueId,
// Handle to the pool. `Some` until the in-flight connection future
// completes, at which point it is taken out (and, on success, stored in the
// returned connection).
pub(crate) pool: Option<Pool>,
// Current state of the state machine driven by `poll`.
pub(crate) inner: GetConnInner,
// Copied into `Conn::inner.reset_upon_returning_to_a_pool` of the
// connection that is handed out.
reset_upon_returning_to_a_pool: bool,
// Tracing span entered for the duration of every `poll` call.
#[cfg(feature = "tracing")]
span: Arc<Span>,
}
impl GetConn {
    /// Panic message used when the pool handle is requested after it has
    /// already been taken out of the future.
    const POLLED_AFTER_READY: &'static str = "GetConn::poll polled after returning Async::Ready";

    /// Creates a future in the `New` state that will request a connection
    /// from `pool`.
    ///
    /// `reset_upon_returning_to_a_pool` is stored and later copied onto the
    /// connection that this future resolves to.
    pub(crate) fn new(pool: &Pool, reset_upon_returning_to_a_pool: bool) -> GetConn {
        // Evaluate in the same order as the fields are declared: queue id,
        // pool handle, then (optionally) the tracing span.
        let queue_id = QueueId::next();
        let pool = Some(pool.clone());
        #[cfg(feature = "tracing")]
        let span = Arc::new(debug_span!("mysql_async::get_conn"));

        Self {
            queue_id,
            pool,
            inner: GetConnInner::New,
            reset_upon_returning_to_a_pool,
            #[cfg(feature = "tracing")]
            span,
        }
    }

    /// Borrows the pool handle mutably.
    ///
    /// # Panics
    ///
    /// Panics if the handle has already been taken.
    fn pool_mut(&mut self) -> &mut Pool {
        let slot = self.pool.as_mut();
        slot.expect(Self::POLLED_AFTER_READY)
    }

    /// Moves the pool handle out of the future, leaving `None` behind.
    ///
    /// # Panics
    ///
    /// Panics if the handle has already been taken.
    fn pool_take(&mut self) -> Pool {
        let taken = self.pool.take();
        taken.expect(Self::POLLED_AFTER_READY)
    }
}
/// Drives the `GetConnInner` state machine until a connection (or an error)
/// is available.
impl Future for GetConn {
type Output = Result<Conn>;
/// Polls the future.
///
/// Returns `Poll::Pending` whenever the pool or the in-flight connection
/// future is not ready, `Poll::Ready(Ok(conn))` once a connection is
/// available, and `Poll::Ready(Err(_))` if the pool reports an error or the
/// `Connecting` future fails. A failing `Checking` future is not reported;
/// the pool is asked again instead.
///
/// # Panics
///
/// Panics when polled in the `Done` state, or when the pool handle has
/// already been taken out of the future.
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Clone the `Arc` so the span guard borrows the local, not `self`.
#[cfg(feature = "tracing")]
let span = self.span.clone();
#[cfg(feature = "tracing")]
let _span_guard = span.enter();
loop {
match self.inner {
GetConnInner::New => {
// Ask the pool for the next step. `ready!` returns `Poll::Pending`
// early; `?` returns a pool error to the caller without changing
// the state.
let queue_id = self.queue_id;
let next = ready!(self.pool_mut().poll_new_conn(cx, queue_id))?;
match next {
GetConnInner::Connecting(conn_fut) => {
self.inner = GetConnInner::Connecting(conn_fut);
}
GetConnInner::Checking(conn_fut) => {
self.inner = GetConnInner::Checking(conn_fut);
}
GetConnInner::Done => unreachable!(
"Pool::poll_new_conn never gives out already-consumed GetConns"
),
GetConnInner::New => {
unreachable!("Pool::poll_new_conn never gives out GetConnInner::New")
}
}
// Fall through to the next loop iteration, which polls the future
// that was just stored.
}
GetConnInner::Done => {
unreachable!("GetConn::poll polled after returning Async::Ready");
}
GetConnInner::Connecting(ref mut f) => {
let result = ready!(Pin::new(f).poll(cx));
// The future is finished either way: take the pool handle out (so
// `Drop` has nothing left to do) and mark the state as consumed.
let pool = self.pool_take();
self.inner = GetConnInner::Done;
return match result {
Ok(mut c) => {
// Attach the pool handle and the reset flag to the connection
// before handing it to the caller.
c.inner.pool = Some(pool);
c.inner.reset_upon_returning_to_a_pool =
self.reset_upon_returning_to_a_pool;
Poll::Ready(Ok(c))
}
Err(e) => {
// NOTE(review): presumably releases the pool slot that was
// reserved for this connection — confirm against
// `Pool::cancel_connection`.
pool.cancel_connection();
Poll::Ready(Err(e))
}
};
}
GetConnInner::Checking(ref mut f) => {
let result = ready!(Pin::new(f).poll(cx));
match result {
Ok(mut c) => {
self.inner = GetConnInner::Done;
let pool = self.pool_take();
c.inner.pool = Some(pool);
c.inner.reset_upon_returning_to_a_pool =
self.reset_upon_returning_to_a_pool;
return Poll::Ready(Ok(c));
}
Err(_) => {
// The error is deliberately discarded: go back to `New`, tell the
// pool this connection is gone, and ask it again. The pool handle
// is kept because the future is not finished.
self.inner = GetConnInner::New;
let pool = self.pool_mut();
pool.cancel_connection();
continue;
}
}
}
}
}
}
}
impl Drop for GetConn {
    /// Cleans up pool bookkeeping when the future is dropped before it has
    /// handed out a connection result (i.e. when it is cancelled).
    ///
    /// If the pool handle is still present, the future is removed from the
    /// pool's queue. If a connection future is in flight (`Connecting` or
    /// `Checking`), the pool is additionally told that the connection is gone.
    fn drop(&mut self) {
        // `pool` is `None` once `poll` has taken it out to return a result;
        // in that case `poll` has already done all required bookkeeping.
        if let Some(pool) = self.pool.take() {
            // Remove this future from the pool's queue; it may have been
            // dropped while still waiting (e.g. by a timeout).
            pool.unqueue(self.queue_id);

            // Both in-flight states own a connection that is destroyed
            // together with the future and never handed back to the pool.
            // `poll` calls `cancel_connection` when either future fails, so a
            // cancelled future must do the same. Previously only
            // `Connecting` was handled, so cancelling during `Checking`
            // skipped this call.
            // NOTE(review): assumes `Pool::cancel_connection` releases the
            // slot counted for the connection — confirm against `Pool`.
            if matches!(
                self.inner,
                GetConnInner::Connecting(..) | GetConnInner::Checking(..)
            ) {
                pool.cancel_connection();
            }
        }
    }
}