use std::fmt::{self, Debug, Formatter};
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::conn::{Connection, ConnectOptions};
use crate::error::Error;
use crate::PoolConnectionMetadata;
use crate::sync::AsyncSemaphoreReleaser;
use super::inner::{DecrementSizeGuard, PoolInner};
pub struct PoolConnection<C: Connection> {
live: Option<Live<C>>,
pub(crate) pool: Arc<PoolInner<C>>,
}
pub(super) struct Live<C: Connection> {
pub(super) raw: C,
pub(super) created_at: Instant,
}
pub(super) struct Idle<C: Connection> {
pub(super) live: Live<C>,
pub(super) idle_since: Instant,
}
pub(super) struct Floating<Conn: Connection, C> {
pub(super) inner: C,
pub(super) guard: DecrementSizeGuard<Conn>,
}
const EXPECT_MSG: &str = "BUG: inner connection already taken!";
impl<C: Connection> Debug for PoolConnection<C> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("PoolConnection").finish()
}
}
impl<C: Connection> Deref for PoolConnection<C> {
type Target = C;
fn deref(&self) -> &Self::Target {
&self.live.as_ref().expect(EXPECT_MSG).raw
}
}
impl<C: Connection> DerefMut for PoolConnection<C> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.live.as_mut().expect(EXPECT_MSG).raw
}
}
impl<C: Connection> AsRef<C> for PoolConnection<C> {
fn as_ref(&self) -> &C {
self
}
}
impl<C: Connection> AsMut<C> for PoolConnection<C> {
fn as_mut(&mut self) -> &mut C {
self
}
}
impl<C: Connection> PoolConnection<C> {
pub async fn close(mut self) -> Result<(), Error> {
let floating = self.take_live().float(self.pool.clone());
floating.inner.raw.close().await
}
pub fn detach(mut self) -> C {
self.take_live().float(self.pool.clone()).detach()
}
pub fn leak(mut self) -> C {
self.take_live().raw
}
fn take_live(&mut self) -> Live<C> {
self.live.take().expect(EXPECT_MSG)
}
#[doc(hidden)]
pub fn return_to_pool(&mut self) -> impl Future<Output=()> + Send + 'static {
let floating: Option<Floating<C, Live<C>>> =
self.live.take().map(|live| live.float(self.pool.clone()));
let pool = self.pool.clone();
async move {
let returned_to_pool = if let Some(floating) = floating {
floating.return_to_pool().await
} else {
false
};
if !returned_to_pool {
pool.min_connections_maintenance(None).await;
}
}
}
}
impl<C: Connection> Drop for PoolConnection<C> {
fn drop(&mut self) {
if self.live.is_some() || self.pool.options.min_connections > 0 {
tokio::spawn(self.return_to_pool());
}
}
}
impl<C: Connection> Live<C> {
pub fn float(self, pool: Arc<PoolInner<C>>) -> Floating<C, Self> {
Floating {
inner: self,
guard: DecrementSizeGuard::new_permit(pool),
}
}
pub fn into_idle(self) -> Idle<C> {
Idle {
live: self,
idle_since: Instant::now(),
}
}
}
impl<C: Connection> Deref for Idle<C> {
type Target = Live<C>;
fn deref(&self) -> &Self::Target {
&self.live
}
}
impl<C: Connection> DerefMut for Idle<C> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.live
}
}
impl<C: Connection> Floating<C, Live<C>> {
pub fn new_live(conn: C, guard: DecrementSizeGuard<C>) -> Self {
Self {
inner: Live {
raw: conn,
created_at: Instant::now(),
},
guard,
}
}
pub fn reattach(self) -> PoolConnection<C> {
let Floating { inner, guard } = self;
let pool = Arc::clone(&guard.pool);
guard.cancel();
PoolConnection {
live: Some(inner),
pool,
}
}
pub fn release(self) {
self.guard.pool.clone().release(self);
}
async fn return_to_pool(mut self) -> bool {
if self.guard.pool.is_closed() {
self.close().await;
return false;
}
if let Some(test) = &self.guard.pool.options.after_release {
let meta = self.metadata();
match (test)(&mut self.inner.raw, meta).await {
Ok(true) => (),
Ok(false) => {
self.close().await;
return false;
}
Err(error) => {
tracing::warn!(%error, "error from `after_release`");
self.close_hard().await;
return false;
}
}
}
self.release();
true
}
pub async fn close(self) {
let _ = self.inner.raw.close().await;
}
pub async fn close_hard(self) {
let _ = self.inner.raw.close_hard().await;
}
pub fn detach(self) -> C {
self.inner.raw
}
pub fn into_idle(self) -> Floating<C, Idle<C>> {
Floating {
inner: self.inner.into_idle(),
guard: self.guard,
}
}
pub fn metadata(&self) -> PoolConnectionMetadata {
PoolConnectionMetadata {
age: self.created_at.elapsed(),
idle_for: Duration::ZERO,
}
}
}
impl<C: Connection> Floating<C, Idle<C>> {
pub fn from_idle(
idle: Idle<C>,
pool: Arc<PoolInner<C>>,
permit: AsyncSemaphoreReleaser<'_>,
) -> Self {
Self {
inner: idle,
guard: DecrementSizeGuard::from_permit(pool, permit),
}
}
pub async fn ping(&mut self) -> Result<(), Error> {
self.live.raw.ping().await
}
pub fn into_live(self) -> Floating<C, Live<C>> {
Floating {
inner: self.inner.live,
guard: self.guard,
}
}
pub async fn close(self) -> DecrementSizeGuard<C> {
if let Err(error) = self.inner.live.raw.close().await {
tracing::debug!(%error, "error occurred while closing the pool connection");
}
self.guard
}
pub async fn close_hard(self) -> DecrementSizeGuard<C> {
let _ = self.inner.live.raw.close_hard().await;
self.guard
}
pub fn metadata(&self) -> PoolConnectionMetadata {
let now = Instant::now();
PoolConnectionMetadata {
age: now.saturating_duration_since(self.created_at),
idle_for: now.saturating_duration_since(self.idle_since),
}
}
}
impl<Conn: Connection, C> Deref for Floating<Conn, C> {
type Target = C;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<Conn: Connection, C> DerefMut for Floating<Conn, C> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}