actix_postgres/
postgres.rs

1use actix::prelude::*;
2use bb8_postgres::{
3    bb8::{Pool, RunError},
4    tokio_postgres::{
5        config::Config,
6        error::Error,
7        tls::{MakeTlsConnect, TlsConnect},
8        Socket,
9    },
10    PostgresConnectionManager,
11};
12use std::marker::PhantomData;
13use std::marker::Unpin;
14use std::str::FromStr;
15use thiserror::Error;
16
17/// PostgreSQL Actor
18pub struct PostgresActor<Tls>
19where
20    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
21    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
22    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
23    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
24{
25    config: Config,
26    tls: Tls,
27    pool: Option<Pool<PostgresConnectionManager<Tls>>>,
28}
29
30impl<Tls> PostgresActor<Tls>
31where
32    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
33    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
34    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
35    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
36{
37    /// Start new Supervisor with PostgresActor
38    pub fn start(path: &str, tls: Tls) -> Result<Addr<PostgresActor<Tls>>, Error> {
39        let config = Config::from_str(path)?;
40        Ok(Supervisor::start(|_| PostgresActor {
41            config: config,
42            tls: tls,
43            pool: None,
44        }))
45    }
46}
47
48impl<Tls> Actor for PostgresActor<Tls>
49where
50    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
51    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
52    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
53    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
54{
55    type Context = Context<Self>;
56
57    fn started(&mut self, ctx: &mut Context<Self>) {
58        let mgr = PostgresConnectionManager::new(self.config.clone(), self.tls.clone());
59        Pool::builder()
60            .build(mgr)
61            .into_actor(self)
62            .then(|res, act, _ctx| {
63                act.pool = Some(res.unwrap());
64                async {}.into_actor(act)
65            })
66            .wait(ctx);
67    }
68}
69
70impl<Tls> Supervised for PostgresActor<Tls>
71where
72    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
73    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
74    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
75    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
76{
77    fn restarting(&mut self, _: &mut Self::Context) {
78        self.pool.take();
79    }
80}
81
82/// PostgreSQL Message
83#[derive(Debug)]
84pub struct PostgresMessage<F, Tls, R>
85where
86    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
87    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
88    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
89    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
90    F: FnOnce(Pool<PostgresConnectionManager<Tls>>) -> ResponseFuture<Result<R, PostgresError>>
91        + 'static,
92{
93    query: F,
94    phantom: PhantomData<Tls>,
95}
96
97impl<F, Tls, R> Message for PostgresMessage<F, Tls, R>
98where
99    R: 'static,
100    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
101    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
102    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
103    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
104    F: FnOnce(Pool<PostgresConnectionManager<Tls>>) -> ResponseFuture<Result<R, PostgresError>>
105        + 'static
106        + Send
107        + Sync,
108{
109    type Result = Result<R, PostgresError>;
110}
111
112impl<F, Tls, R> PostgresMessage<F, Tls, R>
113where
114    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
115    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
116    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
117    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
118    F: FnOnce(Pool<PostgresConnectionManager<Tls>>) -> ResponseFuture<Result<R, PostgresError>>
119        + 'static
120        + Send
121        + Sync,
122{
123    pub fn new(query: F) -> Self {
124        Self {
125            query: query,
126            phantom: PhantomData,
127        }
128    }
129}
130
131impl<F, Tls, R> Handler<PostgresMessage<F, Tls, R>> for PostgresActor<Tls>
132where
133    R: 'static + Send,
134    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + Unpin,
135    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
136    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
137    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send + Unpin,
138    F: FnOnce(Pool<PostgresConnectionManager<Tls>>) -> ResponseFuture<Result<R, PostgresError>>
139        + 'static
140        + Send
141        + Sync,
142{
143    type Result = ResponseFuture<Result<R, PostgresError>>;
144
145    fn handle(
146        &mut self,
147        msg: PostgresMessage<F, Tls, R>,
148        _ctx: &mut Self::Context,
149    ) -> Self::Result {
150        if let Some(pool) = &self.pool {
151            let pool = pool.clone();
152            Box::pin(async move { (msg.query)(pool).await })
153        } else {
154            Box::pin(async { Err(PostgresError::PoolNone) })
155        }
156    }
157}
158
159/// PostgreSQL Errors
160#[derive(Debug, Error)]
161pub enum PostgresError {
162    /// An error returned from postgres.
163    #[error(transparent)]
164    PgError(#[from] Error),
165    /// An error returned from bb8.
166    #[error(transparent)]
167    BB8Error(#[from] RunError<Error>),
168    /// An error returned at pool none.
169    #[error("connection pool not initialized")]
170    PoolNone,
171    /// An error returned from user.
172    #[error("user error: {0}")]
173    Other(String),
174}