#![warn(missing_docs)]
mod config;
pub mod executor;
pub mod runtime;
mod timer;
pub use config::Builder;
use config::Config;
pub use executor::Executor;
pub use futures;
use futures::channel::mpsc;
use futures::lock::{Mutex, MutexGuard};
use futures::Future;
use futures::FutureExt;
use futures::StreamExt;
use log::debug;
use log::error;
use std::cmp;
use std::error;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
static CONNECTION_ID: AtomicUsize = AtomicUsize::new(0);
pub enum Error<E> {
Inner(E),
Timeout,
}
impl<E> From<E> for Error<E> {
fn from(e: E) -> Error<E> {
Error::Inner(e)
}
}
impl<E> fmt::Display for Error<E>
where
E: error::Error + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Inner(ref err) => write!(f, "{}", err),
Error::Timeout => write!(f, "Timed out in mobc"),
}
}
}
impl<E> fmt::Debug for Error<E>
where
E: error::Error + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Inner(ref err) => write!(f, "{:?}", err),
Error::Timeout => write!(f, "Timed out in mobc"),
}
}
}
pub type AnyFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send>>;
pub trait ConnectionManager: Send + Sync + 'static {
type Connection: Send + 'static;
type Error: error::Error + Send + Sync + 'static;
type Executor: Executor;
fn get_executor(&self) -> Self::Executor;
fn connect(&self) -> AnyFuture<Self::Connection, Self::Error>;
fn is_valid(&self, conn: Self::Connection) -> AnyFuture<Self::Connection, Self::Error>;
fn has_broken(&self, conn: &mut Option<Self::Connection>) -> bool;
}
struct Conn<C> {
raw: Option<C>,
id: u64,
birth: Instant,
}
struct IdleConn<C> {
conn: Conn<C>,
idle_start: Instant,
}
struct PoolInternals<C> {
conns: mpsc::Sender<IdleConn<C>>,
num_conns: u32,
idle_conns: u32,
pending_conns: u32,
is_initial_done: bool,
initial_done: mpsc::Sender<()>,
}
struct SharedPool<M>
where
M: ConnectionManager,
{
config: Config<M::Executor>,
manager: M,
internals: Mutex<PoolInternals<M::Connection>>,
conns: Mutex<mpsc::Receiver<IdleConn<M::Connection>>>,
initial_wg: Mutex<mpsc::Receiver<()>>,
last_error: Mutex<Option<Error<M::Error>>>,
}
pub struct Pool<M>(Arc<SharedPool<M>>)
where
M: ConnectionManager;
impl<M> Clone for Pool<M>
where
M: ConnectionManager,
{
fn clone(&self) -> Self {
Pool(self.0.clone())
}
}
impl<M> Pool<M>
where
M: ConnectionManager,
{
pub async fn new(manager: M) -> Result<Pool<M>, Error<M::Error>> {
Pool::builder().build(manager).await
}
pub fn builder() -> Builder<M> {
Builder::new()
}
async fn new_inner(config: Config<M::Executor>, manager: M, reaper_rate: Duration) -> Pool<M> {
let (recycle, conns) = mpsc::channel(config.max_size as usize);
let initial_size = config.min_idle.unwrap_or(config.max_size);
let (initial_done, initial_wg) = mpsc::channel(initial_size as usize);
let internals = PoolInternals {
conns: recycle,
num_conns: 0,
pending_conns: 0,
idle_conns: 0,
is_initial_done: false,
initial_done,
};
let shared = Arc::new(SharedPool {
config: config,
manager: manager,
internals: Mutex::new(internals),
conns: Mutex::new(conns),
initial_wg: Mutex::new(initial_wg),
last_error: Mutex::new(None),
});
let mut internals = shared.internals.lock().await;
establish_idle_connections(&shared, &mut internals);
if shared.config.max_lifetime.is_some() || shared.config.idle_timeout.is_some() {
reap_connections(&shared, reaper_rate);
}
drop(internals);
Pool(shared)
}
pub async fn get(&self) -> Result<PooledConnection<M>, Error<M::Error>> {
self.get_timeout(self.0.config.connection_timeout).await
}
pub async fn get_timeout(&self, dur: Duration) -> Result<PooledConnection<M>, Error<M::Error>> {
let timeout = timer::timeout(dur);
let lock_and_get = async {
debug!("try to lock the conns");
let mut conns = self.0.conns.lock().await;
conns.next().await
};
debug!("waiting for get timeout");
futures::select! {
() = timeout.fuse() => Err(Error::Timeout),
r = lock_and_get.fuse() => match r {
Some(conn) => {
debug!("get conn");
let mut internals = self.0.internals.lock().await;
internals.idle_conns -= 1;
establish_idle_connections(&self.0, &mut internals);
return Ok(PooledConnection {
pool: Some(self.clone()),
conn: Some(conn.conn),
})
}
None => Err(Error::Timeout),
}
}
}
pub async fn try_get(&self) -> Option<PooledConnection<M>> {
let mut conns = self.0.conns.lock().await;
match conns.try_next() {
Ok(Some(conn)) => {
let mut internals = self.0.internals.lock().await;
internals.idle_conns -= 1;
Some(PooledConnection {
pool: Some(self.clone()),
conn: Some(conn.conn),
})
}
_ => None,
}
}
async fn wait_for_initialization(&self) -> Result<(), Error<M::Error>> {
debug!("waiting for initialization");
let mut timeout = timer::timeout(self.0.config.connection_timeout).fuse();
let initial_size = self.0.config.min_idle.unwrap_or(self.0.config.max_size);
let mut initial_wg = self.0.initial_wg.lock().await;
let mut initial_count = 0;
loop {
futures::select! {
() = timeout => break,
_ = initial_wg.next() => {
initial_count += 1;
let mut internals = self.0.internals.lock().await;
if initial_count == initial_size {
debug!("initial ok");
internals.is_initial_done = true;
break
}
}
}
}
if let Some(e) = self.0.last_error.lock().await.take() {
return Err(e);
}
Ok(())
}
fn put_back(self, _checkout: Instant, mut conn: Conn<M::Connection>) {
let _ = self.0.config.executor.clone().spawn(Box::pin(async move {
let broken = conn.raw.is_none() || self.0.manager.has_broken(&mut conn.raw);
let mut internals = self.0.internals.lock().await;
if broken {
drop_conns(&self.0, internals, vec![conn]);
return;
} else {
let conn = IdleConn {
conn,
idle_start: Instant::now(),
};
debug!("put back");
internals.conns.try_send(conn).unwrap();
internals.idle_conns += 1;
}
}));
}
pub async fn state(&self) -> State {
let internals = self.0.internals.lock().await;
State {
connections: internals.num_conns,
idle_connections: internals.idle_conns,
_p: (),
}
}
}
fn drop_conns<M>(
shared: &Arc<SharedPool<M>>,
mut internals: MutexGuard<PoolInternals<M::Connection>>,
conn: Vec<Conn<M::Connection>>,
) where
M: ConnectionManager,
{
internals.num_conns -= conn.len() as u32;
establish_idle_connections(shared, &mut internals);
drop(internals);
}
fn establish_idle_connections<M>(
shared: &Arc<SharedPool<M>>,
internals: &mut PoolInternals<M::Connection>,
) where
M: ConnectionManager,
{
debug!(
"num_conns {}, pending_conns {}, max_size {}",
internals.num_conns, internals.pending_conns, shared.config.max_size
);
if internals.num_conns + internals.pending_conns >= shared.config.max_size {
return;
}
let min = shared.config.min_idle.unwrap_or(shared.config.max_size);
let idle = internals.idle_conns as u32;
debug!(
"idle {} min {}, {}, {}",
idle, min, internals.num_conns, internals.pending_conns,
);
for _ in idle..min {
add_connection(shared, internals);
}
}
fn add_connection<M>(shared: &Arc<SharedPool<M>>, internals: &mut PoolInternals<M::Connection>)
where
M: ConnectionManager,
{
debug!("add connection");
internals.pending_conns += 1;
inner(Duration::from_secs(0), shared);
fn inner<M>(delay: Duration, shared: &Arc<SharedPool<M>>)
where
M: ConnectionManager,
{
let new_shared = Arc::downgrade(shared);
let _ = shared.config.executor.clone().spawn(Box::pin(async move {
let shared = match new_shared.upgrade() {
Some(shared) => shared,
None => return,
};
let conn = shared.manager.connect().await;
match conn {
Ok(conn) => {
debug!("adding connection");
let id = CONNECTION_ID.fetch_add(1, Ordering::Relaxed) as u64;
let mut internals = shared.internals.lock().await;
*shared.last_error.lock().await = None;
let now = Instant::now();
let mut conn = IdleConn {
conn: Conn {
raw: Some(conn),
birth: now,
id,
},
idle_start: now,
};
loop {
match internals.conns.try_send(conn) {
Ok(()) => break,
Err(c) => conn = c.into_inner(),
}
}
internals.pending_conns -= 1;
internals.idle_conns += 1;
internals.num_conns += 1;
if !internals.is_initial_done {
internals.initial_done.try_send(()).unwrap();
}
drop(internals);
}
Err(err) => {
error!("mobc failed to connect: {:?}", err);
*shared.last_error.lock().await = Some(Error::Inner(err));
let mut internals = shared.internals.lock().await;
if !internals.is_initial_done {
if internals.initial_done.try_send(()).err().is_some() {
return;
};
}
let delay = cmp::max(Duration::from_millis(200), delay);
let delay = cmp::min(shared.config.connection_timeout / 2, delay * 2);
inner(delay, &shared);
}
}
}));
}
}
fn reap_connections<M>(shared: &Arc<SharedPool<M>>, reaper_rate: Duration)
where
M: ConnectionManager,
{
let new_shared = Arc::downgrade(shared);
let _ = shared
.manager
.get_executor()
.clone()
.spawn(Box::pin(async move {
let mut interval = timer::interval(reaper_rate);
#[cfg(feature = "tokio-runtime")]
#[cfg(not(feature = "async-std-runtime"))]
timer::timeout(reaper_rate).await;
loop {
interval.tick().await;
debug!("start reaping");
reap_conn(&new_shared).await;
}
}));
async fn reap_conn<M>(shared: &Weak<SharedPool<M>>)
where
M: ConnectionManager,
{
let shared = match shared.upgrade() {
Some(shared) => shared,
None => return,
};
let mut to_drop = vec![];
let mut internals = shared.internals.lock().await;
let mut conns = shared.conns.lock().await;
let mut checked_num: u32 = 0;
let now = Instant::now();
while let Ok(Some(conn)) = conns.try_next() {
let mut reap = false;
if let Some(timeout) = shared.config.idle_timeout {
debug!("idle time {:?}", now - conn.idle_start);
reap |= now - conn.idle_start >= timeout;
}
if let Some(lifetime) = shared.config.max_lifetime {
reap |= now - conn.conn.birth >= lifetime;
}
debug!("reap => {}", reap);
if reap {
to_drop.push(conn.conn);
} else {
internals.conns.try_send(conn).unwrap()
}
checked_num += 1;
if checked_num == internals.idle_conns {
break;
}
}
debug!("no more conns");
internals.idle_conns -= to_drop.len() as u32;
drop_conns(&shared, internals, to_drop);
debug!("reap finish");
}
}
pub struct State {
pub connections: u32,
pub idle_connections: u32,
_p: (),
}
impl fmt::Debug for State {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("State")
.field("connections", &self.connections)
.field("idle_connections", &self.idle_connections)
.finish()
}
}
pub struct PooledConnection<M>
where
M: ConnectionManager,
{
pool: Option<Pool<M>>,
conn: Option<Conn<M::Connection>>,
}
impl<M> PooledConnection<M>
where
M: ConnectionManager,
{
pub fn take_raw_conn(&mut self) -> M::Connection {
self.conn.as_mut().unwrap().raw.take().unwrap()
}
pub fn set_raw_conn(&mut self, raw: M::Connection) {
self.conn.as_mut().unwrap().raw = Some(raw);
}
}
impl<M> Drop for PooledConnection<M>
where
M: ConnectionManager,
{
fn drop(&mut self) {
self.pool
.take()
.unwrap()
.put_back(Instant::now(), self.conn.take().unwrap());
}
}
impl<M> Deref for PooledConnection<M>
where
M: ConnectionManager,
{
type Target = M::Connection;
fn deref(&self) -> &Self::Target {
&self.conn.as_ref().unwrap().raw.as_ref().unwrap()
}
}
impl<M> DerefMut for PooledConnection<M>
where
M: ConnectionManager,
{
fn deref_mut(&mut self) -> &mut M::Connection {
self.conn.as_mut().unwrap().raw.as_mut().unwrap()
}
}