1use qp::async_trait;
3use qp::resource::Manage;
4use qp::Pool;
5use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
6use tokio_postgres::{Client, Config, Error, Socket};
7
8pub use qp;
9pub use tokio_postgres;
10
11pub type PgPool<T> = Pool<PgConnManager<T>>;
13
14pub struct PgConnManager<T>
16where
17 T: MakeTlsConnect<Socket> + Clone + Send + Sync,
18 T::Stream: Send + Sync + 'static,
19 T::TlsConnect: Send + Sync,
20 <T::TlsConnect as TlsConnect<Socket>>::Future: Send,
21{
22 config: Config,
23 tls: T,
24}
25
26#[async_trait]
27impl<T> Manage for PgConnManager<T>
28where
29 T: MakeTlsConnect<Socket> + Clone + Send + Sync,
30 T::Stream: Send + Sync + 'static,
31 T::TlsConnect: Send + Sync,
32 <T::TlsConnect as TlsConnect<Socket>>::Future: Send,
33{
34 type Output = Client;
35 type Error = Error;
36
37 async fn try_create(&self) -> Result<Self::Output, Self::Error> {
38 let (client, conn) = self.config.connect(self.tls.clone()).await?;
39 tokio::spawn(conn);
40 Ok(client)
41 }
42
43 async fn validate(&self, client: &Self::Output) -> bool {
44 !client.is_closed()
45 }
46}
47
48impl<T> PgConnManager<T>
49where
50 T: MakeTlsConnect<Socket> + Clone + Send + Sync,
51 T::Stream: Send + Sync + 'static,
52 T::TlsConnect: Send + Sync,
53 <T::TlsConnect as TlsConnect<Socket>>::Future: Send,
54{
55 pub fn new(config: Config, tls: T) -> Self {
57 Self { config, tls }
58 }
59}
60
61pub fn connect<T>(config: Config, tls: T, pool_size: usize) -> PgPool<T>
63where
64 T: MakeTlsConnect<Socket> + Clone + Send + Sync,
65 T::Stream: Send + Sync + 'static,
66 T::TlsConnect: Send + Sync,
67 <T::TlsConnect as TlsConnect<Socket>>::Future: Send,
68{
69 Pool::new(PgConnManager::new(config, tls), pool_size)
70}
71
72#[cfg(test)]
73mod tests {
74 use super::*;
75 use tokio_postgres::NoTls;
76
77 #[tokio::test]
78 async fn test_connect() {
79 let config = "postgresql://postgres:postgres@localhost".parse().unwrap();
80 let pool = connect(config, NoTls, 1);
81 let client = pool.acquire().await.unwrap();
82 let row = client.query_one("SELECT 1", &[]).await.unwrap();
83 let value: i32 = row.get(0);
84 assert_eq!(value, 1);
85 }
86}