use core::fmt;
use core::future::Future;
use core::ops::{Deref, DerefMut};
use core::sync::atomic::{AtomicBool, Ordering};
#[cfg(not(feature = "no-send"))]
use std::sync::{Arc as WrapPointer, Weak};
use std::time::Instant;
#[cfg(feature = "no-send")]
use std::{
rc::{Rc as WrapPointer, Weak},
thread::{self, ThreadId},
};
use crate::builder::Builder;
use crate::manager::Manager;
use crate::pool_inner::{PoolLock, State};
pub struct Conn<M: Manager> {
conn: M::Connection,
marker: usize,
birth: Instant,
}
impl<M: Manager> Conn<M> {
pub(crate) fn marker(&self) -> usize {
self.marker
}
}
pub struct IdleConn<M: Manager> {
conn: Conn<M>,
idle_start: Instant,
}
impl<M: Manager> IdleConn<M> {
fn new(conn: M::Connection, marker: usize) -> Self {
let now = Instant::now();
IdleConn {
conn: Conn {
conn,
marker,
birth: now,
},
idle_start: now,
}
}
#[inline]
pub(crate) fn marker(&self) -> usize {
self.conn.marker
}
}
impl<M: Manager> From<Conn<M>> for IdleConn<M> {
fn from(conn: Conn<M>) -> IdleConn<M> {
let now = Instant::now();
IdleConn {
conn,
idle_start: now,
}
}
}
impl<M: Manager> From<IdleConn<M>> for Conn<M> {
fn from(conn: IdleConn<M>) -> Conn<M> {
Conn {
conn: conn.conn.conn,
birth: conn.conn.birth,
marker: conn.conn.marker,
}
}
}
pub struct ManagedPool<M: Manager> {
builder: Builder,
manager: M,
running: AtomicBool,
pool_lock: PoolLock<M>,
}
impl<M: Manager> ManagedPool<M> {
fn new(builder: Builder, manager: M) -> Self {
let pool_lock = PoolLock::from_builder(&builder);
Self {
builder,
manager,
running: AtomicBool::new(true),
pool_lock,
}
}
#[inline]
async fn get_conn<'a, R>(&'a self, shared_pool: &'a SharedManagedPool<M>) -> Result<R, M::Error>
where
R: PoolRefBehavior<'a, M> + Unpin,
{
let fut = self.pool_lock.lock::<R>(shared_pool);
let timeout = self.builder.wait_timeout;
let mut pool_ref = self.manager.timeout(fut, timeout).await?;
if self.builder.always_check {
let mut retry = 0u8;
loop {
let result = self.check_conn(&mut pool_ref).await;
match result {
Ok(Ok(_)) => break,
Ok(Err(e)) => {
pool_ref.take_drop();
if retry == 3 {
return Err(e);
} else {
retry += 1;
};
}
Err(timeout_err) => {
pool_ref.take_drop();
return Err(timeout_err);
}
}
let fut = self.pool_lock.lock::<R>(shared_pool);
pool_ref = self.manager.timeout(fut, timeout).await?;
}
};
Ok(pool_ref)
}
fn drop_conn(&self, marker: usize, should_spawn_new: bool) -> Option<usize> {
self.pool_lock.dec_spawned(marker, should_spawn_new)
}
pub(crate) async fn add_idle_conn(&self, marker: usize) -> Result<(), M::Error> {
let fut = self.manager.connect();
let timeout = self.builder.connection_timeout;
let conn = self
.manager
.timeout(fut, timeout)
.await
.map_err(|e| {
self.pool_lock.dec_pending(1);
e
})?
.map_err(|e| {
self.pool_lock.dec_pending(1);
e
})?;
self.pool_lock
.put_back_inc_spawned(IdleConn::new(conn, marker));
Ok(())
}
async fn check_conn(&self, conn: &mut M::Connection) -> Result<Result<(), M::Error>, M::Error> {
let fut = self.manager.is_valid(conn);
let timeout = self.builder.connection_timeout;
let res = self.manager.timeout(fut, timeout).await?;
Ok(res)
}
async fn replenish_idle_conn(
&self,
pending_count: usize,
marker: usize,
) -> Result<(), M::Error> {
for i in 0..pending_count {
self.add_idle_conn(marker).await.map_err(|e| {
let count = pending_count - i - 1;
if count > 0 {
self.pool_lock.dec_pending(count);
};
e
})?;
}
Ok(())
}
fn if_running(&self, running: bool) {
self.running.store(running, Ordering::Release);
}
pub(crate) fn is_running(&self) -> bool {
self.running.load(Ordering::Acquire)
}
#[cfg(not(feature = "no-send"))]
pub(crate) fn spawn<Fut>(&self, fut: Fut)
where
Fut: Future<Output = ()> + Send + 'static,
{
self.manager.spawn(fut);
}
#[cfg(feature = "no-send")]
pub(crate) fn spawn<Fut>(&self, fut: Fut)
where
Fut: Future<Output = ()> + 'static,
{
self.manager.spawn(fut);
}
pub async fn reap_idle_conn(&self) -> Result<(), M::Error> {
let now = Instant::now();
let pending_new = self.pool_lock.try_drop_conn(|conn| {
let mut should_drop = false;
if let Some(timeout) = self.builder.idle_timeout {
should_drop |= now >= conn.idle_start + timeout;
}
if let Some(lifetime) = self.builder.max_lifetime {
should_drop |= now >= conn.conn.birth + lifetime;
}
should_drop
});
match pending_new {
Some((pending_new, marker)) => self.replenish_idle_conn(pending_new, marker).await,
None => Ok(()),
}
}
pub fn garbage_collect(&self) {
self.pool_lock
.drop_pending(|pending| pending.should_remove(self.builder.connection_timeout));
}
pub fn get_builder(&self) -> &Builder {
&self.builder
}
}
pub type SharedManagedPool<M> = WrapPointer<ManagedPool<M>>;
pub type WeakSharedManagedPool<M> = Weak<ManagedPool<M>>;
pub struct Pool<M: Manager> {
pool: SharedManagedPool<M>,
#[cfg(feature = "no-send")]
id: ThreadId,
}
impl<M: Manager> Clone for Pool<M> {
fn clone(&self) -> Self {
Self {
pool: self.pool.clone(),
#[cfg(feature = "no-send")]
id: self.id,
}
}
}
impl<M: Manager> fmt::Debug for Pool<M> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_fmt(format_args!("Pool({:p})", self.pool))
}
}
impl<M: Manager> Drop for Pool<M> {
fn drop(&mut self) {
self.pool.manager.on_stop();
}
}
impl<M: Manager> Pool<M> {
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub async fn get(&self) -> Result<PoolRef<'_, M>, M::Error> {
let shared_pool = &self.pool;
shared_pool.get_conn(shared_pool).await
}
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub async fn get_owned(&self) -> Result<PoolRefOwned<M>, M::Error> {
let shared_pool = &self.pool;
shared_pool.get_conn(shared_pool).await
}
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub async fn run<'a, T, E, F, FF>(&'a self, f: F) -> Result<T, E>
where
F: FnOnce(PoolRef<'a, M>) -> FF,
FF: Future<Output = Result<T, E>> + Send + 'a,
E: From<M::Error>,
T: Send + 'static,
{
let pool_ref = self.get().await?;
f(pool_ref).await
}
pub fn pause(&self) {
self.pool.if_running(false);
}
pub fn resume(&self) {
self.pool.if_running(true);
}
pub fn running(&self) -> bool {
self.pool.is_running()
}
pub fn clear(&self) {
self.pool.pool_lock.clear()
}
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub async fn init(&self) -> Result<(), M::Error> {
let shared_pool = &self.pool;
let marker = shared_pool.pool_lock.marker();
shared_pool
.replenish_idle_conn(shared_pool.builder.min_idle, marker)
.await?;
shared_pool.manager.on_start(shared_pool);
Ok(())
}
pub fn set_max_size(&self, size: usize) {
self.pool.pool_lock.set_max_size(size);
}
pub fn set_min_idle(&self, size: usize) {
self.pool.pool_lock.set_min_idle(size);
}
pub fn get_manager(&self) -> &M {
&self.pool.manager
}
pub fn state(&self) -> State {
self.pool.pool_lock.state()
}
#[cfg(feature = "no-send")]
pub fn thread_id(&self) -> ThreadId {
self.id
}
pub(crate) fn new(builder: Builder, manager: M) -> Self {
let pool = ManagedPool::new(builder, manager);
Pool {
pool: WrapPointer::new(pool),
#[cfg(feature = "no-send")]
id: thread::current().id(),
}
}
}
pub struct PoolRef<'a, M: Manager> {
conn: Option<Conn<M>>,
shared_pool: &'a SharedManagedPool<M>,
marker: Option<usize>,
}
pub struct PoolRefOwned<M: Manager> {
conn: Option<Conn<M>>,
shared_pool: WeakSharedManagedPool<M>,
marker: Option<usize>,
}
impl<M: Manager> PoolRef<'_, M> {
pub fn get_conn(&mut self) -> &mut M::Connection {
&mut *self
}
pub fn take_conn(&mut self) -> Option<M::Connection> {
self.conn.take().map(|c| {
self.marker = Some(c.marker);
c.conn
})
}
pub fn push_conn(&mut self, conn: M::Connection) {
let marker = match self.marker {
Some(marker) => marker,
None => self.conn.as_ref().map(|c| c.marker()).unwrap(),
};
self.conn = Some(Conn {
conn,
marker,
birth: Instant::now(),
});
}
pub fn get_manager(&self) -> &M {
&self.shared_pool.manager
}
}
impl<M: Manager> PoolRefOwned<M> {
pub fn get_conn(&mut self) -> &mut M::Connection {
&mut *self
}
pub fn take_conn(&mut self) -> Option<M::Connection> {
self.conn.take().map(|c| {
self.marker = Some(c.marker);
c.conn
})
}
pub fn push_conn(&mut self, conn: M::Connection) {
let marker = match self.marker {
Some(marker) => marker,
None => self.conn.as_ref().map(|c| c.marker()).unwrap(),
};
self.conn = Some(Conn {
conn,
marker,
birth: Instant::now(),
});
}
}
pub(crate) trait PoolRefBehavior<'a, M: Manager>
where
Self: DerefMut<Target = M::Connection> + Sized,
{
fn from_idle(conn: IdleConn<M>, shared_pool: &'a SharedManagedPool<M>) -> Self;
fn take_drop(self) {}
}
impl<'re, M: Manager> PoolRefBehavior<'re, M> for PoolRef<'re, M> {
fn from_idle(conn: IdleConn<M>, shared_pool: &'re SharedManagedPool<M>) -> Self {
Self {
conn: Some(conn.into()),
shared_pool,
marker: None,
}
}
fn take_drop(mut self) {
let _ = self.take_conn();
}
}
impl<M: Manager> PoolRefBehavior<'_, M> for PoolRefOwned<M> {
fn from_idle(conn: IdleConn<M>, shared_pool: &SharedManagedPool<M>) -> Self {
Self {
conn: Some(conn.into()),
shared_pool: WrapPointer::downgrade(shared_pool),
marker: None,
}
}
fn take_drop(mut self) {
let _ = self.take_conn();
}
}
impl<M: Manager> Deref for PoolRef<'_, M> {
type Target = M::Connection;
fn deref(&self) -> &Self::Target {
&self
.conn
.as_ref()
.expect("Connection has already been taken")
.conn
}
}
impl<M: Manager> DerefMut for PoolRef<'_, M> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self
.conn
.as_mut()
.expect("Connection has already been taken")
.conn
}
}
impl<M: Manager> Deref for PoolRefOwned<M> {
type Target = M::Connection;
fn deref(&self) -> &Self::Target {
&self
.conn
.as_ref()
.expect("Connection has already been taken")
.conn
}
}
impl<M: Manager> DerefMut for PoolRefOwned<M> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self
.conn
.as_mut()
.expect("Connection has already been taken")
.conn
}
}
impl<M: Manager> Drop for PoolRef<'_, M> {
#[inline]
fn drop(&mut self) {
self.shared_pool.drop_pool_ref(&mut self.conn, self.marker);
}
}
impl<M: Manager> Drop for PoolRefOwned<M> {
#[inline]
fn drop(&mut self) {
if let Some(shared_pool) = self.shared_pool.upgrade() {
shared_pool.drop_pool_ref(&mut self.conn, self.marker);
}
}
}
trait DropAndSpawn<M: Manager> {
fn drop_pool_ref(&self, conn: &mut Option<Conn<M>>, marker: Option<usize>);
fn spawn_drop(&self, marker: usize);
}
impl<M: Manager> DropAndSpawn<M> for SharedManagedPool<M> {
#[inline]
fn drop_pool_ref(&self, conn: &mut Option<Conn<M>>, marker: Option<usize>) {
if !self.is_running() {
self.drop_conn(0, false);
return;
}
let mut conn = match conn.take() {
Some(conn) => conn,
None => {
self.spawn_drop(marker.unwrap());
return;
}
};
let is_closed = self.manager.is_closed(&mut conn.conn);
if is_closed {
self.spawn_drop(conn.marker);
} else {
self.pool_lock.put_back(conn.into());
};
}
#[cold]
fn spawn_drop(&self, marker: usize) {
let opt = self.drop_conn(marker, true);
if let Some(pending) = opt {
let shared_clone = self.clone();
self.spawn(async move {
let _ = shared_clone.replenish_idle_conn(pending, marker).await;
});
}
}
}