use core::{
future::Future,
mem,
num::NonZeroUsize,
ops::Deref,
pin::Pin,
task::{Context, Poll},
};
use lru::LruCache;
use tokio::sync::SemaphorePermit;
use xitca_io::bytes::BytesMut;
use crate::{
BoxedFuture,
client::{Client, ClientBorrow, ClientBorrowMut},
copy::{r#Copy, CopyIn, CopyOut},
driver::codec::AsParams,
error::Error,
execute::Execute,
query::{RowAffected, RowStreamOwned},
session::Session,
statement::{Statement, StatementNamed, StatementQuery},
transaction::{Transaction, TransactionBuilder},
};
use super::Pool;
/// A connection borrowed from a [`Pool`].
///
/// The `Drop` impl in this file takes the inner [`PoolClient`] out of `conn` and pushes it onto
/// the back of `pool.conn`, so the client is handed back when this value goes out of scope.
pub struct PoolConnection<'a> {
/// Pool this connection came from; used on drop to hand `conn` back.
pub(super) pool: &'a Pool,
/// The pooled client. Accessors in this file `unwrap` it, so it is expected to stay `Some`
/// until `Drop` takes it.
pub(super) conn: Option<PoolClient>,
/// Semaphore permit held for as long as this value is alive.
/// NOTE(review): presumably caps the number of concurrently borrowed connections — confirm in `Pool`.
pub(super) _permit: SemaphorePermit<'a>,
}
impl<'a> PoolConnection<'a> {
    /// Starts a transaction that mutably borrows this connection.
    ///
    /// Uses a freshly created [`TransactionBuilder`] and resolves to the [`Transaction`] or an
    /// [`Error`].
    #[inline]
    pub fn transaction(&mut self) -> impl Future<Output = Result<Transaction<'_, Self>, Error>> + Send {
        TransactionBuilder::new().begin(self)
    }

    /// Same as [`PoolConnection::transaction`], but consumes the connection so the resulting
    /// [`Transaction`] owns it.
    #[inline]
    pub fn transaction_owned(self) -> impl Future<Output = Result<Transaction<'a, Self>, Error>> + Send {
        TransactionBuilder::new().begin_owned(self)
    }

    /// Builds a [`CopyIn`] for `stmt` on this connection via [`CopyIn::new`].
    #[inline]
    pub fn copy_in(&mut self, stmt: &Statement) -> impl Future<Output = Result<CopyIn<'_, Self>, Error>> + Send {
        CopyIn::new(self, stmt)
    }

    /// Builds a [`CopyOut`] for `stmt` on this connection via [`CopyOut::new`] and awaits it.
    #[inline]
    pub async fn copy_out(&self, stmt: &Statement) -> Result<CopyOut, Error> {
        let pending = CopyOut::new(self, stmt);
        pending.await
    }

    /// Moves the connection and returns it unchanged.
    ///
    /// NOTE(review): presumably a convenience for turning a binding into a temporary so the
    /// connection is dropped (and handed back to the pool) sooner — confirm intended usage.
    #[inline(always)]
    pub fn consume(self) -> Self {
        self
    }

    /// Returns the [`Session`] produced by the underlying client's `cancel_token`.
    pub fn cancel_token(&self) -> Session {
        let pooled = self.conn();
        pooled.client.cancel_token()
    }

    /// Stores `stmt` in `cache` under the key `named` and returns a reference to the cached entry.
    ///
    /// If `push` hands back a displaced entry (same key replaced, or an entry evicted for
    /// capacity), its statement is converted with `into_guarded` and dropped right away.
    /// NOTE(review): presumably dropping the guard releases the statement on the server — confirm.
    ///
    /// # Panics
    ///
    /// Panics if the cache has no most-recently-used entry after the push.
    fn insert_cache<'c>(cache: &'c mut Cache, cli: &Client, named: &str, stmt: Statement) -> &'c CachedStatement {
        let key = Box::from(named);
        let displaced = cache.push(key, CachedStatement { stmt });
        if let Some((_, old)) = displaced {
            drop(old.stmt.into_guarded(&cli));
        }
        // The entry pushed above is now the most recently used one.
        let (_, newest) = cache.peek_mru().unwrap();
        newest
    }

    /// Shared access to the pooled client.
    ///
    /// # Panics
    ///
    /// Panics if `conn` is `None`.
    fn conn(&self) -> &PoolClient {
        let slot = self.conn.as_ref();
        slot.unwrap()
    }

    /// Exclusive access to the pooled client.
    ///
    /// # Panics
    ///
    /// Panics if `conn` is `None`.
    fn conn_mut(&mut self) -> &mut PoolClient {
        let slot = self.conn.as_mut();
        slot.unwrap()
    }
}
impl ClientBorrow for PoolConnection<'_> {
#[inline]
fn borrow_cli_ref(&self) -> &Client {
&self.conn().client
}
}
impl ClientBorrowMut for PoolConnection<'_> {
#[inline]
fn borrow_cli_mut(&mut self) -> &mut Client {
&mut self.conn_mut().client
}
}
impl r#Copy for PoolConnection<'_> {
    /// Forwards `func` to the underlying client's `send_one_way` and returns its result.
    #[inline]
    fn send_one_way<F>(&self, func: F) -> Result<(), Error>
    where
        F: FnOnce(&mut BytesMut) -> Result<(), Error>,
    {
        let pooled = self.conn();
        pooled.client.send_one_way(func)
    }
}
impl Drop for PoolConnection<'_> {
    /// Hands the pooled client back to the pool by pushing it onto the back of `pool.conn`.
    ///
    /// # Panics
    ///
    /// Panics if `conn` is already `None` or if locking `pool.conn` returns an error.
    fn drop(&mut self) {
        let returned = self.conn.take().unwrap();
        let mut idle = self.pool.conn.lock().unwrap();
        idle.push_back(returned);
    }
}
/// A [`Statement`] stored in (or cloned out of) a connection's statement cache.
///
/// Dereferences to the inner [`Statement`]; cloning goes through `Statement::duplicate`.
pub struct CachedStatement {
// The wrapped statement; private so it is only reachable through `Deref`.
stmt: Statement,
}
impl Clone for CachedStatement {
    /// Produces a new handle by calling `duplicate` on the wrapped [`Statement`].
    fn clone(&self) -> Self {
        let stmt = self.stmt.duplicate();
        Self { stmt }
    }
}
impl Deref for CachedStatement {
    type Target = Statement;

    /// Exposes the wrapped [`Statement`].
    fn deref(&self) -> &Self::Target {
        let Self { stmt } = self;
        stmt
    }
}
/// A [`Client`] paired with its own statement cache; this is the value the pool stores.
pub(super) struct PoolClient {
// The underlying database client.
client: Client,
// Statements cached for this client, keyed by the string they were inserted under.
cache: Cache,
}
impl PoolClient {
    /// Reports whether the underlying [`Client`] says it is closed.
    pub(super) fn closed(&self) -> bool {
        let Self { client, .. } = self;
        client.closed()
    }
}
/// Statement cache held by each [`PoolClient`]: an [`LruCache`] from an owned string key to a
/// [`CachedStatement`].
type Cache = LruCache<Box<str>, CachedStatement>;
impl PoolClient {
    /// Wraps `client` together with an empty statement cache of capacity `cap`.
    pub(super) fn new(client: Client, cap: NonZeroUsize) -> Self {
        let cache = LruCache::new(cap);
        Self { client, cache }
    }
}
/// Anything executable against `&Client` is also executable against `&PoolConnection`:
/// both methods forward to the `&Client` implementation using the connection's borrowed client.
impl<'c, E> Execute<&'c PoolConnection<'_>> for E
where
    E: Execute<&'c Client>,
{
    type ExecuteOutput = E::ExecuteOutput;
    type QueryOutput = E::QueryOutput;

    /// Runs `execute` of the `&Client` implementation on the connection's client.
    #[inline]
    fn execute(self, cli: &'c PoolConnection<'_>) -> Self::ExecuteOutput {
        let client: &'c Client = cli.borrow_cli_ref();
        <E as Execute<&'c Client>>::execute(self, client)
    }

    /// Runs `query` of the `&Client` implementation on the connection's client.
    #[inline]
    fn query(self, cli: &'c PoolConnection<'_>) -> Self::QueryOutput {
        let client: &'c Client = cli.borrow_cli_ref();
        <E as Execute<&'c Client>>::query(self, client)
    }
}
/// Executing a [`StatementNamed`] against `&mut PoolConnection` goes through the connection's
/// statement cache, keyed by the statement string.
impl<'c, 's> Execute<&'c mut PoolConnection<'_>> for StatementNamed<'s>
where
    's: 'c,
{
    type ExecuteOutput = StatementCacheFuture<'c>;
    type QueryOutput = Self::ExecuteOutput;

    /// Returns a future resolving to the cached statement for `self.stmt`.
    ///
    /// On a cache hit the future is immediately ready with a clone of the cached entry. On a
    /// miss the statement is executed against the underlying client, `leak`ed, inserted into
    /// the cache, and a clone of the new entry is returned.
    fn execute(self, cli: &'c mut PoolConnection) -> Self::ExecuteOutput {
        // Fast path: already cached, nothing to await.
        if let Some(hit) = cli.conn_mut().cache.get(self.stmt) {
            return StatementCacheFuture::Cached(hit.clone());
        }

        // Slow path: prepare on the client, then remember the result under the statement string.
        StatementCacheFuture::Prepared(Box::pin(async move {
            let pooled = cli.conn_mut();
            let key = self.stmt;
            let prepared = self.execute(&pooled.client).await?.leak();
            let cached = PoolConnection::insert_cache(&mut pooled.cache, &pooled.client, key, prepared);
            Ok(cached.clone())
        }))
    }

    /// Identical to [`Execute::execute`] for this type.
    #[inline]
    fn query(self, cli: &'c mut PoolConnection) -> Self::QueryOutput {
        self.execute(cli)
    }
}
/// Future returned when a [`StatementNamed`] is executed against a `&mut PoolConnection`.
pub enum StatementCacheFuture<'c> {
/// The statement was found in the cache; the first poll yields it.
Cached(CachedStatement),
/// The statement is still being prepared; the boxed future yields the cached statement or an error.
Prepared(BoxedFuture<'c, Result<CachedStatement, Error>>),
/// State left in place while polling and after completion; polling in this state panics.
Done,
}
impl Future for StatementCacheFuture<'_> {
    type Output = Result<CachedStatement, Error>;

    /// Drives the future to completion.
    ///
    /// The current state is swapped out for `Done` first. A cached statement is returned
    /// immediately; a pending inner future is put back so it can be polled again.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already produced a value.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let slot = self.get_mut();
        let taken = mem::replace(slot, Self::Done);

        let mut inner = match taken {
            Self::Cached(stmt) => return Poll::Ready(Ok(stmt)),
            Self::Done => panic!("StatementCacheFuture polled after finish"),
            Self::Prepared(fut) => fut,
        };

        match inner.as_mut().poll(cx) {
            Poll::Pending => {
                // Not finished yet: restore the in-flight future for the next poll.
                *slot = Self::Prepared(inner);
                Poll::Pending
            }
            ready => ready,
        }
    }
}
/// Variant compiled when the `nightly` feature is disabled: both futures are boxed.
///
/// The statement text is used as the cache key. On a cache hit `types` is not consulted.
#[cfg(not(feature = "nightly"))]
impl<'c, 's, P> Execute<&'c mut PoolConnection<'_>> for StatementQuery<'s, P>
where
    P: AsParams + Send + 'c,
    's: 'c,
{
    type ExecuteOutput = BoxedFuture<'c, Result<RowAffected, Error>>;
    type QueryOutput = BoxedFuture<'c, Result<RowStreamOwned, Error>>;

    /// Resolves the statement through the cache (preparing and caching it on a miss), binds
    /// `params`, runs the query and converts the result with `RowAffected::from`.
    fn execute(self, conn: &'c mut PoolConnection<'_>) -> Self::ExecuteOutput {
        Box::pin(async move {
            let StatementQuery {
                stmt: sql,
                types,
                params,
            } = self;
            let pooled = conn.conn_mut();
            let cached = match pooled.cache.get(sql) {
                Some(hit) => hit,
                None => {
                    let fresh = Statement::named(sql, types).execute(&pooled.client).await?.leak();
                    PoolConnection::insert_cache(&mut pooled.cache, &pooled.client, sql, fresh)
                }
            };
            let bound = cached.bind(params);
            bound.query(&pooled.client).await.map(RowAffected::from)
        })
    }

    /// Resolves the statement through the cache (preparing and caching it on a miss), binds
    /// `params`, converts the bound statement with `into_owned` and runs the query.
    fn query(self, conn: &'c mut PoolConnection<'_>) -> Self::QueryOutput {
        Box::pin(async move {
            let StatementQuery {
                stmt: sql,
                types,
                params,
            } = self;
            let pooled = conn.conn_mut();
            let cached = match pooled.cache.get(sql) {
                Some(hit) => hit,
                None => {
                    let fresh = Statement::named(sql, types).execute(&pooled.client).await?.leak();
                    PoolConnection::insert_cache(&mut pooled.cache, &pooled.client, sql, fresh)
                }
            };
            let bound = cached.bind(params).into_owned();
            bound.query(&pooled.client).await
        })
    }
}
/// Variant compiled when the `nightly` feature is enabled: the futures are opaque `impl Future`
/// associated types instead of boxed ones.
///
/// The statement text is used as the cache key. On a cache hit `types` is not consulted.
#[cfg(feature = "nightly")]
impl<'c, 's, 'p, P> Execute<&'c mut PoolConnection<'p>> for StatementQuery<'s, P>
where
    P: AsParams + Send + 'c,
    's: 'c,
    'p: 'c,
{
    type ExecuteOutput = impl Future<Output = Result<RowAffected, Error>> + Send + 'c;
    type QueryOutput = impl Future<Output = Result<RowStreamOwned, Error>> + Send + 'c;

    /// Resolves the statement through the cache (preparing and caching it on a miss), binds
    /// `params`, runs the query and converts the result with `RowAffected::from`.
    fn execute(self, conn: &'c mut PoolConnection<'p>) -> Self::ExecuteOutput {
        async move {
            let StatementQuery {
                stmt: sql,
                types,
                params,
            } = self;
            let pooled = conn.conn_mut();
            let cached = match pooled.cache.get(sql) {
                Some(hit) => hit,
                None => {
                    let fresh = Statement::named(sql, types).execute(&pooled.client).await?.leak();
                    PoolConnection::insert_cache(&mut pooled.cache, &pooled.client, sql, fresh)
                }
            };
            let bound = cached.bind(params);
            bound.query(&pooled.client).await.map(RowAffected::from)
        }
    }

    /// Resolves the statement through the cache (preparing and caching it on a miss), binds
    /// `params`, converts the bound statement with `into_owned` and runs the query.
    fn query(self, conn: &'c mut PoolConnection<'p>) -> Self::QueryOutput {
        async move {
            let StatementQuery {
                stmt: sql,
                types,
                params,
            } = self;
            let pooled = conn.conn_mut();
            let cached = match pooled.cache.get(sql) {
                Some(hit) => hit,
                None => {
                    let fresh = Statement::named(sql, types).execute(&pooled.client).await?.leak();
                    PoolConnection::insert_cache(&mut pooled.cache, &pooled.client, sql, fresh)
                }
            };
            let bound = cached.bind(params).into_owned();
            bound.query(&pooled.client).await
        }
    }
}