use std::{
ops::{Deref, DerefMut},
sync::Arc,
time::{Duration, Instant},
};
use async_std::sync::Sender;
use futures_util::future::FutureExt;
use crate::Database;
use self::inner::SharedPool;
pub use self::options::Builder;
use self::options::Options;
mod executor;
mod inner;
mod options;
/// A handle to a pool of database connections.
///
/// The handle pairs a reference-counted pointer to the shared pool state
/// with the sending half of the channel through which checked-out
/// connections are handed back as [`Idle`] entries.
pub struct Pool<DB: Database> {
    /// Shared pool state; every clone of this handle points at the same value.
    inner: Arc<SharedPool<DB>>,
    /// Channel used to return connections to the pool once they are released.
    pool_tx: Sender<Idle<DB>>,
}
/// Guard around a connection that has been checked out of the pool.
///
/// The guard dereferences to the underlying `DB::Connection` and gives the
/// connection back through `pool_tx` when it is dropped.
struct Connection<DB>
where
    DB: Database,
{
    /// The checked-out connection; taken (left as `None`) when the guard is dropped.
    raw: Option<Raw<DB>>,
    /// Channel on which the connection is sent back as an `Idle` entry.
    pool_tx: Sender<Idle<DB>>,
}
/// A database connection together with a timestamp.
struct Raw<DB>
where
    DB: Database,
{
    /// The underlying driver connection.
    inner: DB::Connection,
    /// NOTE(review): presumably the instant the connection was opened; it is
    /// populated outside this file — confirm against the `inner` module.
    created: Instant,
}
/// A connection that has been released and is travelling back to the pool.
struct Idle<DB>
where
    DB: Database,
{
    /// The released connection.
    raw: Raw<DB>,
    /// The instant at which the connection was released; `Connection`'s
    /// `Drop` impl sets it to `Instant::now()`.
    since: Instant,
}
impl<DB> Pool<DB>
where
    DB: Database,
    DB::Connection: crate::Connection<Database = DB>,
{
    /// Creates a pool for `url` using a freshly constructed [`Builder`].
    ///
    /// # Errors
    ///
    /// Propagates whatever error `Builder::build` reports.
    pub async fn new(url: &str) -> crate::Result<Self> {
        let builder = Self::builder();
        builder.build(url).await
    }

    /// Creates a pool for `url` with an explicit set of `options`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `SharedPool::new_arc` if the shared state
    /// cannot be set up.
    async fn with_options(url: &str, options: Options) -> crate::Result<Self> {
        let (shared, tx) = SharedPool::new_arc(url, options).await?;
        Ok(Self {
            inner: shared,
            pool_tx: tx,
        })
    }

    /// Returns a new [`Builder`] for configuring a pool.
    pub fn builder() -> Builder<DB> {
        Builder::new()
    }

    /// Waits for a connection from the shared pool and wraps it in a guard.
    ///
    /// The returned guard dereferences to `DB::Connection` and sends the
    /// connection back to the pool when it is dropped.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the shared pool's `acquire`.
    pub async fn acquire(&self) -> crate::Result<impl DerefMut<Target = DB::Connection>> {
        let raw = self.inner.acquire().await?;
        Ok(Connection {
            raw: Some(raw),
            pool_tx: self.pool_tx.clone(),
        })
    }

    /// Tries to obtain a connection without awaiting.
    ///
    /// Returns `None` when the shared pool's `try_acquire` yields nothing;
    /// otherwise returns a guard that sends the connection back to the pool
    /// when it is dropped.
    pub fn try_acquire(&self) -> Option<impl DerefMut<Target = DB::Connection>> {
        let raw = self.inner.try_acquire()?;
        Some(Connection {
            raw: Some(raw),
            pool_tx: self.pool_tx.clone(),
        })
    }

    /// Closes the pool by delegating to the shared pool's `close` and
    /// awaiting its completion.
    pub async fn close(&self) {
        self.inner.close().await;
    }

    /// Returns the size reported by the shared pool.
    pub fn size(&self) -> u32 {
        self.inner.size()
    }

    /// Returns the idle-connection count reported by the shared pool.
    pub fn idle(&self) -> usize {
        self.inner.num_idle()
    }

    /// Returns the `max_size` value from the pool's options.
    pub fn max_size(&self) -> u32 {
        let options = self.inner.options();
        options.max_size
    }

    /// Returns the `connect_timeout` value from the pool's options.
    pub fn connect_timeout(&self) -> Duration {
        let options = self.inner.options();
        options.connect_timeout
    }

    /// Returns the `min_size` value from the pool's options.
    pub fn min_size(&self) -> u32 {
        let options = self.inner.options();
        options.min_size
    }

    /// Returns the `max_lifetime` value from the pool's options, if one is set.
    pub fn max_lifetime(&self) -> Option<Duration> {
        let options = self.inner.options();
        options.max_lifetime
    }

    /// Returns the `idle_timeout` value from the pool's options, if one is set.
    pub fn idle_timeout(&self) -> Option<Duration> {
        let options = self.inner.options();
        options.idle_timeout
    }
}
/// Cloning a `Pool` yields another handle to the same shared pool state.
impl<DB: Database> Clone for Pool<DB> {
    fn clone(&self) -> Self {
        // Only the reference count and the channel sender are duplicated;
        // the shared state behind the `Arc` is not copied.
        let inner = Arc::clone(&self.inner);
        let pool_tx = self.pool_tx.clone();
        Self { inner, pool_tx }
    }
}
/// Panic message used by the `Deref`/`DerefMut` impls of `Connection` when
/// the guard's `raw` slot is empty.
const DEREF_ERR: &str = "(bug) connection already released to pool";
/// Gives shared access to the underlying driver connection.
impl<DB> Deref for Connection<DB>
where
    DB: Database,
{
    type Target = DB::Connection;

    /// # Panics
    ///
    /// Panics with `DEREF_ERR` if the guard's `raw` slot is `None`.
    fn deref(&self) -> &Self::Target {
        let raw = self.raw.as_ref().expect(DEREF_ERR);
        &raw.inner
    }
}
/// Gives exclusive access to the underlying driver connection.
impl<DB> DerefMut for Connection<DB>
where
    DB: Database,
{
    /// # Panics
    ///
    /// Panics with `DEREF_ERR` if the guard's `raw` slot is `None`.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let raw = self.raw.as_mut().expect(DEREF_ERR);
        &mut raw.inner
    }
}
/// Hands the connection back to the pool when the guard goes out of scope.
impl<DB> Drop for Connection<DB>
where
    DB: Database,
{
    /// Takes the connection out of the guard and sends it over `pool_tx` as
    /// an `Idle` entry stamped with the current instant. Does nothing if the
    /// slot is already empty.
    ///
    /// # Panics
    ///
    /// Panics if the send does not complete on its first poll; the message
    /// treats that as a bug (the pool being full).
    fn drop(&mut self) {
        let raw = match self.raw.take() {
            Some(raw) => raw,
            None => return,
        };

        let released = Idle {
            raw,
            since: Instant::now(),
        };

        // `drop` cannot await, so the send future is polled exactly once.
        self.pool_tx
            .send(released)
            .now_or_never()
            .expect("(bug) connection released into a full pool");
    }
}