use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures_core::ready;
use tokio::sync::mpsc::UnboundedSender;
use crate::{
conn::pool::{Inner, Pool, QUEUE_END_ID},
error::Error,
Conn,
};
use std::sync::{atomic, Arc};
/// Future that asks a pool to shut down and resolves once shutdown is observed.
///
/// Built from a [`Pool`] via `DisconnectPool::new`; all of the work happens in
/// the `Future::poll` implementation.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct DisconnectPool {
/// Shared pool state; `poll` sets its `close` flag, registers a waker in its
/// `exchange`, and reads its `closed` flag.
pool_inner: Arc<Inner>,
/// Sender taken from the pool. `poll` takes it out (leaving `None`) the first
/// time it needs it, sends a single `None` through it and then waits for the
/// channel to close.
drop: Option<UnboundedSender<Option<Conn>>>,
}
impl DisconnectPool {
    /// Consumes `pool` and wraps its shared state and its `drop` sender into a
    /// not-yet-polled disconnect future.
    pub(crate) fn new(pool: Pool) -> Self {
        let pool_inner = pool.inner;
        // Kept in an `Option` so that `poll` can take the sender out exactly once.
        let sender = Some(pool.drop);

        Self {
            pool_inner,
            drop: sender,
        }
    }
}
impl Future for DisconnectPool {
    type Output = Result<(), Error>;

    /// Drives the pool shutdown.
    ///
    /// Every poll sets the pool's `close` flag and registers the current waker
    /// in the exchange's `waiting` queue. The future then resolves with `Ok(())`
    /// when any of the following holds:
    ///
    /// * the pool's `closed` flag is already set;
    /// * a `None` was sent through the `drop` sender and the channel has since
    ///   been closed by its receiver;
    /// * sending `None` failed, i.e. the receiver is already gone.
    ///
    /// This implementation never produces `Err`.
    ///
    /// # Panics
    ///
    /// Panics if the `exchange` mutex is poisoned.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.pool_inner.close.store(true, atomic::Ordering::Release);

        // The guard is confined to this scope so the lock is released before
        // the `closed` flag is inspected below.
        {
            let mut guard = self.pool_inner.exchange.lock().unwrap();
            guard.spawn_futures_if_needed(&self.pool_inner);
            // NOTE(review): `QUEUE_END_ID` presumably places this waker at the
            // back of the queue — confirm against the queue implementation.
            guard.waiting.push(cx.waker().clone(), QUEUE_END_ID);
        }

        if self.pool_inner.closed.load(atomic::Ordering::Acquire) {
            return Poll::Ready(Ok(()));
        }

        // The sender is consumed on the first poll that gets this far. Later
        // polls find `None` here and stay pending until `closed` is observed.
        let sender = match self.drop.take() {
            Some(sender) => sender,
            None => return Poll::Pending,
        };

        if sender.send(None).is_ok() {
            // The receiver accepted the message (presumably the pool's
            // background task — TODO confirm); wait for it to close the channel.
            let mut channel_closed = Box::pin(sender.closed());
            ready!(channel_closed.as_mut().poll(cx));
        }

        // Either the channel is closed now, or the send failed because the
        // receiver is already gone and nobody would wake us later.
        Poll::Ready(Ok(()))
    }
}