use std::fmt::{self, Debug, Formatter};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::sync::AsyncSemaphoreReleaser;
use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use super::inner::{is_beyond_max_lifetime, DecrementSizeGuard, PoolInner};
use crate::pool::options::PoolConnectionMetadata;
use std::future::Future;
/// Upper bound on how long `take_and_close` waits for a close-on-drop connection to finish
/// closing before the attempt is abandoned.
const CLOSE_ON_DROP_TIMEOUT: Duration = Duration::from_secs(5);
/// A handle to a connection that belongs to a pool.
///
/// Dereferences to the underlying `DB::Connection`. When the handle is dropped, a background
/// task may be spawned that either closes the connection (if `close_on_drop()` was called) or
/// tries to hand it back to the pool.
pub struct PoolConnection<DB: Database> {
/// The wrapped connection. Becomes `None` once it has been taken out of the handle
/// (closed, detached, leaked, or moved into a background task); dereferencing the handle
/// after that panics with `EXPECT_MSG`.
live: Option<Live<DB>>,
/// Set by `close_on_drop()`; makes `Drop` close the connection instead of returning it.
close_on_drop: bool,
/// The pool this connection was handed out by.
pub(crate) pool: Arc<PoolInner<DB>>,
}
/// A raw connection together with the instant it was recorded as created.
pub(super) struct Live<DB: Database> {
/// The underlying database connection.
pub(super) raw: DB::Connection,
/// Reference point for the `age` reported in `PoolConnectionMetadata`.
pub(super) created_at: Instant,
}
/// A `Live` connection plus the instant it was marked idle (see `Live::into_idle`).
pub(super) struct Idle<DB: Database> {
/// The connection itself.
pub(super) live: Live<DB>,
/// Reference point for the `idle_for` value reported in `PoolConnectionMetadata`.
pub(super) idle_since: Instant,
}
/// A connection value `C` (either `Live` or `Idle`) paired with a `DecrementSizeGuard`
/// for the pool it belongs to.
///
/// NOTE(review): the guard presumably decrements the pool's size when dropped without being
/// cancelled — confirm against `DecrementSizeGuard` in `super::inner`.
pub(super) struct Floating<DB: Database, C> {
/// The wrapped connection state.
pub(super) inner: C,
/// Guard tied to the owning pool; `reattach` cancels it, the `Idle` close methods return it.
pub(super) guard: DecrementSizeGuard<DB>,
}
/// Panic message used when `PoolConnection::live` is unexpectedly `None`.
const EXPECT_MSG: &str = "BUG: inner connection already taken!";
impl<DB: Database> Debug for PoolConnection<DB> {
    /// Formats the handle's own state.
    ///
    /// Reports whether the inner connection is still held and whether close-on-drop was
    /// requested. The raw connection and the pool are not printed (no `Debug` bound on
    /// them is visible here), so the output is marked as partial via `finish_non_exhaustive`.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("PoolConnection")
            .field("is_live", &self.live.is_some())
            .field("close_on_drop", &self.close_on_drop)
            .finish_non_exhaustive()
    }
}
impl<DB: Database> Deref for PoolConnection<DB> {
    type Target = DB::Connection;

    /// Borrows the underlying connection.
    ///
    /// # Panics
    ///
    /// Panics with `EXPECT_MSG` if the inner connection has already been taken.
    fn deref(&self) -> &Self::Target {
        let live = self.live.as_ref().expect(EXPECT_MSG);
        &live.raw
    }
}
impl<DB: Database> DerefMut for PoolConnection<DB> {
    /// Mutably borrows the underlying connection.
    ///
    /// # Panics
    ///
    /// Panics with `EXPECT_MSG` if the inner connection has already been taken.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let live = self.live.as_mut().expect(EXPECT_MSG);
        &mut live.raw
    }
}
impl<DB: Database> AsRef<DB::Connection> for PoolConnection<DB> {
    /// Borrows the underlying connection by going through this type's `Deref` impl.
    fn as_ref(&self) -> &DB::Connection {
        &**self
    }
}
impl<DB: Database> AsMut<DB::Connection> for PoolConnection<DB> {
    /// Mutably borrows the underlying connection by going through this type's `DerefMut` impl.
    fn as_mut(&mut self) -> &mut DB::Connection {
        &mut **self
    }
}
impl<DB: Database> PoolConnection<DB> {
pub async fn close(mut self) -> Result<(), Error> {
let floating = self.take_live().float(self.pool.clone());
floating.inner.raw.close().await
}
#[inline(always)]
pub fn close_on_drop(&mut self) {
self.close_on_drop = true;
}
pub fn detach(mut self) -> DB::Connection {
self.take_live().float(self.pool.clone()).detach()
}
pub fn leak(mut self) -> DB::Connection {
self.take_live().raw
}
fn take_live(&mut self) -> Live<DB> {
self.live.take().expect(EXPECT_MSG)
}
#[doc(hidden)]
pub fn return_to_pool(&mut self) -> impl Future<Output = ()> + Send + 'static {
let floating: Option<Floating<DB, Live<DB>>> =
self.live.take().map(|live| live.float(self.pool.clone()));
let pool = self.pool.clone();
async move {
let returned_to_pool = if let Some(floating) = floating {
floating.return_to_pool().await
} else {
false
};
if !returned_to_pool {
pool.min_connections_maintenance(None).await;
}
}
}
fn take_and_close(&mut self) -> impl Future<Output = ()> + Send + 'static {
let floating = self.live.take().map(|live| live.float(self.pool.clone()));
let pool = self.pool.clone();
async move {
if let Some(floating) = floating {
crate::rt::timeout(CLOSE_ON_DROP_TIMEOUT, floating.close())
.await
.ok();
}
pool.min_connections_maintenance(None).await;
}
}
}
impl<'c, DB: Database> crate::acquire::Acquire<'c> for &'c mut PoolConnection<DB> {
    type Database = DB;

    type Connection = &'c mut <DB as Database>::Connection;

    /// Resolves immediately with a mutable borrow of the underlying connection.
    #[inline]
    fn acquire(self) -> futures_core::future::BoxFuture<'c, Result<Self::Connection, Error>> {
        let conn = &mut **self;
        Box::pin(futures_util::future::ok(conn))
    }

    /// Begins a transaction on the underlying connection.
    #[inline]
    fn begin(
        self,
    ) -> futures_core::future::BoxFuture<'c, Result<crate::transaction::Transaction<'c, DB>, Error>>
    {
        let conn = &mut **self;
        crate::transaction::Transaction::begin(conn, None)
    }
}
impl<DB: Database> Drop for PoolConnection<DB> {
    /// Spawns the background work needed when the handle goes away.
    ///
    /// With close-on-drop requested, the connection is closed in a spawned task. Otherwise a
    /// return-to-pool task is spawned when a connection is still held, or when the pool has a
    /// non-zero `min_connections` setting (the task then only runs the maintenance step).
    fn drop(&mut self) {
        if self.close_on_drop {
            crate::rt::spawn(self.take_and_close());
        } else if self.live.is_some() || self.pool.options.min_connections > 0 {
            crate::rt::spawn(self.return_to_pool());
        }
    }
}
impl<DB: Database> Live<DB> {
    /// Pairs this connection with a new `DecrementSizeGuard` created via
    /// `DecrementSizeGuard::new_permit(pool)`.
    pub fn float(self, pool: Arc<PoolInner<DB>>) -> Floating<DB, Self> {
        let guard = DecrementSizeGuard::new_permit(pool);
        Floating { inner: self, guard }
    }

    /// Marks this connection as idle starting now.
    pub fn into_idle(self) -> Idle<DB> {
        let idle_since = Instant::now();
        Idle {
            live: self,
            idle_since,
        }
    }
}
impl<DB: Database> Deref for Idle<DB> {
type Target = Live<DB>;
fn deref(&self) -> &Self::Target {
&self.live
}
}
impl<DB: Database> DerefMut for Idle<DB> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.live
}
}
impl<DB: Database> Floating<DB, Live<DB>> {
    /// Wraps a raw connection, recording the current instant as its creation time.
    pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<DB>) -> Self {
        Self {
            inner: Live {
                raw: conn,
                created_at: Instant::now(),
            },
            guard,
        }
    }

    /// Turns this floating connection into a `PoolConnection` handle.
    ///
    /// The guard is cancelled; the handle keeps a reference to the guard's pool and
    /// starts with close-on-drop disabled.
    pub fn reattach(self) -> PoolConnection<DB> {
        let Floating { inner, guard } = self;

        // Grab the pool reference before the guard is consumed by `cancel()`.
        let pool = Arc::clone(&guard.pool);
        guard.cancel();

        PoolConnection {
            live: Some(inner),
            close_on_drop: false,
            pool,
        }
    }

    /// Hands this connection to the owning pool's `release`.
    pub fn release(self) {
        // Clone the `Arc` first so the pool reference does not borrow from `self`,
        // which is moved into the call.
        self.guard.pool.clone().release(self);
    }

    /// Tries to give the connection back to the pool.
    ///
    /// Returns `true` if the connection was released to the pool, and `false` if it was
    /// closed instead. The connection is closed when:
    ///
    /// * the pool is closed;
    /// * `is_beyond_max_lifetime` reports that the connection is too old;
    /// * the `after_release` callback returns `Ok(false)` (graceful close) or an error
    ///   (hard close, with a warning logged);
    /// * the final `ping()` fails (hard close, with a warning logged).
    async fn return_to_pool(mut self) -> bool {
        if self.guard.pool.is_closed() {
            self.close().await;
            return false;
        }

        if is_beyond_max_lifetime(&self.inner, &self.guard.pool.options) {
            self.close().await;
            return false;
        }

        if let Some(test) = &self.guard.pool.options.after_release {
            let meta = self.metadata();
            match (test)(&mut self.inner.raw, meta).await {
                Ok(true) => (),
                Ok(false) => {
                    self.close().await;
                    return false;
                }
                Err(error) => {
                    tracing::warn!(%error, "error from `after_release`");
                    // The callback failed, so skip the graceful close path.
                    self.close_hard().await;
                    return false;
                }
            }
        }

        // Check the connection once more before it goes back to the pool.
        if let Err(error) = self.raw.ping().await {
            tracing::warn!(
                %error,
                "error occurred while testing the connection on-release",
            );
            self.close_hard().await;
            false
        } else {
            self.release();
            true
        }
    }

    /// Closes the connection gracefully, ignoring any error from the close.
    ///
    /// The guard is dropped when this returns.
    pub async fn close(self) {
        let _ = self.inner.raw.close().await;
    }

    /// Closes the connection via its `close_hard()`, ignoring any error.
    ///
    /// The guard is dropped when this returns.
    pub async fn close_hard(self) {
        let _ = self.inner.raw.close_hard().await;
    }

    /// Returns the raw connection, dropping the guard.
    pub fn detach(self) -> DB::Connection {
        self.inner.raw
    }

    /// Marks the connection as idle starting now, keeping the same guard.
    pub fn into_idle(self) -> Floating<DB, Idle<DB>> {
        Floating {
            inner: self.inner.into_idle(),
            guard: self.guard,
        }
    }

    /// Reports the connection's age; `idle_for` is always zero for a live connection.
    pub fn metadata(&self) -> PoolConnectionMetadata {
        PoolConnectionMetadata {
            // Saturating subtraction (later instant as receiver) so a non-monotonic clock
            // yields a zero age instead of relying on `Instant::elapsed`, whose docs allow a
            // panic to be reintroduced. Matches `Floating<DB, Idle<DB>>::metadata`.
            age: Instant::now().saturating_duration_since(self.created_at),
            idle_for: Duration::ZERO,
        }
    }
}
impl<DB: Database> Floating<DB, Idle<DB>> {
    /// Pairs an idle connection with a guard built from the given semaphore permit via
    /// `DecrementSizeGuard::from_permit`.
    pub fn from_idle(
        idle: Idle<DB>,
        pool: Arc<PoolInner<DB>>,
        permit: AsyncSemaphoreReleaser<'_>,
    ) -> Self {
        let guard = DecrementSizeGuard::from_permit(pool, permit);
        Self { inner: idle, guard }
    }

    /// Pings the underlying connection.
    ///
    /// # Errors
    ///
    /// Returns whatever error the connection's own `ping()` reports.
    pub async fn ping(&mut self) -> Result<(), Error> {
        self.inner.live.raw.ping().await
    }

    /// Discards the idle timestamp and yields the live connection with the same guard.
    pub fn into_live(self) -> Floating<DB, Live<DB>> {
        let Floating { inner, guard } = self;
        Floating {
            inner: inner.live,
            guard,
        }
    }

    /// Closes the connection gracefully and hands the guard back to the caller.
    ///
    /// A close error is logged at debug level and otherwise ignored.
    pub async fn close(self) -> DecrementSizeGuard<DB> {
        let Floating { inner, guard } = self;
        if let Err(error) = inner.live.raw.close().await {
            tracing::debug!(%error, "error occurred while closing the pool connection");
        }
        guard
    }

    /// Closes the connection via its `close_hard()`, ignoring any error, and hands the
    /// guard back to the caller.
    pub async fn close_hard(self) -> DecrementSizeGuard<DB> {
        let Floating { inner, guard } = self;
        let _ = inner.live.raw.close_hard().await;
        guard
    }

    /// Reports the connection's age and how long it has been idle.
    pub fn metadata(&self) -> PoolConnectionMetadata {
        // One shared `now` keeps the two durations consistent with each other.
        let now = Instant::now();
        let age = now.saturating_duration_since(self.created_at);
        let idle_for = now.saturating_duration_since(self.idle_since);
        PoolConnectionMetadata { age, idle_for }
    }
}
impl<DB: Database, C> Deref for Floating<DB, C> {
type Target = C;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<DB: Database, C> DerefMut for Floating<DB, C> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}