#![feature(unsafe_destructor)]
#![warn(missing_docs)]
#![allow(unstable, missing_copy_implementations)]
#![doc(html_root_url="https://sfackler.github.io/doc")]
#[macro_use]
extern crate log;
extern crate time;
use std::collections::RingBuf;
use std::error::Error;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex, Condvar};
use std::time::Duration;
use time::SteadyTime;
pub use config::{Config, ConfigError};
use task::ScheduledThreadPool;
mod config;
mod task;
/// A trait which provides connection-specific functionality to a `Pool`.
///
/// Implementors must be `Send + Sync` because the manager is shared between
/// the pool's users and its helper tasks.
pub trait ConnectionManager: Send+Sync {
/// The connection type this manager creates and inspects.
type Connection: Send;
/// The error type returned by `connect` and `is_valid`.
type Error: Send;
/// Attempts to create a new connection.
///
/// The pool calls this from a helper task whenever it needs to add a
/// connection; on `Err` the error is passed to the pool's `ErrorHandler`
/// and the attempt is rescheduled.
fn connect(&self) -> Result<Self::Connection, Self::Error>;
/// Checks whether `conn` is still usable, returning `Err` if it is not.
///
/// `Pool::get` calls this before handing a connection out when the
/// `test_on_check_out` configuration flag is set; a connection that fails
/// the check is discarded.
fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error>;
/// Reports whether `conn` must be discarded rather than reused.
///
/// The pool calls this each time a `PooledConnection` is dropped, before
/// it takes its internal lock.
fn has_broken(&self, conn: &mut Self::Connection) -> bool;
}
/// A trait which handles errors reported by a `ConnectionManager`.
///
/// The pool forwards to it the errors returned by `ConnectionManager::connect`
/// and `ConnectionManager::is_valid`.
pub trait ErrorHandler<E>: Send+Sync {
/// Consumes `error`; the handler decides what (if anything) to do with it.
fn handle_error(&self, error: E);
}
impl<E> ErrorHandler<E> for Box<ErrorHandler<E>> {
fn handle_error(&self, error: E) {
(**self).handle_error(error)
}
}
/// An `ErrorHandler` which does nothing with the errors it is given.
#[derive(Copy, Clone, Debug)]
pub struct NoopErrorHandler;
impl<E> ErrorHandler<E> for NoopErrorHandler {
    /// Discards the error without inspecting it.
    fn handle_error(&self, _error: E) {
        // Intentionally empty: the error is simply dropped on return.
    }
}
/// An `ErrorHandler` which logs each error at the `error` level using its
/// `Debug` representation.
#[derive(Copy, Clone, Debug)]
pub struct LoggingErrorHandler;
impl<E> ErrorHandler<E> for LoggingErrorHandler where E: fmt::Debug {
fn handle_error(&self, error: E) {
error!("{:?}", error);
}
}
/// Mutable pool state; in `SharedPool` it is stored behind a `Mutex`.
struct PoolInternals<C> {
/// Idle connections waiting to be checked out. New and returned connections
/// are pushed on the back; checkouts pop from the front.
conns: RingBuf<C>,
/// Number of live connections: incremented when a connection is
/// established and decremented when one is discarded. Checking a
/// connection out or returning it does not change this count.
num_conns: u32,
}
/// State shared (through an `Arc`) between a `Pool` and the helper tasks it
/// schedules.
struct SharedPool<M, H>
where M: ConnectionManager, H: ErrorHandler<<M as ConnectionManager>::Error> {
/// Runtime configuration the pool was created with.
config: Config,
/// Creates connections and checks their health.
manager: M,
/// Receives errors reported by `manager`.
error_handler: H,
/// Idle-connection queue and connection count.
internals: Mutex<PoolInternals<<M as ConnectionManager>::Connection>>,
/// Notified (while `internals` is locked) whenever a connection is pushed
/// onto the idle queue.
cond: Condvar,
/// Helper threads on which connection attempts are scheduled.
thread_pool: ScheduledThreadPool,
}
/// Schedules an attempt to open one new connection after `delay`.
///
/// The attempt runs on the pool's helper threads. On success the connection is
/// appended to the idle queue, the connection count is bumped and one waiter
/// is woken. On failure the error is handed to the pool's error handler and
/// another attempt is scheduled one second later.
fn add_connection<M, H>(delay: Duration, shared: &Arc<SharedPool<M, H>>)
        where M: ConnectionManager, H: ErrorHandler<<M as ConnectionManager>::Error> {
    // The task outlives this call, so it needs its own handle on the pool.
    let pool = shared.clone();
    shared.thread_pool.run_after(delay, move || {
        let conn = match pool.manager.connect() {
            Ok(conn) => conn,
            Err(err) => {
                pool.error_handler.handle_error(err);
                add_connection(Duration::seconds(1), &pool);
                return;
            }
        };

        // Publish the connection and wake a waiter while still holding the lock.
        let mut state = pool.internals.lock().unwrap();
        state.conns.push_back(conn);
        state.num_conns += 1;
        pool.cond.notify_one();
    });
}
/// A generic connection pool.
///
/// Connections are created by the `ConnectionManager` `M`; errors it reports
/// are passed to the `ErrorHandler` `H`.
pub struct Pool<M, H>
where M: ConnectionManager, H: ErrorHandler<<M as ConnectionManager>::Error> {
/// State shared with the helper tasks that establish connections.
shared: Arc<SharedPool<M, H>>,
}
/// Clears the helper thread pool when the `Pool` handle goes away.
#[unsafe_destructor]
impl<M, H> Drop for Pool<M, H>
        where M: ConnectionManager, H: ErrorHandler<<M as ConnectionManager>::Error> {
    fn drop(&mut self) {
        // Scheduled tasks hold their own `Arc` to the shared state, so they
        // have to be cleared explicitly here.
        let helpers = &self.shared.thread_pool;
        helpers.clear();
    }
}
impl<M, H> fmt::Debug for Pool<M, H>
where M: ConnectionManager + fmt::Debug, H: ErrorHandler<<M as ConnectionManager>::Error> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "Pool {{ config: {:?}, manager: {:?} }}", self.shared.config,
self.shared.manager)
}
}
/// An error type returned by `Pool::new` when the pool fails to establish its
/// initial connections in time.
#[derive(Debug)]
pub struct InitializationError;

impl fmt::Display for InitializationError {
    /// Writes the same text that `Error::description` returns.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let message = Error::description(self);
        f.write_str(message)
    }
}

impl Error for InitializationError {
    /// A fixed, human-readable summary of the failure.
    fn description(&self) -> &str {
        "Unable to initialize connections"
    }
}
/// An error type returned by `Pool::get` when no connection became available
/// before the wait timed out.
#[derive(Debug)]
pub struct GetTimeout;

impl fmt::Display for GetTimeout {
    /// Writes the same text that `Error::description` returns.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let message = Error::description(self);
        f.write_str(message)
    }
}

impl Error for GetTimeout {
    /// A fixed, human-readable summary of the failure.
    fn description(&self) -> &str {
        "Timed out while waiting for a connection"
    }
}
impl<M, H> Pool<M, H>
        where M: ConnectionManager, H: ErrorHandler<<M as ConnectionManager>::Error> {
    /// Creates a new connection pool.
    ///
    /// `config.pool_size` connection attempts are scheduled immediately on a
    /// helper thread pool of `config.helper_tasks` threads. If
    /// `config.initialization_fail_fast` is set, this call then blocks for up
    /// to `config.connection_timeout` until every one of them has succeeded.
    ///
    /// # Errors
    ///
    /// Returns `InitializationError` if fail-fast initialization is enabled
    /// and the full set of connections was not established before the
    /// timeout. All pending helper tasks are cleared before returning.
    ///
    /// # Panics
    ///
    /// Panics if `config.validate()` returns an error.
    pub fn new(config: Config, manager: M, error_handler: H)
               -> Result<Pool<M, H>, InitializationError> {
        config.validate().unwrap();

        // Read this before `config` is moved into the shared state.
        let helper_tasks = config.helper_tasks as usize;

        let internals = PoolInternals {
            conns: RingBuf::new(),
            num_conns: 0,
        };

        let shared = Arc::new(SharedPool {
            config: config,
            manager: manager,
            error_handler: error_handler,
            internals: Mutex::new(internals),
            cond: Condvar::new(),
            thread_pool: ScheduledThreadPool::new(helper_tasks),
        });

        for _ in 0..shared.config.pool_size {
            add_connection(Duration::zero(), &shared);
        }

        if shared.config.initialization_fail_fast {
            let internals = shared.internals.lock().unwrap();
            let initialized = shared.cond.wait_timeout_with(internals,
                                                            shared.config.connection_timeout,
                                                            |internals| {
                internals.unwrap().num_conns == shared.config.pool_size
            }).unwrap().1;

            if !initialized {
                // No `Pool` value exists yet, so its destructor will not run.
                // Clear the helper tasks here, exactly as `Drop for Pool`
                // does; otherwise failing connection attempts would keep
                // rescheduling themselves after this function has returned.
                shared.thread_pool.clear();
                return Err(InitializationError);
            }
        }

        Ok(Pool {
            shared: shared,
        })
    }

    /// Retrieves a connection from the pool.
    ///
    /// Blocks until an idle connection is available. If
    /// `config.test_on_check_out` is set, the connection is first checked with
    /// `ConnectionManager::is_valid`; a connection that fails the check is
    /// reported to the error handler, discarded and replaced, and the search
    /// continues.
    ///
    /// The returned `PooledConnection` hands the connection back to the pool
    /// when it is dropped.
    ///
    /// # Errors
    ///
    /// Returns `GetTimeout` if a wait for a connection times out. The deadline
    /// is `config.connection_timeout` after the call started.
    pub fn get<'a>(&'a self) -> Result<PooledConnection<'a, M, H>, GetTimeout> {
        let end = SteadyTime::now() + self.shared.config.connection_timeout;
        let mut internals = self.shared.internals.lock().unwrap();

        loop {
            match internals.conns.pop_front() {
                Some(mut conn) => {
                    // Validation may be slow; never run it under the lock.
                    drop(internals);

                    if self.shared.config.test_on_check_out {
                        if let Err(e) = self.shared.manager.is_valid(&mut conn) {
                            self.shared.error_handler.handle_error(e);
                            // Tear the dead connection down before re-taking
                            // the lock so other threads are not blocked on it.
                            drop(conn);

                            internals = self.shared.internals.lock().unwrap();
                            internals.num_conns -= 1;
                            add_connection(Duration::zero(), &self.shared);
                            continue
                        }
                    }

                    return Ok(PooledConnection {
                        pool: self,
                        conn: Some(conn),
                    })
                }
                None => {
                    // Wait only for whatever is left of the overall deadline.
                    let now = SteadyTime::now();
                    let (new_internals, no_timeout) =
                        self.shared.cond.wait_timeout(internals, end - now).unwrap();
                    internals = new_internals;

                    if !no_timeout {
                        return Err(GetTimeout);
                    }
                }
            }
        }
    }

    /// Returns a checked-out connection to the pool.
    ///
    /// A connection that `ConnectionManager::has_broken` reports as broken is
    /// discarded and a replacement is scheduled. Otherwise it is appended to
    /// the idle queue and one waiter is woken.
    fn put_back(&self, mut conn: <M as ConnectionManager>::Connection) {
        // Run the health check before taking the lock.
        let broken = self.shared.manager.has_broken(&mut conn);

        let mut internals = self.shared.internals.lock().unwrap();
        if broken {
            internals.num_conns -= 1;
            // Replace the discarded connection, as `get` does for connections
            // that fail validation; without this the pool would shrink every
            // time a connection broke.
            add_connection(Duration::zero(), &self.shared);
        } else {
            internals.conns.push_back(conn);
            self.shared.cond.notify_one();
        }
    }
}
/// A smart pointer wrapping a connection checked out of a `Pool`.
///
/// It dereferences to the underlying connection and hands the connection back
/// to the pool when dropped.
pub struct PooledConnection<'a, M, H>
where M: ConnectionManager, H: ErrorHandler<<M as ConnectionManager>::Error> {
/// The pool this connection was checked out of.
pool: &'a Pool<M, H>,
/// The wrapped connection; `Pool::get` sets it to `Some` and the destructor
/// takes it back out.
conn: Option<<M as ConnectionManager>::Connection>,
}
/// Formats as `PooledConnection { pool: .., connection: .. }`.
impl<'a, M, H> fmt::Debug for PooledConnection<'a, M, H>
        where M: ConnectionManager + fmt::Debug,
              H: ErrorHandler<<M as ConnectionManager>::Error>,
              <M as ConnectionManager>::Connection: fmt::Debug {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Panics if the connection has already been taken by the destructor.
        let connection = self.conn.as_ref().unwrap();
        write!(f, "PooledConnection {{ pool: {:?}, connection: {:?} }}", self.pool,
               connection)
    }
}
/// Hands the wrapped connection back to its pool.
#[unsafe_destructor]
impl<'a, M, H> Drop for PooledConnection<'a, M, H>
        where M: ConnectionManager, H: ErrorHandler<<M as ConnectionManager>::Error> {
    fn drop(&mut self) {
        // Move the connection out of the wrapper, leaving `None` behind.
        let conn = self.conn.take().unwrap();
        self.pool.put_back(conn);
    }
}
/// Gives shared access to the wrapped connection.
impl<'a, M, H> Deref for PooledConnection<'a, M, H>
        where M: ConnectionManager, H: ErrorHandler<<M as ConnectionManager>::Error> {
    type Target = <M as ConnectionManager>::Connection;

    fn deref(&self) -> &<M as ConnectionManager>::Connection {
        let borrowed = self.conn.as_ref();
        borrowed.unwrap()
    }
}
/// Gives exclusive access to the wrapped connection.
impl<'a, M, H> DerefMut for PooledConnection<'a, M, H>
        where M: ConnectionManager, H: ErrorHandler<<M as ConnectionManager>::Error> {
    fn deref_mut(&mut self) -> &mut <M as ConnectionManager>::Connection {
        let borrowed = self.conn.as_mut();
        borrowed.unwrap()
    }
}