use tokio::sync::mpsc;
use super::Connection;
use crate::runtime::AcknowledgedMessage;
/// Builds the two ends of the pool-management channel.
///
/// Returns a [`PoolManager`] wrapping the sending half and a
/// [`ManagementRequestReceiver`] wrapping the receiving half of a freshly
/// created unbounded tokio mpsc channel.
pub(super) fn channel() -> (PoolManager, ManagementRequestReceiver) {
    let (tx, rx) = mpsc::unbounded_channel();
    let manager = PoolManager { sender: tx };
    let request_receiver = ManagementRequestReceiver { receiver: rx };
    (manager, request_receiver)
}
/// Cloneable handle used to submit [`PoolManagementRequest`]s.
///
/// Each method on this type packages its arguments into a request variant and
/// pushes it onto the unbounded channel created by [`channel`].
#[derive(Clone, Debug)]
pub(super) struct PoolManager {
// Sending half of the management channel; the matching receiver is held by
// `ManagementRequestReceiver`.
sender: mpsc::UnboundedSender<PoolManagementRequest>,
}
impl PoolManager {
    /// Submits a [`PoolManagementRequest::Clear`] request.
    ///
    /// The outcome of the send is discarded, so a failed send is not reported
    /// to the caller.
    pub(super) fn clear(&self) {
        let request = PoolManagementRequest::Clear;
        let _ = self.sender.send(request);
    }

    /// Submits a [`PoolManagementRequest::MarkAsReady`] request and waits for
    /// it to be acknowledged.
    ///
    /// If the request could not be sent, this returns immediately without
    /// waiting, since no acknowledgment would be awaited in that case.
    pub(super) async fn mark_as_ready(&self) {
        let (completion_handler, acknowledgment_receiver) = AcknowledgedMessage::package(());
        let request = PoolManagementRequest::MarkAsReady { completion_handler };

        // Guard clause: only wait for the acknowledgment when the send succeeded.
        if self.sender.send(request).is_err() {
            return;
        }

        acknowledgment_receiver.wait_for_acknowledgment().await;
    }

    /// Submits `connection` wrapped in a [`PoolManagementRequest::CheckIn`].
    ///
    /// # Errors
    ///
    /// If the channel rejects the request, the connection is recovered from
    /// the returned send error and handed back to the caller as `Err`.
    pub(crate) fn check_in(&self, connection: Connection) -> std::result::Result<(), Connection> {
        self.sender
            .send(PoolManagementRequest::CheckIn(connection))
            // The send error carries the unsent request; it is always the
            // `CheckIn` built just above, so unwrapping it yields the connection.
            .map_err(|rejected| rejected.0.unwrap_check_in())
    }

    /// Submits a [`PoolManagementRequest::HandleConnectionFailed`] request.
    ///
    /// The outcome of the send is discarded.
    pub(super) fn handle_connection_failed(&self) {
        let request = PoolManagementRequest::HandleConnectionFailed;
        let _ = self.sender.send(request);
    }

    /// Submits a [`PoolManagementRequest::HandleConnectionSucceeded`] request
    /// carrying the optional `connection`.
    ///
    /// The outcome of the send is discarded.
    pub(super) fn handle_connection_succeeded(&self, connection: Option<Connection>) {
        let request = PoolManagementRequest::HandleConnectionSucceeded(connection);
        let _ = self.sender.send(request);
    }
}
/// Receiving end of the pool-management channel created by [`channel`].
///
/// Yields the [`PoolManagementRequest`]s submitted through [`PoolManager`].
#[derive(Debug)]
pub(super) struct ManagementRequestReceiver {
// Receiving half of the unbounded channel; visible to the parent module.
pub(super) receiver: mpsc::UnboundedReceiver<PoolManagementRequest>,
}
impl ManagementRequestReceiver {
    /// Awaits the next [`PoolManagementRequest`] from the channel.
    ///
    /// Forwards directly to the underlying receiver's `recv`, returning
    /// whatever it yields (`None` included).
    pub(super) async fn recv(&mut self) -> Option<PoolManagementRequest> {
        let Self { receiver } = self;
        receiver.recv().await
    }
}
/// Requests that can be submitted through a [`PoolManager`].
#[derive(Debug)]
pub(super) enum PoolManagementRequest {
// Sent by `PoolManager::clear`.
Clear,
// Sent by `PoolManager::mark_as_ready`, which then awaits the acknowledgment
// paired with `completion_handler`.
MarkAsReady {
completion_handler: AcknowledgedMessage<()>,
},
// Sent by `PoolManager::check_in`; carries the connection being checked in.
CheckIn(Connection),
// Sent by `PoolManager::handle_connection_failed`.
HandleConnectionFailed,
// Sent by `PoolManager::handle_connection_succeeded`; carries the optional
// connection passed to that method.
HandleConnectionSucceeded(Option<Connection>),
}
impl PoolManagementRequest {
fn unwrap_check_in(self) -> Connection {
match self {
PoolManagementRequest::CheckIn(conn) => conn,
_ => panic!("tried to unwrap checkin but got {:?}", self),
}
}
}