use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};
use crate::config::ConnectConfig;
use crate::connection::Connection;
use crate::error::{Error, Result};
/// Tunables for a [`Pool`].
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Number of permits the pool is created with, i.e. how many connections
    /// may be checked out at the same time.
    pub max_size: usize,
    /// Longest time to wait for a permit; `None` waits without a deadline.
    pub acquisition_timeout: Option<Duration>,
}

impl Default for PoolConfig {
    /// Ten connections with a 30-second acquisition timeout.
    fn default() -> Self {
        let acquisition_timeout = Some(Duration::from_secs(30));
        Self { max_size: 10, acquisition_timeout }
    }
}
/// Shared state behind every `Pool` handle.
struct PoolState {
/// Settings handed to `Connection::connect` whenever a new connection is opened.
config: ConnectConfig,
/// Connections waiting to be reused: returned at the back, taken from the front.
idle: Mutex<VecDeque<Connection>>,
/// Permit counter created with `PoolConfig::max_size` permits.
permits: Arc<PermitPool>,
/// Timeout passed to the permit counter on every `Pool::get`; `None` means no deadline.
acquisition_timeout: Option<Duration>,
}
struct PermitPool {
available: Mutex<usize>,
changed: Condvar,
}
impl PermitPool {
fn new(max: usize) -> Self {
PermitPool {
available: Mutex::new(max),
changed: Condvar::new(),
}
}
fn acquire(self: &Arc<Self>, timeout: Option<Duration>) -> Result<Permit> {
let mut available = self
.available
.lock()
.map_err(|_| Error::Pool("pool lock poisoned".into()))?;
match timeout {
None => {
while *available == 0 {
available = self
.changed
.wait(available)
.map_err(|_| Error::Pool("pool lock poisoned".into()))?;
}
}
Some(timeout) => {
let deadline = Instant::now() + timeout;
while *available == 0 {
let now = Instant::now();
if now >= deadline {
return Err(Error::Timeout);
}
let wait = deadline.saturating_duration_since(now);
let (guard, result) = self
.changed
.wait_timeout(available, wait)
.map_err(|_| Error::Pool("pool lock poisoned".into()))?;
available = guard;
if result.timed_out() && *available == 0 {
return Err(Error::Timeout);
}
}
}
}
*available -= 1;
Ok(Permit {
permits: Arc::clone(self),
})
}
fn release(&self) {
if let Ok(mut available) = self.available.lock() {
*available += 1;
self.changed.notify_one();
}
}
}
struct Permit {
permits: Arc<PermitPool>,
}
impl Drop for Permit {
fn drop(&mut self) {
self.permits.release();
}
}
#[derive(Clone)]
pub struct Pool(Arc<PoolState>);
impl Pool {
pub fn new(config: ConnectConfig, pool_config: PoolConfig) -> Self {
Pool(Arc::new(PoolState {
config,
idle: Mutex::new(VecDeque::new()),
permits: Arc::new(PermitPool::new(pool_config.max_size)),
acquisition_timeout: pool_config.acquisition_timeout,
}))
}
pub fn get(&self) -> Result<PooledConnection> {
let permit = self.acquire_permit()?;
while let Some(conn) = self.pop_idle() {
if conn_is_alive(&conn) {
return Ok(PooledConnection {
conn: Some(conn),
pool: self.clone(),
permit: Some(permit),
});
}
}
let conn = Connection::connect(&self.0.config)?;
Ok(PooledConnection {
conn: Some(conn),
pool: self.clone(),
permit: Some(permit),
})
}
fn return_conn(&self, conn: Connection) {
if !conn.is_healthy() {
return;
}
if let Ok(mut idle) = self.0.idle.lock() {
idle.push_back(conn);
}
}
fn pop_idle(&self) -> Option<Connection> {
self.0.idle.lock().ok()?.pop_front()
}
fn acquire_permit(&self) -> Result<Permit> {
self.0.permits.acquire(self.0.acquisition_timeout)
}
}
/// Reports whether `conn` may be handed out again, as judged by its `is_healthy` method.
fn conn_is_alive(conn: &Connection) -> bool {
conn.is_healthy()
}
/// A connection checked out of a [`Pool`], together with the permit that
/// accounts for it.
pub struct PooledConnection {
    /// The live connection; `None` once it has been discarded or handed back.
    conn: Option<Connection>,
    /// Pool the connection is returned to.
    pool: Pool,
    /// Permit held for as long as this guard exists.
    permit: Option<Permit>,
}

impl PooledConnection {
    /// Drops the connection instead of returning it to the pool.
    pub fn discard(mut self) {
        // With `conn` emptied, the guard's own drop has nothing to hand back.
        drop(self.conn.take());
    }
}
impl std::ops::Deref for PooledConnection {
    type Target = Connection;

    /// Borrows the underlying connection.
    ///
    /// # Panics
    ///
    /// Panics if the connection slot is already empty.
    fn deref(&self) -> &Self::Target {
        let live = self.conn.as_ref();
        // The message reads "connection already discarded".
        live.expect("conexão já descartada")
    }
}
impl std::ops::DerefMut for PooledConnection {
fn deref_mut(&mut self) -> &mut Connection {
self.conn.as_mut().expect("conexão já descartada")
}
}
impl Drop for PooledConnection {
    /// Hands the connection back to the pool (if one is still held) and then
    /// drops the permit.
    fn drop(&mut self) {
        let Self { conn, pool, permit } = self;
        // The connection goes back before the permit is dropped.
        if let Some(live) = conn.take() {
            pool.return_conn(live);
        }
        drop(permit.take());
    }
}