use std::fmt::{self, Debug, Formatter};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::{Duration, Instant};
use futures_intrusive::sync::SemaphoreReleaser;
use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use super::inner::{DecrementSizeGuard, PoolInner};
use crate::pool::options::PoolConnectionMetadata;
use std::future::Future;
/// A handle to a connection that belongs to a pool.
///
/// Dereferences to the underlying `DB::Connection`. When the handle is dropped, the
/// `Drop` impl in this file passes the connection to `return_to_pool()` on a spawned task.
pub struct PoolConnection<DB: Database> {
// `None` once the inner connection has been taken out, which `detach()`, `leak()` and
// `return_to_pool()` all do via `Option::take`.
live: Option<Live<DB>>,
// Shared handle to the pool internals this connection is associated with.
pub(crate) pool: Arc<PoolInner<DB>>,
}
/// A raw database connection together with the instant it was created.
pub(super) struct Live<DB: Database> {
// The underlying driver connection.
pub(super) raw: DB::Connection,
// Set to `Instant::now()` by `Floating::new_live`; used to compute the connection's age.
pub(super) created_at: Instant,
}
/// A `Live` connection annotated with the instant it became idle.
pub(super) struct Idle<DB: Database> {
// The wrapped connection; `Idle` dereferences to it.
pub(super) live: Live<DB>,
// Set to `Instant::now()` by `Live::into_idle`; used to compute `idle_for` in the metadata.
pub(super) idle_since: Instant,
}
/// A value `C` (in this file, `Live<DB>` or `Idle<DB>`) paired with a `DecrementSizeGuard`.
///
/// Dereferences to the wrapped value.
// NOTE(review): the guard's drop behaviour is defined in `super::inner`; presumably it
// adjusts the pool's size accounting when a floating connection is discarded — confirm there.
pub(super) struct Floating<DB: Database, C> {
// The wrapped connection state.
pub(super) inner: C,
// Guard tied to the owning pool (it exposes a `pool` field used by the methods below).
pub(super) guard: DecrementSizeGuard<DB>,
}
// Panic message used whenever `PoolConnection::live` is accessed after it has been taken.
const EXPECT_MSG: &str = "BUG: inner connection already taken!";
impl<DB: Database> Debug for PoolConnection<DB> {
    /// Formats the handle as the bare struct name; no fields are printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("PoolConnection");
        builder.finish()
    }
}
impl<DB: Database> Deref for PoolConnection<DB> {
    type Target = DB::Connection;

    /// Borrows the underlying raw connection.
    ///
    /// # Panics
    ///
    /// Panics with `EXPECT_MSG` if the inner connection has already been taken.
    fn deref(&self) -> &Self::Target {
        let live = self.live.as_ref().expect(EXPECT_MSG);
        &live.raw
    }
}
impl<DB: Database> DerefMut for PoolConnection<DB> {
    /// Mutably borrows the underlying raw connection.
    ///
    /// # Panics
    ///
    /// Panics with `EXPECT_MSG` if the inner connection has already been taken.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let live = self.live.as_mut().expect(EXPECT_MSG);
        &mut live.raw
    }
}
impl<DB: Database> AsRef<DB::Connection> for PoolConnection<DB> {
    /// Borrows the raw connection by going through this type's `Deref` impl.
    fn as_ref(&self) -> &DB::Connection {
        // Explicit reborrow through `Deref` instead of relying on deref coercion.
        &**self
    }
}
impl<DB: Database> AsMut<DB::Connection> for PoolConnection<DB> {
    /// Mutably borrows the raw connection by going through this type's `DerefMut` impl.
    fn as_mut(&mut self) -> &mut DB::Connection {
        // Explicit reborrow through `DerefMut` instead of relying on deref coercion.
        &mut **self
    }
}
impl<DB: Database> PoolConnection<DB> {
pub fn detach(mut self) -> DB::Connection {
self.take_live().float(self.pool.clone()).detach()
}
pub fn leak(mut self) -> DB::Connection {
self.take_live().raw
}
fn take_live(&mut self) -> Live<DB> {
self.live.take().expect(EXPECT_MSG)
}
pub(crate) fn return_to_pool(&mut self) -> impl Future<Output = ()> + Send + 'static {
let floating: Option<Floating<DB, Live<DB>>> =
self.live.take().map(|live| live.float(self.pool.clone()));
let pool = self.pool.clone();
async move {
let returned_to_pool = if let Some(floating) = floating {
floating.return_to_pool().await
} else {
false
};
if !returned_to_pool {
pool.min_connections_maintenance(None).await;
}
}
}
}
impl<DB: Database> Drop for PoolConnection<DB> {
fn drop(&mut self) {
if self.live.is_some() || self.pool.options.min_connections > 0 {
#[cfg(not(feature = "_rt-async-std"))]
if let Ok(handle) = sqlx_rt::Handle::try_current() {
handle.spawn(self.return_to_pool());
}
#[cfg(feature = "_rt-async-std")]
sqlx_rt::spawn(self.return_to_pool());
}
}
}
impl<DB: Database> Live<DB> {
    /// Wraps this connection in a `Floating`, pairing it with a guard created by
    /// `DecrementSizeGuard::new_permit(pool)`.
    pub fn float(self, pool: Arc<PoolInner<DB>>) -> Floating<DB, Self> {
        let guard = DecrementSizeGuard::new_permit(pool);
        Floating { inner: self, guard }
    }

    /// Converts this connection into an `Idle` one, stamping it with the current instant.
    pub fn into_idle(self) -> Idle<DB> {
        let idle_since = Instant::now();
        Idle {
            live: self,
            idle_since,
        }
    }
}
impl<DB: Database> Deref for Idle<DB> {
type Target = Live<DB>;
fn deref(&self) -> &Self::Target {
&self.live
}
}
impl<DB: Database> DerefMut for Idle<DB> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.live
}
}
impl<DB: Database> Floating<DB, Live<DB>> {
    /// Wraps a raw connection as a floating `Live` connection, recording the current
    /// instant as its creation time and pairing it with the supplied guard.
    pub fn new_live(conn: DB::Connection, guard: DecrementSizeGuard<DB>) -> Self {
        Self {
            inner: Live {
                raw: conn,
                created_at: Instant::now(),
            },
            guard,
        }
    }

    /// Turns this floating connection into a `PoolConnection` handle bound to the
    /// guard's pool.
    pub fn reattach(self) -> PoolConnection<DB> {
        let Floating { inner, guard } = self;

        let pool = Arc::clone(&guard.pool);

        // The guard is cancelled rather than dropped normally.
        // NOTE(review): presumably this keeps the pool's accounting unchanged because the
        // connection stays in use — confirm against `DecrementSizeGuard::cancel`.
        guard.cancel();
        PoolConnection {
            live: Some(inner),
            pool,
        }
    }

    /// Hands this floating connection to the pool's `release` method.
    pub fn release(self) {
        // Clone the `Arc` first so `self` can be moved into the call.
        self.guard.pool.clone().release(self);
    }

    /// Attempts to give the connection back to the pool.
    ///
    /// Returns `true` if the connection was released to the pool and `false` if it was
    /// closed instead. The connection is closed when the pool is closed, when the
    /// `after_release` callback returns `Ok(false)` or an error, or when the final ping
    /// fails.
    async fn return_to_pool(mut self) -> bool {
        // A closed pool takes no connections back.
        if self.guard.pool.is_closed() {
            self.close().await;
            return false;
        }

        // Run the optional user-supplied `after_release` callback.
        if let Some(test) = &self.guard.pool.options.after_release {
            let meta = self.metadata();
            match (test)(&mut self.inner.raw, meta).await {
                Ok(true) => (),
                Ok(false) => {
                    // The callback rejected the connection: close it gracefully.
                    self.close().await;
                    return false;
                }
                Err(e) => {
                    log::warn!("error from after_release: {}", e);
                    // The callback failed: skip the graceful close.
                    self.close_hard().await;
                    return false;
                }
            }
        }

        // Ping the connection before releasing it; a connection that fails the ping is
        // hard-closed instead of being returned.
        if let Err(e) = self.raw.ping().await {
            log::warn!(
                "error occurred while testing the connection on-release: {}",
                e
            );
            self.close_hard().await;
            false
        } else {
            self.release();
            true
        }
    }

    /// Closes the raw connection via `Connection::close`, ignoring the result.
    ///
    /// The rest of `self`, including the guard, is dropped when this returns.
    pub async fn close(self) {
        let _ = self.inner.raw.close().await;
    }

    /// Closes the raw connection via `Connection::close_hard`, ignoring the result.
    ///
    /// The rest of `self`, including the guard, is dropped when this returns.
    pub async fn close_hard(self) {
        let _ = self.inner.raw.close_hard().await;
    }

    /// Returns the raw connection; the rest of `self`, including the guard, is dropped.
    pub fn detach(self) -> DB::Connection {
        self.inner.raw
    }

    /// Converts the wrapped `Live` connection into an `Idle` one, keeping the same guard.
    pub fn into_idle(self) -> Floating<DB, Idle<DB>> {
        Floating {
            inner: self.inner.into_idle(),
            guard: self.guard,
        }
    }

    /// Builds metadata for a connection that is currently not idle.
    ///
    /// `age` is the time since the connection was created; `idle_for` is always zero.
    pub fn metadata(&self) -> PoolConnectionMetadata {
        PoolConnectionMetadata {
            // Saturating subtraction, the same as `Floating<DB, Idle<DB>>::metadata`:
            // the receiver is the later instant, and a non-monotonic clock yields a zero
            // duration instead of relying on `Instant::elapsed`, which is documented as
            // possibly panicking in that situation.
            age: Instant::now().saturating_duration_since(self.created_at),
            idle_for: Duration::ZERO,
        }
    }
}
impl<DB: Database> Floating<DB, Idle<DB>> {
    /// Wraps an idle connection as a floating one, building its guard from the given
    /// pool and semaphore permit via `DecrementSizeGuard::from_permit`.
    pub fn from_idle(
        idle: Idle<DB>,
        pool: Arc<PoolInner<DB>>,
        permit: SemaphoreReleaser<'_>,
    ) -> Self {
        let guard = DecrementSizeGuard::from_permit(pool, permit);
        Self { inner: idle, guard }
    }

    /// Pings the underlying raw connection and returns the result.
    pub async fn ping(&mut self) -> Result<(), Error> {
        let conn = &mut self.inner.live.raw;
        conn.ping().await
    }

    /// Discards the idle timestamp and returns the connection as a floating `Live`,
    /// keeping the same guard.
    pub fn into_live(self) -> Floating<DB, Live<DB>> {
        let Floating { inner, guard } = self;
        Floating {
            inner: inner.live,
            guard,
        }
    }

    /// Closes the raw connection via `Connection::close` and returns the guard to the
    /// caller. A close error is logged at debug level and otherwise ignored.
    pub async fn close(self) -> DecrementSizeGuard<DB> {
        let Floating { inner, guard } = self;
        if let Err(e) = inner.live.raw.close().await {
            log::debug!("error occurred while closing the pool connection: {}", e);
        }
        guard
    }

    /// Closes the raw connection via `Connection::close_hard`, ignoring the result, and
    /// returns the guard to the caller.
    pub async fn close_hard(self) -> DecrementSizeGuard<DB> {
        let Floating { inner, guard } = self;
        let _ = inner.live.raw.close_hard().await;
        guard
    }

    /// Builds metadata for an idle connection.
    ///
    /// Both durations are measured against a single `Instant::now()` reading and use
    /// saturating subtraction.
    pub fn metadata(&self) -> PoolConnectionMetadata {
        let now = Instant::now();
        let age = now.saturating_duration_since(self.created_at);
        let idle_for = now.saturating_duration_since(self.idle_since);
        PoolConnectionMetadata { age, idle_for }
    }
}
impl<DB: Database, C> Deref for Floating<DB, C> {
type Target = C;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<DB: Database, C> DerefMut for Floating<DB, C> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}