use std::{
mem::ManuallyDrop,
ops::{Deref, DerefMut},
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use crate::{
Executor,
connection::{Connection, ConnectionOptions, ConnectionResult},
handle_drop::HandleDrop,
};
/// Tuning parameters for a [`Pool`].
///
/// Start from [`PoolOptions::new`] (identical to [`Default::default`]) and
/// override individual settings with the consuming builder methods:
/// each one takes `self`, replaces a single field and returns the updated
/// value, so calls can be chained.
#[derive(Debug, Clone)]
pub struct PoolOptions {
    /// Time limit applied to `Connection::cleanup` when a connection that is
    /// not clean is returned to the pool.
    clean_timeout: Duration,
    /// Delay before the capacity slot of a failed connection attempt is
    /// handed back to the pool.
    reconnect_time: Duration,
    /// Total number of connections the pool is allowed to hold.
    max_connections: usize,
    /// Idle age after which a pooled connection is pinged before reuse.
    stale_connection_time: Duration,
    /// Time limit applied to that ping.
    ping_timeout: Duration,
}

impl PoolOptions {
    /// Returns the default options; see the [`Default`] impl for the values.
    pub fn new() -> Self {
        PoolOptions::default()
    }

    /// Sets the time limit for cleaning up a returned connection.
    #[must_use]
    pub fn clean_timeout(mut self, duration: Duration) -> Self {
        self.clean_timeout = duration;
        self
    }

    /// Sets the delay before a failed connection attempt frees its slot.
    #[must_use]
    pub fn reconnect_time(mut self, duration: Duration) -> Self {
        self.reconnect_time = duration;
        self
    }

    /// Sets the total number of connections the pool may hold.
    #[must_use]
    pub fn max_connections(mut self, connection: usize) -> Self {
        self.max_connections = connection;
        self
    }

    /// Sets the idle age after which a pooled connection is pinged before
    /// being handed out again.
    #[must_use]
    pub fn stale_connection_time(mut self, duration: Duration) -> Self {
        self.stale_connection_time = duration;
        self
    }

    /// Sets the time limit for the liveness ping of a stale connection.
    #[must_use]
    pub fn ping_timeout(mut self, duration: Duration) -> Self {
        self.ping_timeout = duration;
        self
    }
}

impl Default for PoolOptions {
    /// Defaults: 200 ms clean-up timeout, 2 s reconnect delay, 10 min stale
    /// threshold, 200 ms ping timeout and at most 5 connections.
    fn default() -> Self {
        Self {
            clean_timeout: Duration::from_millis(200),
            reconnect_time: Duration::from_secs(2),
            stale_connection_time: Duration::from_secs(10 * 60),
            ping_timeout: Duration::from_millis(200),
            max_connections: 5,
        }
    }
}
/// Mutable pool state; it is stored inside the `Mutex` held by `PoolInner`.
struct PoolProtected {
/// Idle connections, each paired with an `Instant`. `Pool::release` stores
/// the time of return; `Pool::acquire` compares the elapsed time against
/// `stale_connection_time` before reusing the connection.
connections: Vec<(Connection, Instant)>,
/// Number of additional connections the pool may still open. Decremented by
/// `Pool::acquire` before it opens a new connection and incremented by
/// `Pool::connection_dropped`.
unallocated_connections: usize,
}
/// State shared by every clone of a `Pool` (held behind the `Arc` in `Pool`).
struct PoolInner {
/// Idle-connection list and remaining-capacity counter, guarded by a
/// `std::sync::Mutex`.
protected: Mutex<PoolProtected>,
/// Tuning values passed to `Pool::connect`.
pool_options: PoolOptions,
/// Options handed to `Connection::connect` whenever the pool opens a
/// connection.
connection_options: ConnectionOptions<'static>,
/// Signalled with `notify_one` when a connection is returned or a capacity
/// slot is freed; awaited by `Pool::acquire` when nothing is available.
connection_available: tokio::sync::Notify,
}
/// Handle to a connection pool.
///
/// The handle wraps an `Arc<PoolInner>`, so the derived `Clone` produces
/// another handle to the same shared pool state.
#[derive(Clone)]
pub struct Pool(Arc<PoolInner>);
impl Pool {
pub async fn connect(
connection_options: ConnectionOptions<'static>,
pool_options: PoolOptions,
) -> ConnectionResult<Self> {
let connection = Connection::connect(&connection_options).await?;
Ok(Pool(Arc::new(PoolInner {
protected: Mutex::new(PoolProtected {
connections: vec![(connection, std::time::Instant::now())],
unallocated_connections: pool_options.max_connections - 1,
}),
pool_options,
connection_options,
connection_available: tokio::sync::Notify::new(),
})))
}
pub async fn acquire(&self) -> ConnectionResult<PoolConnection> {
enum Res<N, R> {
Wait,
New(N),
Reuse(R),
}
loop {
let res = {
let mut inner = self.0.protected.lock().unwrap();
if let Some((connection, last_use)) = inner.connections.pop() {
Res::Reuse(HandleDrop::new(
(connection, last_use, self.clone()),
|(connection, last_use, pool)| {
let mut inner = pool.0.protected.lock().unwrap();
inner.connections.push((connection, last_use));
},
))
} else if inner.unallocated_connections == 0 {
Res::Wait
} else {
inner.unallocated_connections -= 1;
Res::New(HandleDrop::new(self.clone(), |pool| {
pool.connection_dropped();
}))
}
};
match res {
Res::Wait => {
self.0.connection_available.notified().await
}
Res::New(handle) => {
let r = Connection::connect(&self.0.connection_options).await;
match r {
Ok(connection) => {
let pool = handle.release();
return Ok(PoolConnection {
pool,
connection: ManuallyDrop::new(connection),
});
}
Err(e) => {
tokio::task::spawn(async move {
tokio::time::sleep((*handle).0.pool_options.reconnect_time).await;
std::mem::drop(handle);
});
return Err(e);
}
}
}
Res::Reuse(mut handle) => {
let (connection, last_use, pool) = &mut *handle;
if last_use.elapsed() > pool.0.pool_options.stale_connection_time {
match tokio::time::timeout(
pool.0.pool_options.ping_timeout,
connection.ping(),
)
.await
{
Ok(Ok(())) => (),
Err(_) | Ok(Err(_)) => {
let (connection, _, pool) = handle.release();
std::mem::drop(connection);
pool.connection_dropped();
continue;
}
}
}
let (connection, _, pool) = handle.release();
let connection = PoolConnection {
pool,
connection: ManuallyDrop::new(connection),
};
return Ok(connection);
}
}
}
}
fn connection_dropped(&self) {
let mut inner = self.0.protected.lock().unwrap();
inner.unallocated_connections += 1;
self.0.connection_available.notify_one();
}
fn release(&self, connection: Connection) {
let mut inner = self.0.protected.lock().unwrap();
self.0.connection_available.notify_one();
inner
.connections
.push((connection, std::time::Instant::now()));
}
}
/// A connection checked out of a `Pool`.
///
/// It dereferences to the underlying `Connection`; its `Drop` impl hands the
/// connection back to the pool.
pub struct PoolConnection {
/// Pool the connection is handed back to on drop.
pool: Pool,
/// The checked-out connection. Wrapped in `ManuallyDrop` so the `Drop` impl
/// can move it out with `ManuallyDrop::take`.
connection: ManuallyDrop<Connection>,
}
/// Read-only access to the checked-out [`Connection`].
impl Deref for PoolConnection {
    type Target = Connection;

    /// Borrows the connection stored inside the `ManuallyDrop` wrapper.
    fn deref(&self) -> &Connection {
        // Explicit reborrow through `ManuallyDrop` instead of relying on
        // deref coercion at the return site.
        &*self.connection
    }
}
impl DerefMut for PoolConnection {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.connection
}
}
/// Hands the connection back to its pool when the guard goes out of scope.
///
/// * A connection reporting `is_clean()` goes straight back on the idle list.
/// * Otherwise `cleanup()` is run on a spawned Tokio task, bounded by
///   `clean_timeout`; on success the connection is returned, on error or
///   timeout it is discarded and its capacity slot is freed.
/// * If no Tokio runtime is available on the dropping thread, the clean-up
///   cannot be run, so the connection is discarded and its slot is freed.
impl Drop for PoolConnection {
    fn drop(&mut self) {
        // SAFETY: `self.connection` is initialised when the `PoolConnection`
        // is built and is only taken here; `drop` runs at most once and the
        // field is not accessed afterwards.
        let mut connection = unsafe { ManuallyDrop::take(&mut self.connection) };
        if connection.is_clean() {
            self.pool.release(connection);
            return;
        }
        // `tokio::spawn` panics outside a runtime, and a panic inside `drop`
        // during unwinding aborts the process, so look the runtime up
        // fallibly instead.
        match tokio::runtime::Handle::try_current() {
            Ok(runtime) => {
                let pool = self.pool.clone();
                runtime.spawn(async move {
                    match tokio::time::timeout(
                        pool.0.pool_options.clean_timeout,
                        connection.cleanup(),
                    )
                    .await
                    {
                        Ok(Ok(())) => {
                            pool.release(connection);
                        }
                        // Clean-up failed or timed out: the connection is
                        // not reusable.
                        Ok(Err(_)) | Err(_) => {
                            std::mem::drop(connection);
                            pool.connection_dropped();
                        }
                    }
                });
            }
            Err(_) => {
                // No runtime to drive `cleanup()`; never return a connection
                // that is not clean to the idle list.
                std::mem::drop(connection);
                self.pool.connection_dropped();
            }
        }
    }
}