use std::error;
use std::fmt;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::time::Duration;
use async_trait::async_trait;
use crate::inner::PoolInner;
use crate::internals::Conn;
pub use crate::internals::State;
/// Public handle to a connection pool whose connections are managed by `M`.
///
/// The handle is a thin wrapper: every operation it offers is forwarded to the
/// crate-private `PoolInner` stored in `inner`.
pub struct Pool<M: ManageConnection> {
    /// Crate-private pool state that all public methods delegate to.
    pub(crate) inner: PoolInner<M>,
}
impl<M: ManageConnection> Clone for Pool<M> {
    /// Builds a new handle around a clone of the inner pool state.
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        Self { inner }
    }
}
impl<M: ManageConnection> fmt::Debug for Pool<M> {
    /// Renders the handle as `Pool(<inner debug output>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Pool({:?})", self.inner)
    }
}
impl<M> Pool<M>
where
    M: ManageConnection,
{
    /// Returns a [`Builder`] initialised with the default configuration.
    pub fn builder() -> Builder<M> {
        Builder::<M>::new()
    }

    /// Returns the [`State`] reported by the inner pool.
    pub fn state(&self) -> State {
        let inner = &self.inner;
        inner.state()
    }

    /// Asks the inner pool for a connection.
    ///
    /// # Errors
    ///
    /// Returns whatever [`RunError`] the inner pool's `get` produces.
    pub async fn get(&self) -> Result<PooledConnection<'_, M>, RunError<M::Error>> {
        let checkout = self.inner.get();
        checkout.await
    }

    /// Asks the inner pool to `connect` and hands the raw connection to the caller.
    ///
    /// The returned value is a bare `M::Connection`, not a [`PooledConnection`].
    ///
    /// # Errors
    ///
    /// Returns the `M::Error` produced by the inner pool's `connect`.
    pub async fn dedicated_connection(&self) -> Result<M::Connection, M::Error> {
        let connecting = self.inner.connect();
        connecting.await
    }
}
/// Collects the configuration used to construct a [`Pool`].
///
/// Each field is written by the setter of the same name on the builder.
#[derive(Debug)]
pub struct Builder<M>
where
    M: ManageConnection,
{
    /// Pool size limit; the setter rejects zero.
    pub(crate) max_size: u32,
    /// Optional idle-connection floor; checked against `max_size` when the pool is built.
    pub(crate) min_idle: Option<u32>,
    /// NOTE(review): presumably toggles validating a connection on checkout — confirm in `PoolInner`.
    pub(crate) test_on_check_out: bool,
    /// Optional maximum connection lifetime; the setter rejects `Some(0s)`.
    pub(crate) max_lifetime: Option<Duration>,
    /// Optional idle timeout; the setter rejects `Some(0s)`.
    pub(crate) idle_timeout: Option<Duration>,
    /// Connection timeout; the setter rejects a zero duration.
    pub(crate) connection_timeout: Duration,
    /// Receiver for errors of type `M::Error`.
    pub(crate) error_sink: Box<dyn ErrorSink<M::Error>>,
    /// NOTE(review): presumably the interval of a background reaper task — confirm in `PoolInner`.
    pub(crate) reaper_rate: Duration,
    /// Optional hook object implementing [`CustomizeConnection`].
    pub(crate) connection_customizer: Option<Box<dyn CustomizeConnection<M::Connection, M::Error>>>,
    /// Ties the builder to the manager type without storing a manager.
    _p: PhantomData<M>,
}
impl<M> Default for Builder<M>
where
    M: ManageConnection,
{
    /// Produces the default configuration.
    ///
    /// Defaults: `max_size` 10, no `min_idle`, `test_on_check_out` enabled,
    /// `max_lifetime` 30 minutes, `idle_timeout` 10 minutes, `connection_timeout`
    /// 30 seconds, `reaper_rate` 30 seconds, a [`NopErrorSink`] and no connection
    /// customizer.
    fn default() -> Self {
        // Small helper so the minute-based defaults read naturally.
        let minutes = |count: u64| Duration::from_secs(count * 60);
        let half_minute = Duration::from_secs(30);

        Self {
            max_size: 10,
            min_idle: None,
            test_on_check_out: true,
            max_lifetime: Some(minutes(30)),
            idle_timeout: Some(minutes(10)),
            connection_timeout: half_minute,
            error_sink: Box::new(NopErrorSink),
            reaper_rate: half_minute,
            connection_customizer: None,
            _p: PhantomData,
        }
    }
}
impl<M: ManageConnection> Builder<M> {
pub fn new() -> Builder<M> {
Default::default()
}
pub fn max_size(mut self, max_size: u32) -> Builder<M> {
assert!(max_size > 0, "max_size must be greater than zero!");
self.max_size = max_size;
self
}
pub fn min_idle(mut self, min_idle: Option<u32>) -> Builder<M> {
self.min_idle = min_idle;
self
}
pub fn test_on_check_out(mut self, test_on_check_out: bool) -> Builder<M> {
self.test_on_check_out = test_on_check_out;
self
}
pub fn max_lifetime(mut self, max_lifetime: Option<Duration>) -> Builder<M> {
assert!(
max_lifetime != Some(Duration::from_secs(0)),
"max_lifetime must be greater than zero!"
);
self.max_lifetime = max_lifetime;
self
}
pub fn idle_timeout(mut self, idle_timeout: Option<Duration>) -> Builder<M> {
assert!(
idle_timeout != Some(Duration::from_secs(0)),
"idle_timeout must be greater than zero!"
);
self.idle_timeout = idle_timeout;
self
}
pub fn connection_timeout(mut self, connection_timeout: Duration) -> Builder<M> {
assert!(
connection_timeout > Duration::from_secs(0),
"connection_timeout must be non-zero"
);
self.connection_timeout = connection_timeout;
self
}
pub fn error_sink(mut self, error_sink: Box<dyn ErrorSink<M::Error>>) -> Builder<M> {
self.error_sink = error_sink;
self
}
#[allow(dead_code)]
pub fn reaper_rate(mut self, reaper_rate: Duration) -> Builder<M> {
self.reaper_rate = reaper_rate;
self
}
pub fn connection_customizer(
mut self,
connection_customizer: Box<dyn CustomizeConnection<M::Connection, M::Error>>,
) -> Builder<M> {
self.connection_customizer = Some(connection_customizer);
self
}
fn build_inner(self, manager: M) -> Pool<M> {
if let Some(min_idle) = self.min_idle {
assert!(
self.max_size >= min_idle,
"min_idle must be no larger than max_size"
);
}
Pool {
inner: PoolInner::new(self, manager),
}
}
pub async fn build(self, manager: M) -> Result<Pool<M>, M::Error> {
let pool = self.build_inner(manager);
pool.inner.start_connections().await.map(|()| pool)
}
pub fn build_unchecked(self, manager: M) -> Pool<M> {
let p = self.build_inner(manager);
p.inner.spawn_start_connections();
p
}
}
/// Describes how a pool creates and inspects connections of one particular kind.
///
/// Implementors must be `Sized + Send + Sync + 'static`.
#[async_trait]
pub trait ManageConnection: Sized + Send + Sync + 'static {
/// The connection type handled by this manager.
type Connection: Send + 'static;
/// The error type returned by `connect` and `is_valid`.
type Error: fmt::Debug + Send + 'static;
/// Asynchronously produces a new connection, or the error that prevented it.
async fn connect(&self) -> Result<Self::Connection, Self::Error>;
/// Asynchronously checks a checked-out connection.
///
/// NOTE(review): presumably `Ok(())` means the connection is still usable — confirm against callers.
async fn is_valid(&self, conn: &mut PooledConnection<'_, Self>) -> Result<(), Self::Error>;
/// Synchronously inspects a connection and returns a flag.
///
/// NOTE(review): presumably `true` means the connection must not be reused — confirm against callers.
fn has_broken(&self, conn: &mut Self::Connection) -> bool;
}
/// Hook for adjusting connections of type `C`, reporting failures as `E`.
///
/// Implementors must be `Debug + Send + Sync + 'static`.
#[async_trait]
pub trait CustomizeConnection<C: Send + 'static, E: 'static>:
std::fmt::Debug + Send + Sync + 'static
{
/// Receives mutable access to a connection; the default implementation does nothing
/// and returns `Ok(())`.
///
/// NOTE(review): presumably invoked when the pool acquires a new connection — confirm the call site.
async fn on_acquire(&self, _connection: &mut C) -> Result<(), E> {
Ok(())
}
}
/// A connection on loan from a pool, borrowed for the lifetime `'a`.
///
/// Dereferences to `M::Connection`. When dropped, whatever is left in `conn` is
/// passed to the pool's `put_back`.
pub struct PooledConnection<'a, M: ManageConnection> {
    /// The pool that receives the connection on drop.
    pool: &'a PoolInner<M>,
    /// The wrapped connection; `None` once it has been taken out.
    conn: Option<Conn<M::Connection>>,
}
impl<'a, M: ManageConnection> PooledConnection<'a, M> {
    /// Wraps `conn` so that it is handed to `pool` when the wrapper is dropped.
    pub(crate) fn new(pool: &'a PoolInner<M>, conn: Conn<M::Connection>) -> Self {
        let conn = Some(conn);
        Self { pool, conn }
    }

    /// Discards the wrapped connection.
    ///
    /// The connection is dropped here, so the `Drop` impl that runs when `self`
    /// goes out of scope passes `None` to the pool's `put_back`.
    pub(crate) fn drop_invalid(mut self) {
        drop(self.conn.take());
    }
}
impl<'a, M: ManageConnection> Deref for PooledConnection<'a, M> {
    type Target = M::Connection;

    /// Borrows the underlying connection.
    ///
    /// # Panics
    ///
    /// Panics if the connection has already been taken out of the wrapper.
    fn deref(&self) -> &Self::Target {
        let entry = self.conn.as_ref().unwrap();
        &entry.conn
    }
}
impl<'a, M: ManageConnection> DerefMut for PooledConnection<'a, M> {
    /// Mutably borrows the underlying connection.
    ///
    /// # Panics
    ///
    /// Panics if the connection has already been taken out of the wrapper.
    fn deref_mut(&mut self) -> &mut M::Connection {
        let entry = self.conn.as_mut().unwrap();
        &mut entry.conn
    }
}
impl<'a, M: ManageConnection> fmt::Debug for PooledConnection<'a, M>
where
    M::Connection: fmt::Debug,
{
    /// Formats exactly like the underlying connection's `Debug` output.
    ///
    /// # Panics
    ///
    /// Panics if the connection has already been taken out of the wrapper.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let entry = self.conn.as_ref().unwrap();
        <M::Connection as fmt::Debug>::fmt(&entry.conn, f)
    }
}
impl<M: ManageConnection> Drop for PooledConnection<'_, M> {
    /// Hands the wrapped connection (or `None` if it was already taken) to the
    /// pool's `put_back`.
    fn drop(&mut self) {
        let returned = self.conn.take();
        self.pool.put_back(returned);
    }
}
/// Error type returned by pool operations such as `Pool::get`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RunError<E> {
    /// An error of the caller-chosen type `E`.
    User(E),
    /// The operation timed out (displayed as "Timed out in bb8").
    TimedOut,
}

/// Formats the error for end users.
///
/// Only `E: Display` is needed to render the wrapped error, so the impl does
/// not demand a full `std::error::Error` implementation from `E`. Every type
/// that satisfied the stricter bound still satisfies this one.
impl<E> fmt::Display for RunError<E>
where
    E: fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Delegate to the wrapped error's own message.
            RunError::User(err) => write!(f, "{}", err),
            RunError::TimedOut => write!(f, "Timed out in bb8"),
        }
    }
}

impl<E> error::Error for RunError<E>
where
    E: error::Error + 'static,
{
    /// Returns the wrapped error for `User`, and `None` for `TimedOut`.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match self {
            RunError::User(err) => Some(err),
            RunError::TimedOut => None,
        }
    }
}

impl<E> From<E> for RunError<E>
where
    E: error::Error,
{
    /// Wraps `error` in [`RunError::User`], which lets `?` convert it automatically.
    fn from(error: E) -> Self {
        Self::User(error)
    }
}
/// A destination for errors of type `E`.
///
/// Implementors must be `Debug + Send + Sync + 'static` so that boxed sinks can
/// be debug-printed and shared across threads.
pub trait ErrorSink<E>: Send + Sync + fmt::Debug + 'static {
    /// Receives ownership of one error.
    fn sink(&self, error: E);

    /// Returns a boxed sink derived from `self`; exists because a
    /// `Box<dyn ErrorSink<E>>` cannot be cloned through `Clone` directly.
    fn boxed_clone(&self) -> Box<dyn ErrorSink<E>>;
}
/// An [`ErrorSink`] that discards every error handed to it.
#[derive(Debug, Clone, Copy)]
pub struct NopErrorSink;

impl<E> ErrorSink<E> for NopErrorSink {
    /// Drops the error without recording it anywhere.
    fn sink(&self, _error: E) {}

    /// Returns a fresh boxed `NopErrorSink`; the type is a stateless unit struct.
    fn boxed_clone(&self) -> Box<dyn ErrorSink<E>> {
        Box::new(NopErrorSink)
    }
}