1use actix::prelude::*;
2use bb8_postgres::{
3 bb8::{Pool, RunError},
4 tokio_postgres::{
5 config::Config,
6 error::Error,
7 tls::{MakeTlsConnect, TlsConnect},
8 Socket,
9 },
10 PostgresConnectionManager,
11};
12use std::marker::PhantomData;
13use std::marker::Unpin;
14use std::str::FromStr;
15use thiserror::Error;
16
17pub struct PostgresActor<Tls>
19where
20 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
21 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
22 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
23 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
24{
25 config: Config,
26 tls: Tls,
27 pool: Option<Pool<PostgresConnectionManager<Tls>>>,
28}
29
30impl<Tls> PostgresActor<Tls>
31where
32 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
33 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
34 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
35 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
36{
37 pub fn start(path: &str, tls: Tls) -> Result<Addr<PostgresActor<Tls>>, Error> {
39 let config = Config::from_str(path)?;
40 Ok(Supervisor::start(|_| PostgresActor {
41 config: config,
42 tls: tls,
43 pool: None,
44 }))
45 }
46}
47
48impl<Tls> Actor for PostgresActor<Tls>
49where
50 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
51 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
52 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
53 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
54{
55 type Context = Context<Self>;
56
57 fn started(&mut self, ctx: &mut Context<Self>) {
58 let mgr = PostgresConnectionManager::new(self.config.clone(), self.tls.clone());
59 Pool::builder()
60 .build(mgr)
61 .into_actor(self)
62 .then(|res, act, _ctx| {
63 act.pool = Some(res.unwrap());
64 async {}.into_actor(act)
65 })
66 .wait(ctx);
67 }
68}
69
70impl<Tls> Supervised for PostgresActor<Tls>
71where
72 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
73 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
74 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
75 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
76{
77 fn restarting(&mut self, _: &mut Self::Context) {
78 self.pool.take();
79 }
80}
81
82#[derive(Debug)]
84pub struct PostgresMessage<F, Tls, R>
85where
86 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
87 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
88 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
89 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
90 F: FnOnce(Pool<PostgresConnectionManager<Tls>>) -> ResponseFuture<Result<R, PostgresError>>
91 + 'static,
92{
93 query: F,
94 phantom: PhantomData<Tls>,
95}
96
97impl<F, Tls, R> Message for PostgresMessage<F, Tls, R>
98where
99 R: 'static,
100 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
101 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
102 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
103 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
104 F: FnOnce(Pool<PostgresConnectionManager<Tls>>) -> ResponseFuture<Result<R, PostgresError>>
105 + 'static
106 + Send
107 + Sync,
108{
109 type Result = Result<R, PostgresError>;
110}
111
112impl<F, Tls, R> PostgresMessage<F, Tls, R>
113where
114 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
115 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
116 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
117 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
118 F: FnOnce(Pool<PostgresConnectionManager<Tls>>) -> ResponseFuture<Result<R, PostgresError>>
119 + 'static
120 + Send
121 + Sync,
122{
123 pub fn new(query: F) -> Self {
124 Self {
125 query: query,
126 phantom: PhantomData,
127 }
128 }
129}
130
131impl<F, Tls, R> Handler<PostgresMessage<F, Tls, R>> for PostgresActor<Tls>
132where
133 R: 'static + Send,
134 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
135 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
136 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
137 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
138 F: FnOnce(Pool<PostgresConnectionManager<Tls>>) -> ResponseFuture<Result<R, PostgresError>>
139 + 'static
140 + Send
141 + Sync,
142{
143 type Result = ResponseFuture<Result<R, PostgresError>>;
144
145 fn handle(
146 &mut self,
147 msg: PostgresMessage<F, Tls, R>,
148 _ctx: &mut Self::Context,
149 ) -> Self::Result {
150 if let Some(pool) = &self.pool {
151 let pool = pool.clone();
152 Box::pin(async move { (msg.query)(pool).await })
153 } else {
154 Box::pin(async { Err(PostgresError::PoolNone) })
155 }
156 }
157}
158
159#[derive(Debug, Error)]
161pub enum PostgresError {
162 #[error(transparent)]
164 PgError(#[from] Error),
165 #[error(transparent)]
167 BB8Error(#[from] RunError<Error>),
168 #[error("connection pool not initialized")]
170 PoolNone,
171 #[error("user error: {0}")]
173 Other(String),
174}