use actix::prelude::*;
use bb8_postgres::{
bb8::{Pool, RunError},
tokio_postgres::{
config::Config,
error::Error,
tls::{MakeTlsConnect, TlsConnect},
Socket,
},
PostgresConnectionManager,
};
use std::marker::PhantomData;
use std::marker::Unpin;
use std::str::FromStr;
pub struct PostgresActor<Tls>
where
Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
<Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
{
config: Config,
tls: Tls,
pool: Option<Pool<PostgresConnectionManager<Tls>>>,
}
impl<Tls> PostgresActor<Tls>
where
Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
<Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
{
pub fn start(path: &str, tls: Tls) -> Result<Addr<PostgresActor<Tls>>, Error> {
let config = Config::from_str(path)?;
Ok(Supervisor::start(|_| PostgresActor {
config: config,
tls: tls,
pool: None,
}))
}
}
impl<Tls> Actor for PostgresActor<Tls>
where
Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
<Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
{
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
let mgr = PostgresConnectionManager::new(self.config.clone(), self.tls.clone());
Pool::builder()
.build(mgr)
.into_actor(self)
.then(|res, act, _ctx| {
act.pool = Some(res.unwrap());
async {}.into_actor(act)
})
.wait(ctx);
}
}
impl<Tls> Supervised for PostgresActor<Tls>
where
Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
<Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
{
fn restarting(&mut self, _: &mut Self::Context) {
self.pool.take();
}
}
#[derive(Debug)]
pub struct PostgresMessage<F, Tls, R>
where
Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
<Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
F: FnOnce(Pool<PostgresConnectionManager<Tls>>) -> ResponseFuture<Result<R, PostgresError>>
+ 'static,
{
query: F,
phantom: PhantomData<Tls>,
}
impl<F, Tls, R> Message for PostgresMessage<F, Tls, R>
where
R: 'static,
Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
<Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
F: FnOnce(Pool<PostgresConnectionManager<Tls>>) -> ResponseFuture<Result<R, PostgresError>>
+ 'static
+ Send
+ Sync,
{
type Result = Result<R, PostgresError>;
}
impl<F, Tls, R> PostgresMessage<F, Tls, R>
where
Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
<Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
F: FnOnce(Pool<PostgresConnectionManager<Tls>>) -> ResponseFuture<Result<R, PostgresError>>
+ 'static
+ Send
+ Sync,
{
pub fn new(query: F) -> Self {
Self {
query: query,
phantom: PhantomData,
}
}
}
impl<F, Tls, R> Handler<PostgresMessage<F, Tls, R>> for PostgresActor<Tls>
where
R: 'static + Send,
Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
<Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
F: FnOnce(Pool<PostgresConnectionManager<Tls>>) -> ResponseFuture<Result<R, PostgresError>>
+ 'static
+ Send
+ Sync,
{
type Result = ResponseFuture<Result<R, PostgresError>>;
fn handle(
&mut self,
msg: PostgresMessage<F, Tls, R>,
_ctx: &mut Self::Context,
) -> Self::Result {
if let Some(pool) = &self.pool {
let pool = pool.clone();
Box::pin(async move { (msg.query)(pool).await })
} else {
Box::pin(async { Err(PostgresError::PoolNone) })
}
}
}
#[derive(Debug)]
pub enum PostgresError {
PgError(Error),
BB8Error(RunError<Error>),
PoolNone,
Other(String),
}
impl From<Error> for PostgresError {
fn from(err: Error) -> Self {
Self::PgError(err)
}
}
impl From<RunError<Error>> for PostgresError {
fn from(err: RunError<Error>) -> Self {
Self::BB8Error(err)
}
}