pub trait ResourceManager {
type CreateAux: ?Sized;
type Error: From<crate::Error>;
type RecycleAux: ?Sized;
type Resource;
fn create(
&self,
aux: &Self::CreateAux,
) -> impl Future<Output = Result<Self::Resource, Self::Error>>;
fn is_invalid(&self, resource: &Self::Resource) -> bool;
fn recycle(
&self,
aux: &Self::RecycleAux,
resource: &mut Self::Resource,
) -> impl Future<Output = Result<(), Self::Error>>;
}
impl ResourceManager for () {
type CreateAux = ();
type Error = crate::Error;
type RecycleAux = ();
type Resource = ();
#[inline]
async fn create(&self, _: &Self::CreateAux) -> Result<Self::Resource, Self::Error> {
Ok(())
}
#[inline]
fn is_invalid(&self, _: &Self::Resource) -> bool {
false
}
#[inline]
async fn recycle(&self, _: &Self::RecycleAux, _: &mut Self::Resource) -> Result<(), Self::Error> {
Ok(())
}
}
#[derive(Debug)]
pub struct SimpleRM<F> {
pub cb: F,
}
impl<F> SimpleRM<F> {
#[inline]
pub const fn new(cb: F) -> Self {
Self { cb }
}
}
impl<E, F, R> ResourceManager for SimpleRM<F>
where
E: From<crate::Error>,
F: Fn() -> Result<R, E>,
{
type CreateAux = ();
type Error = E;
type RecycleAux = ();
type Resource = R;
#[inline]
async fn create(&self, _: &Self::CreateAux) -> Result<Self::Resource, Self::Error> {
(self.cb)()
}
#[inline]
fn is_invalid(&self, _: &Self::Resource) -> bool {
false
}
#[inline]
async fn recycle(&self, _: &Self::RecycleAux, _: &mut Self::Resource) -> Result<(), Self::Error> {
Ok(())
}
}
#[cfg(all(feature = "postgres", feature = "secret"))]
pub(crate) mod database {
use crate::{
collection::Vector,
database::{
DEFAULT_MAX_STMTS, Executor,
client::postgres::{ExecutorBuffer, PostgresExecutor},
},
misc::{Secret, SecretContext},
pool::ResourceManager,
rng::{ChaCha20, CryptoSeedableRng},
sync::AtomicCell,
};
use core::{marker::PhantomData, mem};
#[derive(Debug)]
pub struct PostgresRM<E, S> {
_certs: Option<Vector<u8>>,
_error: PhantomData<fn() -> E>,
_max_stmts: usize,
_rng: AtomicCell<ChaCha20>,
_stream: PhantomData<S>,
_uri: Secret,
}
macro_rules! _executor {
($uri_secret:expr, |$config:ident, $uri:ident| $cb:expr) => {{
$uri_secret
.peek(&mut Vector::new(), |secret| async move {
let string = unsafe { core::str::from_utf8_unchecked(&*secret) };
let $uri = crate::misc::UriRef::new(string);
let config_rslt = crate::database::client::postgres::Config::from_uri(&$uri);
let $config = config_rslt?;
$cb.await
})?
.await?
}};
}
impl<E> PostgresRM<E, ()> {
#[inline]
pub fn unit(
mut rng: ChaCha20,
secret_context: SecretContext,
uri: &mut [u8],
) -> crate::Result<Self> {
let _uri = Secret::new(uri, &mut rng, secret_context)?;
Ok(Self {
_certs: None,
_error: PhantomData,
_max_stmts: DEFAULT_MAX_STMTS,
_rng: AtomicCell::new(rng),
_stream: PhantomData,
_uri,
})
}
}
impl<E> ResourceManager for PostgresRM<E, ()>
where
E: From<crate::Error>,
{
type CreateAux = ();
type Error = E;
type RecycleAux = ();
type Resource = PostgresExecutor<E, ExecutorBuffer, ()>;
#[inline]
async fn create(&self, _: &Self::CreateAux) -> Result<Self::Resource, Self::Error> {
let mut rng = ChaCha20::from_crypto_rng(&mut &self._rng)?;
Ok(_executor!(&self._uri, |config, uri| {
PostgresExecutor::connect(
&config,
ExecutorBuffer::new(self._max_stmts, &mut rng),
&mut rng,
(),
)
}))
}
#[inline]
fn is_invalid(&self, resource: &Self::Resource) -> bool {
resource.connection_state().is_closed()
}
#[inline]
async fn recycle(
&self,
_: &Self::RecycleAux,
resource: &mut Self::Resource,
) -> Result<(), Self::Error> {
let mut rng = ChaCha20::from_crypto_rng(&mut &self._rng)?;
let mut buffer = ExecutorBuffer::new(self._max_stmts, &mut rng);
mem::swap(&mut buffer, &mut resource.eb);
*resource = _executor!(&self._uri, |config, uri| {
PostgresExecutor::connect(&config, buffer, &mut rng, ())
});
Ok(())
}
}
#[cfg(feature = "tokio")]
mod tokio {
use crate::{
collection::Vector,
database::{
DEFAULT_MAX_STMTS, Executor as _,
client::postgres::{ExecutorBuffer, PostgresExecutor},
},
misc::{Secret, SecretContext},
pool::{PostgresRM, ResourceManager},
rng::{ChaCha20, CryptoSeedableRng},
sync::AtomicCell,
};
use core::{marker::PhantomData, mem};
use tokio::net::TcpStream;
impl<E> PostgresRM<E, TcpStream> {
#[inline]
pub fn tokio(
mut rng: ChaCha20,
secret_context: SecretContext,
uri: &mut [u8],
) -> crate::Result<Self> {
let _uri = Secret::new(uri, &mut rng, secret_context)?;
Ok(Self {
_certs: None,
_error: PhantomData,
_max_stmts: DEFAULT_MAX_STMTS,
_rng: AtomicCell::new(rng),
_stream: PhantomData,
_uri,
})
}
}
impl<E> ResourceManager for PostgresRM<E, TcpStream>
where
E: From<crate::Error>,
{
type CreateAux = ();
type Error = E;
type RecycleAux = ();
type Resource = PostgresExecutor<E, ExecutorBuffer, TcpStream>;
#[inline]
async fn create(&self, _: &Self::CreateAux) -> Result<Self::Resource, Self::Error> {
let mut rng = ChaCha20::from_crypto_rng(&mut &self._rng)?;
Ok(_executor!(&self._uri, |config, uri| {
PostgresExecutor::connect(
&config,
ExecutorBuffer::new(self._max_stmts, &mut rng),
&mut rng,
TcpStream::connect(uri.hostname_with_implied_port())
.await
.map_err(crate::Error::from)?,
)
}))
}
#[inline]
fn is_invalid(&self, resource: &Self::Resource) -> bool {
resource.connection_state().is_closed()
}
#[inline]
async fn recycle(
&self,
_: &Self::RecycleAux,
resource: &mut Self::Resource,
) -> Result<(), Self::Error> {
let mut rng = ChaCha20::from_crypto_rng(&mut &self._rng)?;
let mut buffer = ExecutorBuffer::new(self._max_stmts, &mut rng);
mem::swap(&mut buffer, &mut resource.eb);
*resource = _executor!(&self._uri, |config, uri| {
PostgresExecutor::connect(
&config,
buffer,
&mut rng,
TcpStream::connect(uri.hostname_with_implied_port())
.await
.map_err(crate::Error::from)?,
)
});
Ok(())
}
}
}
#[cfg(feature = "tokio-rustls")]
mod tokio_rustls {
use crate::{
collection::Vector,
database::{
DEFAULT_MAX_STMTS, Executor as _,
client::postgres::{ExecutorBuffer, PostgresExecutor},
},
misc::{Secret, SecretContext, TokioRustlsConnector},
pool::{PostgresRM, ResourceManager},
rng::{ChaCha20, CryptoSeedableRng},
sync::AtomicCell,
};
use core::{marker::PhantomData, mem};
use tokio::net::TcpStream;
use tokio_rustls::client::TlsStream;
impl<E> PostgresRM<E, TlsStream<TcpStream>> {
#[inline]
pub fn tokio_rustls(
certs: Option<Vector<u8>>,
mut rng: ChaCha20,
secret_context: SecretContext,
uri: &mut [u8],
) -> crate::Result<Self> {
let _uri = Secret::new(uri, &mut rng, secret_context)?;
Ok(Self {
_certs: certs,
_error: PhantomData,
_max_stmts: DEFAULT_MAX_STMTS,
_rng: AtomicCell::new(rng),
_stream: PhantomData,
_uri,
})
}
}
impl<E> ResourceManager for PostgresRM<E, TlsStream<TcpStream>>
where
E: From<crate::Error>,
{
type CreateAux = ();
type Error = E;
type RecycleAux = ();
type Resource = PostgresExecutor<E, ExecutorBuffer, TlsStream<TcpStream>>;
#[inline]
async fn create(&self, _: &Self::CreateAux) -> Result<Self::Resource, Self::Error> {
let mut rng = ChaCha20::from_crypto_rng(&mut &self._rng)?;
Ok(_executor!(&self._uri, |config, uri| {
PostgresExecutor::connect_encrypted(
&config,
ExecutorBuffer::new(self._max_stmts, &mut rng),
&mut rng,
TcpStream::connect(uri.hostname_with_implied_port())
.await
.map_err(crate::Error::from)?,
|stream| async {
let mut rslt = TokioRustlsConnector::from_auto()?;
if let Some(elem) = &self._certs {
rslt = rslt.push_certs(elem.as_slice())?;
}
rslt.connect_without_client_auth(uri.hostname(), stream).await
},
)
}))
}
#[inline]
fn is_invalid(&self, resource: &Self::Resource) -> bool {
resource.connection_state().is_closed()
}
#[inline]
async fn recycle(
&self,
_: &Self::RecycleAux,
resource: &mut Self::Resource,
) -> Result<(), Self::Error> {
let mut rng = ChaCha20::from_crypto_rng(&mut &self._rng)?;
let mut buffer = ExecutorBuffer::new(self._max_stmts, &mut rng);
mem::swap(&mut buffer, &mut resource.eb);
*resource = _executor!(&self._uri, |config, uri| {
PostgresExecutor::connect_encrypted(
&config,
buffer,
&mut rng,
TcpStream::connect(uri.hostname_with_implied_port())
.await
.map_err(crate::Error::from)?,
|stream| async {
let mut rslt = TokioRustlsConnector::from_auto()?;
if let Some(elem) = &self._certs {
rslt = rslt.push_certs(elem.as_slice())?;
}
rslt.connect_without_client_auth(uri.hostname(), stream).await
},
)
});
Ok(())
}
}
}
}