#![deny(missing_docs, missing_debug_implementations)]
extern crate futures;
extern crate tokio_executor;
extern crate tokio_timer;
use std::borrow::BorrowMut;
use std::cmp::{max, min};
use std::collections::VecDeque;
use std::fmt;
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::mem;
use std::sync::{Arc, Mutex, MutexGuard, Weak};
use std::time::{Duration, Instant};
use futures::future::{lazy, loop_fn, ok, Either, Loop};
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use futures::sync::oneshot;
use tokio_executor::spawn;
use tokio_timer::{Interval, Timeout};
mod util;
use util::*;
/// A trait which provides connection-specific functionality to the pool.
///
/// The pool itself never inspects a connection; it relies on the manager to
/// open connections and to judge whether an existing one is still usable.
pub trait ManageConnection: Send + Sync + 'static {
/// The connection type this manager deals with.
type Connection: Send + 'static;
/// The error type returned by `connect` and `is_valid`.
type Error: Send + 'static;
/// Attempts to create a new connection.
///
/// The pool calls this whenever it decides to open another connection, and
/// `Pool::dedicated_connection` calls it directly.
fn connect(&self) -> Box<Future<Item = Self::Connection, Error = Self::Error> + Send>;
/// Determines whether the connection is still usable.
///
/// Ownership of the connection is passed in and handed back: on success the
/// future yields the connection, on failure it yields the error together
/// with the connection. The pool calls this when an idle connection is
/// checked out and `test_on_check_out` is enabled; a connection that fails
/// the check is dropped and another idle connection is tried.
fn is_valid(
&self,
conn: Self::Connection,
) -> Box<Future<Item = Self::Connection, Error = (Self::Error, Self::Connection)> + Send>;
/// Synchronously reports whether the connection is known to be broken.
///
/// The pool calls this when user code hands a connection back at the end of
/// `Pool::run`; a connection reported as broken is dropped instead of being
/// returned to the idle queue.
fn has_broken(&self, conn: &mut Self::Connection) -> bool;
}
/// The error type produced by `Pool::run`.
///
/// Implements `Display` and `std::error::Error` (when `E` does) so it can be
/// printed with `{}` and boxed as a trait object like any other error.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RunError<E> {
    /// An error returned by the user's closure.
    User(E),
    /// No connection became available before the pool's `connection_timeout`
    /// expired.
    TimedOut,
}

impl<E> fmt::Display for RunError<E>
where
    E: fmt::Display,
{
    /// Forwards to the inner error for `User`; prints a fixed message for
    /// `TimedOut`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            RunError::User(ref err) => write!(f, "{}", err),
            RunError::TimedOut => {
                f.write_str("timed out waiting for a connection from the pool")
            }
        }
    }
}

// `Display` already forwards the inner error's message, so the inner error is
// deliberately not exposed a second time as a source/cause.
impl<E> ::std::error::Error for RunError<E> where E: ::std::error::Error {}
/// A destination for errors that the pool cannot hand back to any caller.
///
/// Errors raised by background work (for example connection attempts that the
/// pool spawns on its own) are converted to the manager's error type and passed
/// to `sink`.
pub trait ErrorSink<E>: fmt::Debug + Send + Sync + 'static {
    /// Receives an error that occurred inside the pool.
    fn sink(&self, error: E);

    /// Returns a boxed copy of this sink, so the pool can move it into
    /// independent tasks.
    fn boxed_clone(&self) -> Box<ErrorSink<E>>;
}

/// An `ErrorSink` implementation that silently discards every error.
#[derive(Debug, Clone, Copy)]
pub struct NopErrorSink;

impl<E> ErrorSink<E> for NopErrorSink {
    // Intentionally empty: the error is dropped.
    fn sink(&self, _error: E) {}

    fn boxed_clone(&self) -> Box<ErrorSink<E>> {
        // The sink is a stateless `Copy` unit struct, so copying it is the clone.
        let copy: NopErrorSink = *self;
        Box::new(copy)
    }
}
/// A snapshot of a pool's size, as returned by `Pool::state`.
pub struct State {
    /// The number of connections currently managed by the pool.
    pub connections: u32,
    /// The number of those connections that are sitting idle.
    pub idle_connections: u32,
    // Private unit field: keeps the struct from being constructed outside this module.
    _p: (),
}

impl fmt::Debug for State {
    /// Prints the two public counters; the private marker field is omitted.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let State {
            connections,
            idle_connections,
            ..
        } = *self;
        let mut out = f.debug_struct("State");
        out.field("connections", &connections);
        out.field("idle_connections", &idle_connections);
        out.finish()
    }
}
/// A connection together with the instant at which it was established.
#[derive(Debug)]
struct Conn<C>
where
    C: Send,
{
    conn: C,
    // Used by the reaper to enforce `max_lifetime`.
    birth: Instant,
}

/// A connection that is waiting in the idle queue.
struct IdleConn<C>
where
    C: Send,
{
    conn: Conn<C>,
    // Used by the reaper to enforce `idle_timeout`.
    idle_start: Instant,
}

impl<C> IdleConn<C>
where
    C: Send,
{
    /// Wraps `conn`, stamping the current instant as the start of its idle
    /// period. The connection's `birth` is left untouched.
    fn make_idle(conn: Conn<C>) -> IdleConn<C> {
        IdleConn {
            idle_start: Instant::now(),
            conn: conn,
        }
    }
}
/// A builder for a connection pool.
///
/// Obtain one with `Builder::new` or `Pool::builder`, adjust it with the
/// setter methods, and finish with `build` or `build_unchecked`.
#[derive(Debug)]
pub struct Builder<M: ManageConnection> {
/// Upper bound on established plus pending connections.
max_size: u32,
/// Target number of idle connections; `None` is treated as zero when replenishing.
min_idle: Option<u32>,
/// Whether `ManageConnection::is_valid` is run on an idle connection before it is handed out.
test_on_check_out: bool,
/// Idle connections at least this old (since `birth`) are dropped by the reaper.
max_lifetime: Option<Duration>,
/// Connections idle for at least this long are dropped by the reaper.
idle_timeout: Option<Duration>,
/// How long `Pool::run` waits in the waiter queue before failing with `RunError::TimedOut`.
connection_timeout: Duration,
/// Receives errors produced by tasks the pool spawns in the background.
error_sink: Box<ErrorSink<M::Error>>,
/// Period of the reaper's `Interval`.
reaper_rate: Duration,
/// Ties the builder to the manager type without storing a manager.
_p: PhantomData<M>,
}
impl<M: ManageConnection> Default for Builder<M> {
    /// Returns the default configuration: at most 10 connections, no minimum
    /// idle count, validation on check-out, a 30 minute maximum lifetime, a
    /// 10 minute idle timeout, a 30 second connection timeout, a 30 second
    /// reaper period and an error sink that discards errors.
    fn default() -> Self {
        let thirty_seconds = Duration::from_secs(30);
        let ten_minutes = Duration::from_secs(10 * 60);
        let thirty_minutes = Duration::from_secs(30 * 60);

        Builder {
            max_size: 10,
            min_idle: None,
            test_on_check_out: true,
            max_lifetime: Some(thirty_minutes),
            idle_timeout: Some(ten_minutes),
            connection_timeout: thirty_seconds,
            error_sink: Box::new(NopErrorSink),
            reaper_rate: thirty_seconds,
            _p: PhantomData,
        }
    }
}
impl<M: ManageConnection> Builder<M> {
pub fn new() -> Builder<M> {
Default::default()
}
pub fn max_size(mut self, max_size: u32) -> Builder<M> {
assert!(max_size > 0, "max_size must be greater than zero!");
self.max_size = max_size;
self
}
pub fn min_idle(mut self, min_idle: Option<u32>) -> Builder<M> {
self.min_idle = min_idle;
self
}
pub fn test_on_check_out(mut self, test_on_check_out: bool) -> Builder<M> {
self.test_on_check_out = test_on_check_out;
self
}
pub fn max_lifetime(mut self, max_lifetime: Option<Duration>) -> Builder<M> {
assert!(
max_lifetime != Some(Duration::from_secs(0)),
"max_lifetime must be greater than zero!"
);
self.max_lifetime = max_lifetime;
self
}
pub fn idle_timeout(mut self, idle_timeout: Option<Duration>) -> Builder<M> {
assert!(
idle_timeout != Some(Duration::from_secs(0)),
"idle_timeout must be greater than zero!"
);
self.idle_timeout = idle_timeout;
self
}
pub fn connection_timeout(mut self, connection_timeout: Duration) -> Builder<M> {
assert!(
connection_timeout > Duration::from_secs(0),
"connection_timeout must be non-zero"
);
self.connection_timeout = connection_timeout;
self
}
pub fn error_sink(mut self, error_sink: Box<ErrorSink<M::Error>>) -> Builder<M> {
self.error_sink = error_sink;
self
}
#[allow(dead_code)]
pub fn reaper_rate(mut self, reaper_rate: Duration) -> Builder<M> {
self.reaper_rate = reaper_rate;
self
}
fn build_inner(self, manager: M) -> (Pool<M>, impl Future<Item = (), Error = M::Error> + Send) {
if let Some(min_idle) = self.min_idle {
assert!(
self.max_size >= min_idle,
"min_idle must be no larger than max_size"
);
}
let p = Pool::new_inner(self, manager);
let f = p.replenish_idle_connections();
(p, f)
}
pub fn build(self, manager: M) -> impl Future<Item = Pool<M>, Error = M::Error> + Send {
let (p, f) = self.build_inner(manager);
f.map(|_| p)
}
pub fn build_unchecked(self, manager: M) -> Pool<M> {
let (p, f) = self.build_inner(manager);
p.spawn(p.sink_error(f));
p
}
}
/// The mutable bookkeeping of a pool; lives inside the `Mutex` held by
/// `SharedPool`.
#[allow(missing_debug_implementations)]
struct PoolInternals<C>
where
C: Send,
{
/// Callers waiting for a connection, oldest first. Each is satisfied by
/// sending a `Conn` through its oneshot sender.
waiters: VecDeque<oneshot::Sender<Conn<C>>>,
/// Idle connections: pushed at the back, handed out from the front.
conns: VecDeque<IdleConn<C>>,
/// Established connections the pool is accounting for: incremented when a
/// connect succeeds, decremented when connections are dropped.
num_conns: u32,
/// Connection attempts that have been started but have not finished yet.
pending_conns: u32,
}
impl<C> PoolInternals<C>
where
    C: Send,
{
    /// Makes `conn` available again.
    ///
    /// Waiters are served first, oldest first: the connection is sent to the
    /// first waiter whose receiver is still alive. If no waiter accepts it,
    /// the connection is appended to the idle queue.
    fn put_idle_conn(&mut self, mut conn: IdleConn<C>) {
        while let Some(waiter) = self.waiters.pop_front() {
            match waiter.send(conn.conn) {
                // Delivered; the connection is checked out again.
                Ok(_) => return,
                // That receiver was dropped; take the connection back and try
                // the next waiter.
                Err(returned) => conn.conn = returned,
            }
        }
        self.conns.push_back(conn);
    }
}
/// The state shared by every clone of a `Pool` and by the pool's background
/// tasks.
#[allow(missing_debug_implementations)]
struct SharedPool<M>
where
M: ManageConnection + Send,
{
/// The configuration the pool was built with; never modified afterwards.
statics: Builder<M>,
/// The user-supplied connection manager.
manager: M,
/// The mutable bookkeeping, guarded by a mutex.
internals: Mutex<PoolInternals<M::Connection>>,
}
impl<M> SharedPool<M>
where
    M: ManageConnection,
{
    /// Hands `runnable` to the executor via `tokio_executor::spawn`.
    fn spawn<R>(&self, runnable: R)
    where
        R: IntoFuture<Item = (), Error = ()>,
        R::Future: Send + 'static,
    {
        let task = runnable.into_future();
        spawn(task);
    }

    /// Wraps `f` so that any error it produces is converted to the manager's
    /// error type and delivered to the configured error sink, leaving a
    /// future whose error type is `()`.
    fn sink_error<'a, E, F>(&self, f: F) -> impl Future<Item = F::Item, Error = ()> + Send + 'a
    where
        F: Future<Error = E> + Send + 'a,
        E: Into<M::Error>,
    {
        // A private copy of the sink is moved into the returned future, so the
        // future does not borrow from `self`.
        let error_sink = self.statics.error_sink.boxed_clone();
        f.map_err(move |raw| {
            let converted: M::Error = raw.into();
            error_sink.sink(converted)
        })
    }

    /// Bounds `f` by the configured `connection_timeout`.
    ///
    /// The returned future yields `Some(item)` when `f` finishes in time and
    /// `None` when the deadline elapses or the timer itself reports an error;
    /// an error from `f` is passed through unchanged.
    fn or_timeout<'a, F>(
        &self,
        f: F,
    ) -> impl Future<Item = Option<F::Item>, Error = F::Error> + Send + 'a
    where
        F: IntoFuture + Send,
        F::Future: Send + 'a,
        F::Item: Send + 'a,
        F::Error: Send + ::std::fmt::Debug + 'a,
    {
        let limit = self.statics.connection_timeout;
        Timeout::new(f.into_future(), limit).then(|outcome| match outcome {
            Ok(item) => Ok(Some(item)),
            Err(err) => {
                if err.is_elapsed() || err.is_timer() {
                    Ok(None)
                } else {
                    // Neither elapsed nor a timer failure: the error carries
                    // the inner future's own error.
                    Err(err.into_inner().unwrap())
                }
            }
        })
    }
}
/// A generic connection pool.
///
/// A `Pool` is a handle to state held behind an `Arc`, so clones refer to the
/// same underlying pool.
pub struct Pool<M>
where
M: ManageConnection,
{
/// The state shared with all other handles and with background tasks.
inner: Arc<SharedPool<M>>,
}
impl<M> Clone for Pool<M>
where
M: ManageConnection,
{
fn clone(&self) -> Self {
Pool {
inner: self.inner.clone(),
}
}
}
impl<M> fmt::Debug for Pool<M>
where
    M: ManageConnection,
{
    /// Prints `Pool(<address>)`, where the address is that of the shared
    /// state, so handles to the same pool print identically.
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        write!(formatter, "Pool({:p})", self.inner)
    }
}
/// Starts one new connection attempt on behalf of the pool.
///
/// The caller passes in the already-locked pool state as `internals`. The
/// function asserts that there is spare capacity
/// (`num_conns + pending_conns < max_size`) and reserves the slot by
/// incrementing `pending_conns` before anything asynchronous happens.
///
/// The connect itself runs in a task handed to `spawn` right away, so it makes
/// progress even if the returned future is never polled. The returned future
/// only reports the outcome: `Ok(())` once the new connection has been made
/// available, or the manager's error if connecting failed.
///
/// # Panics
///
/// Panics if the pool is already at capacity. The returned future panics when
/// polled if the spawned task went away without reporting a result (for
/// example because the pool had already been dropped when the task ran).
fn add_connection<M>(
pool: &Arc<SharedPool<M>>,
internals: &mut PoolInternals<M::Connection>,
) -> impl Future<Item = (), Error = M::Error> + Send
where
M: ManageConnection,
{
assert!(internals.num_conns + internals.pending_conns < pool.statics.max_size);
internals.pending_conns += 1;
// Separate helper that takes only `pool`, not the borrowed `internals`.
fn do_it<M>(pool: &Arc<SharedPool<M>>) -> impl Future<Item = (), Error = M::Error> + Send
where
M: ManageConnection,
{
// The task keeps only a weak reference so it cannot keep the pool alive.
let new_shared = Arc::downgrade(pool);
// Carries the outcome from the spawned task back to the returned future.
let (tx, rx) = oneshot::channel();
spawn(lazy(move || {
match new_shared.upgrade() {
// The pool is gone; nothing to do. `tx` is dropped without sending.
None => Either::A(ok(())),
Some(shared) => {
Either::B(shared.manager.connect().then(move |result| {
// Bookkeeping happens under the pool lock once the connect has finished.
let mut locked = shared.internals.lock().unwrap();
match result {
Ok(conn) => {
// A fresh connection starts its life and its idle period at the same instant.
let now = Instant::now();
let conn = IdleConn {
conn: Conn {
conn: conn,
birth: now,
},
idle_start: now,
};
// The attempt is no longer pending; it is now an established connection.
locked.pending_conns -= 1;
locked.num_conns += 1;
// Hands the connection to a waiter if there is one, otherwise queues it as idle.
locked.put_idle_conn(conn);
// A failed send only means nobody is listening for the outcome any more.
tx.send(Ok(())).map_err(|_| ())
}
Err(err) => {
// Release the reserved slot and report the failure.
locked.pending_conns -= 1;
tx.send(Err(err)).map_err(|_| ())
}
}
}))
}
}
}));
rx.then(|v| match v {
// The task reported a result; pass it through.
Ok(o) => o,
// The sender was dropped without sending a result.
Err(_) => panic!(),
})
}
do_it(pool)
}
/// Tries to check out a connection from the idle queue.
///
/// Resolves to the connection on success. If the idle queue is empty the
/// future fails, handing the shared pool back as the "error" so the caller can
/// continue with it (for example by queueing as a waiter).
///
/// When `test_on_check_out` is enabled each candidate is validated with
/// `ManageConnection::is_valid`; a connection that fails validation is dropped
/// and the next idle connection is tried.
fn get_idle_connection<M>(
inner: Arc<SharedPool<M>>,
) -> impl Future<Item = Conn<M::Connection>, Error = Arc<SharedPool<M>>> + Send
where
M: ManageConnection + Send,
M::Connection: Send,
M::Error: Send,
{
// Each iteration examines one idle connection; `Loop::Continue` retries with the next.
loop_fn(inner, |inner| {
let pool = inner.clone();
let mut internals = inner.internals.lock().unwrap();
if let Some(conn) = internals.conns.pop_front() {
// Taking a connection lowers the idle count; top it back up towards
// `min_idle` if there is capacity. Errors go to the error sink.
if internals.num_conns + internals.pending_conns < pool.statics.max_size {
let f = Pool::replenish_idle_connections_locked(&pool, &mut internals);
pool.spawn(pool.sink_error(f));
}
// Release the lock before running the (asynchronous) validity check.
mem::drop(internals);
if pool.statics.test_on_check_out {
// `is_valid` consumes the connection, so remember its birth to rebuild the `Conn`.
let birth = conn.conn.birth;
Either::A(
pool.manager
.is_valid(conn.conn.conn)
.then(move |r| match r {
Ok(conn) => Ok(Loop::Break(Conn {
conn: conn,
birth: birth,
})),
Err((_, conn)) => {
{
// Account for the invalid connection. The future returned by
// `drop_connections` is discarded; the validation error itself
// is discarded as well.
let mut locked = pool.internals.lock().unwrap();
drop_connections(&pool, &mut locked, vec![conn]);
}
Ok(Loop::Continue(pool))
}
}),
)
} else {
// No validation requested: hand the connection out as is.
Either::B(Ok(Loop::Break(conn.conn)).into_future())
}
} else {
// Nothing idle; give the pool back to the caller.
Either::B(Err(pool).into_future())
}
})
}
/// Removes `to_drop` from the pool's accounting and, if that leaves spare
/// capacity, starts replenishing the idle queue.
///
/// `internals` may be an owned `MutexGuard` or a `&mut MutexGuard`; in both
/// cases it is only borrowed here and released by normal scoping when the
/// function returns. The connections themselves are dropped when `to_drop`
/// goes out of scope at the end of the function.
///
/// The returned future resolves once the replenishing connection attempts (if
/// any) have finished.
fn drop_connections<'a, L, M>(
    pool: &Arc<SharedPool<M>>,
    mut internals: L,
    to_drop: Vec<M::Connection>,
) -> Box<Future<Item = (), Error = M::Error> + Send>
where
    L: BorrowMut<MutexGuard<'a, PoolInternals<M::Connection>>>,
    M: ManageConnection,
{
    let guard = internals.borrow_mut();
    let removed = to_drop.len() as u32;
    guard.num_conns -= removed;

    // Dropping connections may have taken the pool below its idle target
    // (e.g. after lifetime limits were hit), so refill when there is room.
    let has_room = guard.num_conns + guard.pending_conns < pool.statics.max_size;
    let refill = if has_room {
        Either::A(Pool::replenish_idle_connections_locked(pool, &mut *guard))
    } else {
        Either::B(ok(()))
    };
    Box::new(refill)
}
/// Drops connections that were taken out of the idle queue.
///
/// Unwraps each `IdleConn` down to the raw connection and delegates to
/// `drop_connections`, passing the lock guard along by value.
fn drop_idle_connections<'a, M>(
    pool: &Arc<SharedPool<M>>,
    internals: MutexGuard<'a, PoolInternals<M::Connection>>,
    to_drop: Vec<IdleConn<M::Connection>>,
) -> Box<Future<Item = (), Error = M::Error> + Send>
where
    M: ManageConnection,
{
    let mut raw_conns = Vec::with_capacity(to_drop.len());
    raw_conns.extend(to_drop.into_iter().map(|idle| idle.conn.conn));
    drop_connections(pool, internals, raw_conns)
}
/// Removes expired connections from the idle queue.
///
/// An idle connection is expired when it has been idle for at least
/// `idle_timeout` or has existed for at least `max_lifetime` (each rule applies
/// only if configured). Surviving connections are put back in the idle queue;
/// expired ones are handed to `drop_idle_connections` together with the lock
/// guard.
fn reap_connections<'a, M>(
    pool: &Arc<SharedPool<M>>,
    mut internals: MutexGuard<'a, PoolInternals<M::Connection>>,
) -> impl Future<Item = (), Error = M::Error> + Send
where
    M: ManageConnection,
{
    // One timestamp for the whole sweep so every connection is judged alike.
    let now = Instant::now();
    let idle_timeout = pool.statics.idle_timeout;
    let max_lifetime = pool.statics.max_lifetime;

    let (expired, live) = internals.conns.drain(..).partition2(|conn| {
        let idle_too_long = idle_timeout.map_or(false, |limit| now - conn.idle_start >= limit);
        let too_old = max_lifetime.map_or(false, |limit| now - conn.conn.birth >= limit);
        idle_too_long | too_old
    });
    internals.conns = live;
    drop_idle_connections(pool, internals, expired)
}
/// Spawns a task that waits for the next tick of `interval`, reaps expired
/// idle connections once, and then schedules itself again.
///
/// The task owns only `weak_shared`; if the pool has been dropped by the time
/// the tick arrives, the task finishes and reaping stops. A failure of the
/// interval stream itself also ends the chain.
fn schedule_one_reaping<M>(
pool: &SharedPool<M>,
interval: Interval,
weak_shared: Weak<SharedPool<M>>,
) where
M: ManageConnection,
{
pool.spawn(
interval
// Wait for the next tick; yields the tick and the rest of the stream.
.into_future()
.map_err(|_| ())
.and_then(move |(_, interval)| match weak_shared.upgrade() {
// The pool is gone: stop without rescheduling.
None => Either::A(ok(())),
Some(shared) => {
let shared2 = shared.clone();
// The guard is moved into `reap_connections`.
let locked = shared.internals.lock().unwrap();
Either::B(
shared
// Errors from replenishing after the reap go to the error sink.
.sink_error(reap_connections(&shared, locked))
.then(move |r| {
// Reschedule regardless of how this round ended.
schedule_one_reaping(&shared2, interval, weak_shared);
r
}),
)
}
}),
)
}
impl<M: ManageConnection> Pool<M> {
/// Creates the shared pool state from `builder` and `manager` and, if any
/// expiry rule is configured, spawns the task that starts the reaper.
///
/// No connections are opened here.
// NOTE(review): calls `spawn` directly, so this presumably must run inside
// an executor context when a reaper is needed — confirm against tokio-executor.
fn new_inner(builder: Builder<M>, manager: M) -> Pool<M> {
let internals = PoolInternals {
waiters: VecDeque::new(),
conns: VecDeque::new(),
num_conns: 0,
pending_conns: 0,
};
let shared = Arc::new(SharedPool {
statics: builder,
manager: manager,
internals: Mutex::new(internals),
});
// A reaper is only needed when at least one expiry rule is configured.
if shared.statics.max_lifetime.is_some() || shared.statics.idle_timeout.is_some() {
// The reaper holds only a weak reference so it cannot keep the pool alive.
let s = Arc::downgrade(&shared);
spawn(lazy(|| {
s.upgrade().ok_or(()).map(|shared| {
let interval = Interval::new_interval(shared.statics.reaper_rate);
schedule_one_reaping(&shared, interval, s);
})
}))
}
Pool { inner: shared }
}
/// Spawns `runnable` on the executor; forwards to `SharedPool::spawn`.
fn spawn<R>(&self, runnable: R)
where
R: IntoFuture<Item = (), Error = ()>,
R::Future: Send + 'static,
{
self.inner.spawn(runnable);
}
/// Routes errors of `f` to the pool's error sink; forwards to
/// `SharedPool::sink_error`.
fn sink_error<'a, E, F>(&self, f: F) -> impl Future<Item = F::Item, Error = ()> + Send + 'a
where
F: Future<Error = E> + Send + 'a,
E: Into<M::Error> + 'a,
{
self.inner.sink_error(f)
}
/// Starts as many connection attempts as are needed to bring the idle queue
/// up to `min_idle`, limited by the remaining capacity.
///
/// Must be called with the pool state locked (passed in as `internals`).
/// The returned future resolves when all started attempts have succeeded,
/// or fails with the first error.
fn replenish_idle_connections_locked(
pool: &Arc<SharedPool<M>>,
internals: &mut PoolInternals<M::Connection>,
) -> impl Future<Item = (), Error = M::Error> + Send {
// Capacity not yet used by established or pending connections.
let slots_available = pool.statics.max_size - internals.num_conns - internals.pending_conns;
let idle = internals.conns.len() as u32;
let desired = pool.statics.min_idle.unwrap_or(0);
// The range is empty when `idle` already meets the target; otherwise it has
// one element per connection to add, capped at `slots_available`.
let f = FuturesUnordered::from_iter(
(idle..max(idle, min(desired, idle + slots_available)))
.map(|_| add_connection(pool, internals)),
);
// Drain the stream; only completion (or the first error) matters.
f.fold((), |_, _| Ok(()))
}
/// Locks the pool state and replenishes the idle queue; see
/// `replenish_idle_connections_locked`.
fn replenish_idle_connections(&self) -> impl Future<Item = (), Error = M::Error> + Send {
let mut locked = self.inner.internals.lock().unwrap();
Pool::replenish_idle_connections_locked(&self.inner, &mut locked)
}
/// Returns a `Builder` instance to configure a new pool.
pub fn builder() -> Builder<M> {
Builder::new()
}
/// Returns a snapshot of the pool's current connection counts.
pub fn state(&self) -> State {
let locked = self.inner.internals.lock().unwrap();
State {
connections: locked.num_conns,
idle_connections: locked.conns.len() as u32,
_p: (),
}
}
/// Runs the closure `f` with a connection from the pool.
///
/// The closure receives ownership of the connection and must give it back
/// together with its outcome, both on success (`(T, Connection)`) and on
/// failure (`(E, Connection)`), so that the pool can reclaim it.
///
/// If no idle connection is available the call waits for one, starting a
/// new connection attempt when the pool has capacity. The wait is bounded by
/// `connection_timeout`.
///
/// # Errors
///
/// * `RunError::TimedOut` if no connection could be obtained in time.
/// * `RunError::User` carrying the error returned by the closure.
pub fn run<'a, T, E, U, F>(
&self,
f: F,
) -> impl Future<Item = T, Error = RunError<E>> + Send + 'a
where
F: FnOnce(M::Connection) -> U + Send + 'a,
U: IntoFuture<Item = (T, M::Connection), Error = (E, M::Connection)> + Send + 'a,
U::Future: Send + 'a,
E: From<M::Error> + Send + 'a,
T: Send + 'a,
{
let inner = self.inner.clone();
let inner2 = inner.clone();
// `lazy` defers all work until the returned future is first polled.
lazy(move || {
get_idle_connection(inner).then(move |r| match r {
Ok(conn) => Either::A(ok(conn)),
// No idle connection: the "error" is the pool handed back to us.
Err(inner) => {
let (tx, rx) = oneshot::channel();
{
let mut locked = inner.internals.lock().unwrap();
// Queue up before starting a connection so the new connection
// can be delivered to this waiter.
locked.waiters.push_back(tx);
if locked.num_conns + locked.pending_conns < inner.statics.max_size {
let f = add_connection(&inner, &mut locked);
inner.spawn(inner.sink_error(f));
}
}
// Anything other than a delivered connection (deadline elapsed, timer
// failure, sender dropped) is reported as a timeout.
Either::B(inner.or_timeout(rx).then(move |r| match r {
Ok(Some(conn)) => Ok(conn),
_ => Err(RunError::TimedOut),
}))
}
})
})
.and_then(|conn| {
let inner = inner2;
// Remember the birth so the connection keeps its age when it is returned.
let birth = conn.birth;
f(conn.conn)
.into_future()
.then(move |r| {
// Separate the closure's outcome from the connection it handed back.
let (r, mut conn): (Result<_, E>, _) = match r {
Ok((t, conn)) => (Ok(t), conn),
Err((e, conn)) => (Err(e.into()), conn),
};
// Ask the manager before taking the lock.
let broken = inner.manager.has_broken(&mut conn);
let mut locked = inner.internals.lock().unwrap();
if broken {
// The returned future is discarded: replenishing attempts are already
// spawned by `add_connection`, but their errors are not observed here.
drop_connections(&inner, locked, vec![conn]);
} else {
let conn = IdleConn::make_idle(Conn {
conn: conn,
birth: birth,
});
// Goes to a waiter if one is queued, otherwise back to the idle queue.
locked.put_idle_conn(conn);
}
r
})
.map_err(|e| RunError::User(e))
})
}
/// Returns a future resolving to a brand-new connection created by the
/// manager.
///
/// The connection is not tracked by the pool: the pool's counters are not
/// touched and the connection is never returned to the idle queue.
pub fn dedicated_connection(
&self,
) -> impl Future<Item = M::Connection, Error = M::Error> + Send {
let inner = self.inner.clone();
inner.manager.connect()
}
}