use crossbeam_queue::SegQueue;
use futures::channel::oneshot;
use std::fmt;
use std::sync::Arc;
use crate::manage_connection::ManageConnection;
use crate::queue::{Live, Queue};
use crate::Config;
use crate::Error;
pub struct ConnectionPool<C: ManageConnection + Send> {
pub(crate) conns: Arc<Queue<C::Connection>>,
pub(crate) waiting: SegQueue<oneshot::Sender<Live<C::Connection>>>,
manager: C,
config: Arc<Config>,
}
impl<C: ManageConnection + Send + fmt::Debug> fmt::Debug for ConnectionPool<C> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("ConnectionPool")
.field("waiting_count", &self.waiting.len())
.field("manager", &self.manager)
.field("config", &self.config)
.finish()
}
}
impl<C: ManageConnection> ConnectionPool<C> {
pub fn new(conns: Queue<C::Connection>, manager: C, config: Arc<Config>) -> ConnectionPool<C> {
ConnectionPool {
conns: Arc::new(conns),
waiting: SegQueue::new(),
manager,
config,
}
}
pub fn max_size(&self) -> usize {
self.config.max_size
}
pub async fn connect(&self) -> Result<C::Connection, Error<C::Error>> {
self.manager.connect().await
}
pub fn notify_of_connection(&self, tx: oneshot::Sender<Live<C::Connection>>) {
self.waiting.push(tx);
}
pub fn try_waiting(
&self,
) -> Option<oneshot::Sender<Live<<C as ManageConnection>::Connection>>> {
self.waiting.pop().ok()
}
pub async fn is_valid(&self, conn: &mut C::Connection) -> Result<(), Error<C::Error>> {
self.manager.is_valid(conn).await
}
pub fn has_broken(&self, conn: &mut Live<C::Connection>) -> bool {
self.manager.has_broken(&mut conn.conn)
}
}