use core::future::Future;
/// Describes how a resource is created, inspected and refreshed.
///
/// Every operation is exposed as a method returning a future, which means that implementors
/// are free to write them as `async fn`s.
pub trait ResourceManager {
    /// Auxiliary data borrowed by [`ResourceManager::create`]. Unsized types are accepted.
    type CreateAux: ?Sized;
    /// Error returned by the fallible operations. Must be constructible from [`crate::Error`].
    type Error: From<crate::Error>;
    /// Auxiliary data borrowed by [`ResourceManager::recycle`]. Unsized types are accepted.
    type RecycleAux: ?Sized;
    /// The type of the managed resource.
    type Resource;

    /// Produces a new resource instance or the error that prevented its creation.
    fn create(
        &self,
        aux: &Self::CreateAux,
    ) -> impl Future<Output = Result<Self::Resource, Self::Error>>;

    /// Resolves to `true` when the implementor considers `resource` to be invalid.
    fn is_invalid(&self, resource: &Self::Resource) -> impl Future<Output = bool>;

    /// Refreshes `resource` in place through the exclusive reference.
    ///
    /// What "refreshing" means is entirely up to the implementor.
    fn recycle(
        &self,
        aux: &Self::RecycleAux,
        resource: &mut Self::Resource,
    ) -> impl Future<Output = Result<(), Self::Error>>;
}
impl ResourceManager for () {
type CreateAux = ();
type Error = crate::Error;
type RecycleAux = ();
type Resource = ();
#[inline]
async fn create(&self, _: &Self::CreateAux) -> Result<Self::Resource, Self::Error> {
Ok(())
}
#[inline]
async fn is_invalid(&self, _: &Self::Resource) -> bool {
false
}
#[inline]
async fn recycle(&self, _: &Self::RecycleAux, _: &mut Self::Resource) -> Result<(), Self::Error> {
Ok(())
}
}
/// Resource manager driven by a plain callback.
///
/// The [`ResourceManager`] implementation of this type calls [`SimpleRM::cb`] to create
/// resources, never reports a resource as invalid and does nothing when recycling.
#[derive(Debug)]
pub struct SimpleRM<F> {
    /// Callback invoked every time a new resource has to be created.
    pub cb: F,
}
impl<F> SimpleRM<F> {
    /// Wraps `cb` in a new manager.
    ///
    /// Declared `const`, like the other constructors of this file, so that a manager can also
    /// be built inside `const` and `static` initializers. Regular runtime calls are unaffected.
    #[inline]
    pub const fn new(cb: F) -> Self {
        Self { cb }
    }
}
/// Resources come straight from the wrapped callback; they are never considered invalid and
/// recycling leaves them untouched.
impl<E, F, R> ResourceManager for SimpleRM<F>
where
    E: From<crate::Error>,
    F: Fn() -> Result<R, E>,
{
    type CreateAux = ();
    type Error = E;
    type RecycleAux = ();
    type Resource = R;

    /// Calls the stored callback and forwards its outcome unchanged.
    #[inline]
    async fn create(&self, _aux: &()) -> Result<R, E> {
        let make = &self.cb;
        make()
    }

    /// Resources produced by the callback are always reported as valid.
    #[inline]
    async fn is_invalid(&self, _resource: &R) -> bool {
        false
    }

    /// Nothing is done to the resource, so recycling always succeeds.
    #[inline]
    async fn recycle(&self, _aux: &(), _resource: &mut R) -> Result<(), E> {
        Ok(())
    }
}
#[cfg(feature = "postgres")]
pub(crate) mod database {
use alloc::string::String;
use core::marker::PhantomData;
/// Resource manager that yields PostgreSQL executors.
///
/// `E` is the error type of the produced executors, `RNG` is the random number generator
/// cloned for every connection attempt and `S` is the stream type, which selects the
/// [`ResourceManager`](crate::pool::ResourceManager) implementation that is used.
#[derive(Debug)]
pub struct PostgresRM<E, RNG, S> {
    /// Optional extra certificates, only read by the TLS-based implementation.
    _certs: Option<&'static [u8]>,
    /// Marker that ties the error type to the manager without storing a value of it.
    _error: PhantomData<fn() -> E>,
    /// Value forwarded to `ExecutorBuffer::new` whenever a buffer is created.
    _max_stmts: usize,
    /// Generator that is cloned before each connection attempt.
    _rng: RNG,
    /// Marker that ties the stream type to the manager without storing a value of it.
    _stream: PhantomData<S>,
    /// Connection URI, parsed again on every connection attempt.
    _uri: String,
}
// Shared connection preamble for the PostgreSQL resource managers of this module.
//
// Expands to a block that
// 1. parses `$uri_str` into a `UriRef` bound to the identifier passed as `$uri`,
// 2. derives a PostgreSQL `Config` from that URI, bound to the identifier passed as `$config`,
// 3. awaits `$cb`, an expression that may refer to both identifiers,
// and finally evaluates to `Ok(..)` wrapping the awaited value.
//
// Steps 2 and 3 apply `?`, so the macro has to be expanded inside an `async` body that
// returns a `Result` whose error type can absorb both failures.
//
// NOTE(review): the leading underscore presumably silences the unused-macro lint when none of
// the runtime features are enabled -- confirm.
macro_rules! _executor {
($uri_str:expr, |$config:ident, $uri:ident| $cb:expr) => {{
let $uri = crate::misc::UriRef::new($uri_str);
let config_rslt = crate::database::client::postgres::Config::from_uri(&$uri);
let $config = config_rslt?;
// `$cb` must evaluate to a future resolving to a `Result`; its error is propagated by `?`.
Ok($cb.await?)
}};
}
#[cfg(feature = "tokio")]
mod tokio {
use crate::{
database::{
DEFAULT_MAX_STMTS, Executor as _,
client::postgres::{ExecutorBuffer, PostgresExecutor},
},
misc::CryptoRng,
pool::{PostgresRM, ResourceManager},
};
use alloc::string::String;
use core::{marker::PhantomData, mem};
use tokio::net::TcpStream;
impl<E, RNG> PostgresRM<E, RNG, TcpStream> {
    /// Creates a manager whose resources communicate through a tokio `TcpStream`.
    ///
    /// `rng` is cloned for every connection attempt and `uri` identifies the server. No
    /// certificates are stored and the statement limit is set to `DEFAULT_MAX_STMTS`.
    #[inline]
    pub const fn tokio(rng: RNG, uri: String) -> Self {
        Self {
            _certs: None,
            _error: PhantomData,
            _max_stmts: DEFAULT_MAX_STMTS,
            _rng: rng,
            _stream: PhantomData,
            _uri: uri,
        }
    }
}
/// Manages PostgreSQL executors that talk directly over a tokio `TcpStream`.
impl<E, RNG> ResourceManager for PostgresRM<E, RNG, TcpStream>
where
E: From<crate::Error>,
RNG: Clone + CryptoRng,
{
type CreateAux = ();
type Error = E;
type RecycleAux = ();
type Resource = PostgresExecutor<E, ExecutorBuffer, TcpStream>;
/// Opens a TCP connection to the address derived from the stored URI and hands it to
/// `PostgresExecutor::connect` together with a newly created `ExecutorBuffer`.
///
/// Fails when the URI can not be turned into a `Config`, when the TCP connection can not be
/// established or when `PostgresExecutor::connect` returns an error.
#[inline]
async fn create(&self, _: &Self::CreateAux) -> Result<Self::Resource, Self::Error> {
// A clone is needed because only `&self` is available while `&mut` access is required.
let mut rng = self._rng.clone();
_executor!(&self._uri, |config, uri| {
PostgresExecutor::connect(
&config,
ExecutorBuffer::new(self._max_stmts, &mut rng),
&mut rng,
TcpStream::connect(uri.hostname_with_implied_port()).await.map_err(Into::into)?,
)
})
}
/// A resource is invalid when its connection state reports itself as closed.
#[inline]
async fn is_invalid(&self, resource: &Self::Resource) -> bool {
resource.connection_state().is_closed()
}
/// Replaces `resource` with a newly connected executor that receives the buffer of the
/// previous executor.
///
/// Fails under the same conditions as `create`.
#[inline]
async fn recycle(
&self,
_: &Self::RecycleAux,
resource: &mut Self::Resource,
) -> Result<(), Self::Error> {
let mut rng = self._rng.clone();
// Take the buffer out of the current executor, leaving a fresh one in its place, so that
// the previous buffer can be moved into the new connection.
let mut buffer = ExecutorBuffer::new(self._max_stmts, &mut rng);
mem::swap(&mut buffer, &mut resource.eb);
// NOTE(review): when anything below fails, `resource` is not reassigned and therefore keeps
// its previous connection along with the fresh buffer swapped in above.
*resource = _executor!(&self._uri, |config, uri| {
PostgresExecutor::connect(
&config,
buffer,
&mut rng,
TcpStream::connect(uri.hostname_with_implied_port()).await.map_err(Into::into)?,
)
})?;
Ok(())
}
}
}
#[cfg(feature = "tokio-rustls")]
mod tokio_rustls {
use crate::{
database::{
DEFAULT_MAX_STMTS, Executor as _,
client::postgres::{ExecutorBuffer, PostgresExecutor},
},
misc::{CryptoRng, TokioRustlsConnector},
pool::{PostgresRM, ResourceManager},
};
use alloc::string::String;
use core::{marker::PhantomData, mem};
use tokio::net::TcpStream;
use tokio_rustls::client::TlsStream;
impl<E, RNG> PostgresRM<E, RNG, TlsStream<TcpStream>> {
    /// Creates a manager whose resources communicate through a `tokio-rustls` TLS stream
    /// layered on top of a tokio `TcpStream`.
    ///
    /// `certs`, when present, is pushed into the TLS connector on every connection attempt.
    /// `rng` is cloned for every connection attempt and `uri` identifies the server. The
    /// statement limit is set to `DEFAULT_MAX_STMTS`.
    #[inline]
    pub const fn tokio_rustls(certs: Option<&'static [u8]>, rng: RNG, uri: String) -> Self {
        Self {
            _certs: certs,
            _error: PhantomData,
            _max_stmts: DEFAULT_MAX_STMTS,
            _rng: rng,
            _stream: PhantomData,
            _uri: uri,
        }
    }
}
/// Manages PostgreSQL executors that talk over a `tokio-rustls` `TlsStream` wrapping a tokio
/// `TcpStream`.
impl<E, RNG> ResourceManager for PostgresRM<E, RNG, TlsStream<TcpStream>>
where
E: From<crate::Error>,
RNG: Clone + CryptoRng,
{
type CreateAux = ();
type Error = E;
type RecycleAux = ();
type Resource = PostgresExecutor<E, ExecutorBuffer, TlsStream<TcpStream>>;
/// Opens a TCP connection to the address derived from the stored URI and hands it to
/// `PostgresExecutor::connect_encrypted` together with a newly created `ExecutorBuffer` and
/// a callback that turns the TCP stream into a TLS stream.
///
/// Fails when the URI can not be turned into a `Config`, when the TCP connection can not be
/// established or when `PostgresExecutor::connect_encrypted` returns an error.
#[inline]
async fn create(&self, _: &Self::CreateAux) -> Result<Self::Resource, Self::Error> {
// A clone is needed because only `&self` is available while `&mut` access is required.
let mut rng = self._rng.clone();
_executor!(&self._uri, |config, uri| {
PostgresExecutor::connect_encrypted(
&config,
ExecutorBuffer::new(self._max_stmts, &mut rng),
&mut rng,
TcpStream::connect(uri.hostname_with_implied_port()).await.map_err(Into::into)?,
// Builds a connector through `TokioRustlsConnector::from_auto`, adds the stored
// certificates when there are any and connects without client authentication using the
// hostname of the URI.
|stream| async {
let mut rslt = TokioRustlsConnector::from_auto()?;
if let Some(elem) = self._certs {
rslt = rslt.push_certs(elem)?;
}
rslt.connect_without_client_auth(uri.hostname(), stream).await
},
)
})
}
/// A resource is invalid when its connection state reports itself as closed.
#[inline]
async fn is_invalid(&self, resource: &Self::Resource) -> bool {
resource.connection_state().is_closed()
}
/// Replaces `resource` with a newly connected executor that receives the buffer of the
/// previous executor.
///
/// Fails under the same conditions as `create`.
#[inline]
async fn recycle(
&self,
_: &Self::RecycleAux,
resource: &mut Self::Resource,
) -> Result<(), Self::Error> {
let mut rng = self._rng.clone();
// Take the buffer out of the current executor, leaving a fresh one in its place, so that
// the previous buffer can be moved into the new connection.
let mut buffer = ExecutorBuffer::new(self._max_stmts, &mut rng);
mem::swap(&mut buffer, &mut resource.eb);
// NOTE(review): when anything below fails, `resource` is not reassigned and therefore keeps
// its previous connection along with the fresh buffer swapped in above.
*resource = _executor!(&self._uri, |config, uri| {
PostgresExecutor::connect_encrypted(
&config,
buffer,
&mut rng,
TcpStream::connect(uri.hostname_with_implied_port()).await.map_err(Into::into)?,
// Same TLS set-up as in `create`: stored certificates are added when present.
|stream| async {
let mut rslt = TokioRustlsConnector::from_auto()?;
if let Some(elem) = self._certs {
rslt = rslt.push_certs(elem)?;
}
rslt.connect_without_client_auth(uri.hostname(), stream).await
},
)
})?;
Ok(())
}
}
}
}