mod builder;
mod config;
mod dropguard;
mod errors;
mod hooks;
mod metrics;
use std::{
collections::VecDeque,
fmt,
future::Future,
ops::{Deref, DerefMut},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex, Weak,
},
time::{Duration, Instant},
};
use crate::Conn;
use tokio::sync::{Semaphore, TryAcquireError};
use crate::deadpool::Status;
use self::dropguard::DropGuard;
pub use self::{
builder::{BuildError, PoolBuilder},
config::{PoolConfig, Timeouts},
errors::PoolError,
hooks::{Hook, HookError},
metrics::Metrics,
};
pub(crate) use self::{
config::{CreatePoolError, QueueMode},
errors::{RecycleError, TimeoutType},
};
/// Shorthand for the outcome of a recycle attempt: `Ok(())` on success,
/// otherwise a [`RecycleError`].
pub(crate) type RecycleResult = Result<(), RecycleError>;
/// Handle to a [`Conn`] that was checked out of a [`Pool`].
///
/// The handle dereferences to the wrapped [`Conn`]. When it is dropped the
/// connection is handed back to the pool it came from, provided that pool is
/// still alive; use [`PoolConn::take`] to detach the connection instead.
#[must_use]
pub struct PoolConn {
// Connection plus its metrics; becomes `None` once the value has been moved
// out by `take` or by `Drop`.
inner: Option<PoolConnInner>,
// Weak back-reference, so an outstanding handle does not keep the pool alive.
pool: Weak<PoolInner>,
}
impl fmt::Debug for PoolConn {
    /// Formats the handle as a struct named `Object` exposing only `inner`;
    /// the weak pool reference is intentionally left out of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Object");
        out.field("inner", &self.inner);
        out.finish()
    }
}
/// Guard around a connection that is still being created or recycled.
///
/// If the guard is dropped while it still holds the connection (i.e. `ready`
/// was never called), its `Drop` impl decrements the pool's `size` and
/// detaches the connection from the manager.
struct UnreadyObject<'a> {
// `Some` until `ready` hands the connection out or `Drop` disposes of it.
inner: Option<PoolConnInner>,
// Pool whose bookkeeping is adjusted when the guard is dropped unfinished.
pool: &'a PoolInner,
}
impl<'a> UnreadyObject<'a> {
    /// Consumes the guard and yields the wrapped connection state.
    ///
    /// Because `inner` is emptied here, the guard's `Drop` impl finds nothing
    /// left to clean up.
    ///
    /// # Panics
    ///
    /// Panics if the inner value has already been taken.
    fn ready(mut self) -> PoolConnInner {
        let finished = self.inner.take();
        finished.unwrap()
    }

    /// Mutable access to the wrapped connection state while it is unready.
    ///
    /// # Panics
    ///
    /// Panics if the inner value has already been taken.
    fn inner(&mut self) -> &mut PoolConnInner {
        self.inner.as_mut().unwrap()
    }
}
impl<'a> Drop for UnreadyObject<'a> {
    /// Disposes of a connection that never became ready: the pool's `size`
    /// is decremented and the connection is detached from the manager.
    fn drop(&mut self) {
        if let Some(mut abandoned) = self.inner.take() {
            {
                // Keep the lock scope minimal: it is released before the
                // manager is called.
                let mut slots = self.pool.slots.lock().unwrap();
                slots.size -= 1;
            }
            self.pool.manager.detach(&mut abandoned.obj);
        }
    }
}
/// A pooled connection together with the metrics tracked for it.
#[derive(Debug)]
pub(crate) struct PoolConnInner {
// The actual connection handed out to users.
obj: Conn,
// Per-connection statistics; `recycle_count` and `recycled` are updated each
// time the connection is successfully recycled.
metrics: Metrics,
}
impl PoolConn {
    /// Detaches the connection from its pool and returns the raw [`Conn`].
    ///
    /// If the pool is still alive its bookkeeping is updated so that the
    /// detached connection no longer counts towards the pool size.
    ///
    /// # Panics
    ///
    /// Panics if the inner connection has already been taken.
    #[must_use]
    pub fn take(mut this: Self) -> Conn {
        let mut conn = this.inner.take().unwrap().obj;
        // `this.inner` is now `None`, so dropping `this` at the end of this
        // function will not also return the connection to the pool.
        if let Some(pool) = this.pool.upgrade() {
            pool.detach_object(&mut conn);
        }
        conn
    }

    /// Returns the metrics collected for this connection.
    ///
    /// # Panics
    ///
    /// Panics if the inner connection has already been taken.
    pub fn metrics(this: &Self) -> &Metrics {
        let state = this.inner.as_ref().unwrap();
        &state.metrics
    }

    /// Returns the [`Pool`] this connection belongs to, or `None` if that
    /// pool has already been dropped.
    pub fn pool(this: &Self) -> Option<Pool> {
        this.pool.upgrade().map(|shared| Pool { inner: shared })
    }
}
impl Drop for PoolConn {
    /// Hands the connection back to its pool if the pool is still alive;
    /// otherwise the connection is simply dropped.
    fn drop(&mut self) {
        // Nothing to do when the connection was already moved out.
        let returned = match self.inner.take() {
            Some(state) => state,
            None => return,
        };
        if let Some(pool) = self.pool.upgrade() {
            pool.return_object(returned);
        }
    }
}
impl Deref for PoolConn {
    type Target = Conn;

    /// Borrows the wrapped connection.
    ///
    /// # Panics
    ///
    /// Panics if the inner connection has already been taken.
    fn deref(&self) -> &Self::Target {
        let state = self.inner.as_ref().unwrap();
        &state.obj
    }
}
impl DerefMut for PoolConn {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner.as_mut().unwrap().obj
}
}
impl AsRef<Conn> for PoolConn {
fn as_ref(&self) -> &Conn {
self
}
}
impl AsMut<Conn> for PoolConn {
    /// Mutably borrows the wrapped connection by going through `DerefMut`.
    fn as_mut(&mut self) -> &mut Conn {
        DerefMut::deref_mut(self)
    }
}
/// Connection pool handle.
///
/// The handle only wraps an [`Arc`] to the shared pool state, so cloning it
/// yields another handle to the same pool.
pub struct Pool {
// Shared state; connection handles hold a `Weak` to this same allocation.
inner: Arc<PoolInner>,
}
impl fmt::Debug for Pool {
    /// Formats the pool as a struct named `Pool` with its shared `inner` state.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Pool");
        out.field("inner", &self.inner);
        out.finish()
    }
}
impl Clone for Pool {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl Pool {
/// Starts building a pool around `manager` by delegating to
/// [`PoolBuilder::new`].
pub fn builder(manager: crate::Manager) -> PoolBuilder {
PoolBuilder::new(manager)
}
/// Assembles a pool from a fully configured [`PoolBuilder`].
///
/// The pool starts empty (`size == 0`); both the idle queue capacity and
/// the number of semaphore permits are taken from `config.max_size`.
pub(crate) fn from_builder(builder: PoolBuilder) -> Self {
    let max_size = builder.config.max_size;
    let slots = Slots {
        vec: VecDeque::with_capacity(max_size),
        size: 0,
        max_size,
    };
    let shared = PoolInner {
        manager: builder.manager,
        slots: Mutex::new(slots),
        users: AtomicUsize::new(0),
        semaphore: Semaphore::new(max_size),
        config: builder.config,
        hooks: builder.hooks,
    };
    Self {
        inner: Arc::new(shared),
    }
}
/// Retrieves a connection from the pool using the timeouts stored in the
/// pool configuration.
///
/// This is a thin wrapper around `timeout_get`; see there for the possible
/// errors.
pub async fn get(&self) -> Result<PoolConn, PoolError> {
self.timeout_get(&self.inner.config.timeouts).await
}
/// Retrieves a connection from the pool using the given `timeouts` instead
/// of the configured ones.
///
/// A `wait` timeout of exactly zero selects a non-blocking attempt: if no
/// semaphore permit is immediately available the call fails right away.
///
/// # Errors
///
/// * `PoolError::Closed` if the semaphore has been closed.
/// * `PoolError::Timeout(TimeoutType::Wait)` if no permit is available in
///   non-blocking mode; in blocking mode the wait is bounded through
///   `apply_timeout` with `TimeoutType::Wait`.
/// * Any error propagated from `try_recycle` or `try_create`.
pub async fn timeout_get(&self, timeouts: &Timeouts) -> Result<PoolConn, PoolError> {
// Count this caller as a user for the duration of the call. The guard is
// disarmed further down on success; presumably `DropGuard` runs its closure
// on drop otherwise, undoing the increment on early return or cancellation
// (defined in the `dropguard` module — confirm there).
let _ = self.inner.users.fetch_add(1, Ordering::Relaxed);
let users_guard = DropGuard(|| {
let _ = self.inner.users.fetch_sub(1, Ordering::Relaxed);
});
// `Some(0)` means "do not wait at all"; `None` means "wait indefinitely".
let non_blocking = match timeouts.wait {
Some(t) => t.as_nanos() == 0,
None => false,
};
let permit = if non_blocking {
self.inner.semaphore.try_acquire().map_err(|e| match e {
TryAcquireError::Closed => PoolError::Closed,
TryAcquireError::NoPermits => PoolError::Timeout(TimeoutType::Wait),
})?
} else {
apply_timeout(TimeoutType::Wait, timeouts.wait, async {
self.inner
.semaphore
.acquire()
.await
.map_err(|_| PoolError::Closed)
})
.await?
};
// Keep going until either an idle connection recycles successfully or a
// new one is created. `try_recycle` yields `Ok(None)` for a connection that
// failed to recycle, in which case the next candidate is tried.
let inner_obj = loop {
// The lock is only held for the pop itself, never across an `.await`.
let inner_obj = match self.inner.config.queue_mode {
QueueMode::Fifo => self.inner.slots.lock().unwrap().vec.pop_front(),
QueueMode::Lifo => self.inner.slots.lock().unwrap().vec.pop_back(),
};
let inner_obj = if let Some(inner_obj) = inner_obj {
self.try_recycle(timeouts, inner_obj).await?
} else {
self.try_create(timeouts).await?
};
if let Some(inner_obj) = inner_obj {
break inner_obj;
}
};
// Success: the caller stays counted in `users`, and the permit is
// deliberately not released here. `return_object` / `detach_object` call
// `add_permits(1)` when the connection comes back.
users_guard.disarm();
permit.forget();
Ok(PoolConn {
inner: Some(inner_obj),
pool: Arc::downgrade(&self.inner),
}
.into())
}
/// Attempts to recycle an idle connection taken from the queue.
///
/// Returns `Ok(Some(_))` with the refreshed connection on success and
/// `Ok(None)` when recycling failed or timed out. In the failure case the
/// `UnreadyObject` guard is dropped while still holding the connection,
/// which removes it from the pool's size accounting and detaches it.
#[inline]
async fn try_recycle(
    &self,
    timeouts: &Timeouts,
    inner_obj: PoolConnInner,
) -> Result<Option<PoolConnInner>, PoolError> {
    let mut candidate = UnreadyObject {
        inner: Some(inner_obj),
        pool: &self.inner,
    };
    let state = candidate.inner();

    let recycling = self.inner.manager.recycle(&mut state.obj, &state.metrics);
    let failed = apply_timeout(TimeoutType::Recycle, timeouts.recycle, recycling)
        .await
        .is_err();
    if failed {
        // The error itself is discarded; the caller moves on to the next
        // candidate.
        return Ok(None);
    }

    state.metrics.recycle_count += 1;
    state.metrics.recycled = Some(Instant::now());
    Ok(Some(candidate.ready()))
}
/// Creates a brand-new connection through the manager.
///
/// The pool's `size` is incremented once the connection exists. If the
/// `post_create` hook fails afterwards, the `UnreadyObject` guard undoes
/// that increment and detaches the connection when it is dropped.
///
/// # Errors
///
/// * Whatever `apply_timeout` yields for the create step (creation failure
///   or `PoolError::Timeout(TimeoutType::Create)`).
/// * `PoolError::PostCreateHook` if the `post_create` hook fails.
#[inline]
async fn try_create(&self, timeouts: &Timeouts) -> Result<Option<PoolConnInner>, PoolError> {
    // Creation happens before any guard exists, so a failure here leaves
    // the pool bookkeeping untouched.
    let conn = apply_timeout(
        TimeoutType::Create,
        timeouts.create,
        self.inner.manager.create(),
    )
    .await?;

    let mut fresh = UnreadyObject {
        inner: Some(PoolConnInner {
            obj: conn,
            metrics: Metrics::default(),
        }),
        pool: &self.inner,
    };
    self.inner.slots.lock().unwrap().size += 1;

    self.inner
        .hooks
        .post_create
        .apply(fresh.inner())
        .await
        .map_err(PoolError::PostCreateHook)?;

    Ok(Some(fresh.ready()))
}
/// Changes the maximum number of connections the pool may hold.
///
/// Does nothing if the pool has already been closed.
///
/// * **Shrinking** removes idle connections (oldest first) while the pool
///   holds more than `max_size` connections and permits can be reclaimed
///   from the semaphore. Connections that are currently checked out are
///   not touched here; `return_object` disposes of them when they come
///   back while `size` is still above `max_size`.
/// * **Growing** adds exactly `max_size - old_max_size` permits to the
///   semaphore, so the number of permits keeps tracking the capacity.
pub fn resize(&self, max_size: usize) {
    if self.inner.semaphore.is_closed() {
        return;
    }
    let mut slots = self.inner.slots.lock().unwrap();
    let old_max_size = slots.max_size;
    slots.max_size = max_size;

    if max_size < old_max_size {
        while slots.size > slots.max_size {
            if let Ok(permit) = self.inner.semaphore.try_acquire() {
                // Permanently remove the permit to reduce total capacity.
                permit.forget();
                if slots.vec.pop_front().is_some() {
                    slots.size -= 1;
                }
            } else {
                // Every remaining permit is in use; nothing more can be
                // reclaimed right now.
                break;
            }
        }
        // Move the survivors into a queue allocated for the new capacity.
        let mut vec = VecDeque::with_capacity(max_size);
        vec.extend(slots.vec.drain(..));
        slots.vec = vec;
    } else if max_size > old_max_size {
        // The capacity delta is what must be added. Using the current
        // `size` here would hand out too many permits (e.g. for an empty
        // pool) and could underflow when `size` exceeds `max_size`.
        let additional = max_size - old_max_size;
        slots.vec.reserve_exact(additional);
        self.inner.semaphore.add_permits(additional);
    }
}
/// Keeps only the idle connections for which `f` returns `true`.
///
/// `f` receives each idle connection together with a copy of its metrics.
/// Rejected connections are detached from the manager and removed from
/// the pool's size accounting. Connections that are currently checked out
/// are not visited. The pool's slot lock is held while `f` runs.
pub fn retain(&self, f: impl Fn(&Conn, Metrics) -> bool) {
    let mut slots = self.inner.slots.lock().unwrap();
    let idle_before = slots.vec.len();
    slots.vec.retain_mut(|entry| {
        let keep = f(&entry.obj, entry.metrics);
        if !keep {
            self.manager().detach(&mut entry.obj);
        }
        keep
    });
    let removed = idle_before - slots.vec.len();
    slots.size -= removed;
}
/// Returns a copy of the timeouts stored in the pool configuration.
pub fn timeouts(&self) -> Timeouts {
self.inner.config.timeouts
}
/// Closes the pool.
///
/// The pool is first resized to zero and then its semaphore is closed.
/// Acquiring from a closed semaphore is reported as `PoolError::Closed` by
/// `timeout_get`.
pub fn close(&self) {
// Order matters: `resize` returns early once the semaphore is closed.
self.resize(0);
self.inner.semaphore.close();
}
/// Reports whether the pool's semaphore has been closed.
pub fn is_closed(&self) -> bool {
self.inner.semaphore.is_closed()
}
/// Returns a snapshot of the pool's current state.
///
/// `available` is how many more connections exist than there are users,
/// `waiting` is how many more users there are than connections; at most
/// one of the two is non-zero.
#[must_use]
pub fn status(&self) -> Status {
    let slots = self.inner.slots.lock().unwrap();
    let users = self.inner.users.load(Ordering::Relaxed);
    let size = slots.size;
    Status {
        max_size: slots.max_size,
        size,
        available: size.saturating_sub(users),
        waiting: users.saturating_sub(size),
    }
}
/// Returns a reference to the manager this pool was built with.
#[must_use]
pub fn manager(&self) -> &crate::Manager {
&self.inner.manager
}
}
/// State shared between all [`Pool`] handles and outstanding connections.
struct PoolInner {
// Creates, recycles and detaches connections.
manager: crate::Manager,
// Idle queue plus size bookkeeping, guarded by a synchronous mutex.
slots: Mutex<Slots<PoolConnInner>>,
// Incremented on entry to `timeout_get`; decremented when a connection is
// returned or detached, or when the `timeout_get` users guard fires.
users: AtomicUsize,
// Permits gate how many connections may be checked out at once; closing it
// marks the pool as closed.
semaphore: Semaphore,
config: PoolConfig,
hooks: hooks::Hooks,
}
/// Idle queue and size bookkeeping of a pool.
#[derive(Debug)]
struct Slots<T> {
// Idle objects waiting to be handed out.
vec: VecDeque<T>,
// Number of objects currently owned by the pool, idle or checked out.
size: usize,
// Configured upper bound for `size`; changed by `Pool::resize`.
max_size: usize,
}
impl fmt::Debug for PoolInner {
    /// Formats every field of the shared pool state.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("PoolInner");
        out.field("manager", &self.manager);
        out.field("slots", &self.slots);
        // The `users` counter is emitted under the label "used".
        out.field("used", &self.users);
        out.field("semaphore", &self.semaphore);
        out.field("config", &self.config);
        out.field("hooks", &self.hooks);
        out.finish()
    }
}
impl PoolInner {
    /// Takes back a connection whose handle was dropped.
    ///
    /// While the pool is within its capacity the connection is queued as
    /// idle and one permit is released. If the pool currently holds more
    /// connections than `max_size` (after a shrink), the connection is
    /// removed from the accounting and detached instead, and no permit is
    /// released.
    fn return_object(&self, mut inner: PoolConnInner) {
        let _ = self.users.fetch_sub(1, Ordering::Relaxed);
        let mut slots = self.slots.lock().unwrap();

        if slots.size > slots.max_size {
            slots.size -= 1;
            // Release the lock before calling into the manager.
            drop(slots);
            self.manager.detach(&mut inner.obj);
            return;
        }

        slots.vec.push_back(inner);
        // Release the lock before waking a waiter.
        drop(slots);
        self.semaphore.add_permits(1);
    }

    /// Removes a checked-out connection from the pool for good.
    ///
    /// The pool's `size` is decremented, a permit is released if the pool
    /// was within its capacity, and the manager is told to detach `obj`.
    fn detach_object(&self, obj: &mut Conn) {
        let _ = self.users.fetch_sub(1, Ordering::Relaxed);

        let within_capacity = {
            let mut slots = self.slots.lock().unwrap();
            // Evaluated before the decrement, against the current size.
            let within = slots.size <= slots.max_size;
            slots.size -= 1;
            within
        };

        if within_capacity {
            self.semaphore.add_permits(1);
        }
        self.manager.detach(obj);
    }
}
/// Awaits `future`, optionally bounding it by `duration`.
///
/// With `duration == None` the future is awaited without a limit. With
/// `Some(limit)` it is wrapped in `tokio::time::timeout`; if the limit
/// elapses first, `PoolError::Timeout(timeout_type)` is returned. Errors
/// produced by the future itself are converted into [`PoolError`].
async fn apply_timeout<O>(
    timeout_type: TimeoutType,
    duration: Option<Duration>,
    future: impl Future<Output = Result<O, impl Into<PoolError>>>,
) -> Result<O, PoolError> {
    if let Some(limit) = duration {
        match tokio::time::timeout(limit, future).await {
            Ok(outcome) => outcome.map_err(Into::into),
            Err(_) => Err(PoolError::Timeout(timeout_type)),
        }
    } else {
        future.await.map_err(Into::into)
    }
}