mobc_postgres/
lib.rs

1pub use tokio_postgres;
2pub use mobc;
3use mobc::Manager;
4use mobc::async_trait;
5use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
6use tokio_postgres::Client;
7use tokio_postgres::Config;
8use tokio_postgres::Error;
9use tokio_postgres::Socket;
10
11
12/// An `mobc::Manager` for `tokio_postgres::Client`s.
13///
14/// ## Example
15///
16/// ```no_run
17/// use mobc::Pool;
18/// use std::str::FromStr;
19/// use std::time::Instant;
20/// use mobc_postgres::PgConnectionManager;
21/// use tokio_postgres::Config;
22/// use tokio_postgres::NoTls;
23///
24/// #[tokio::main]
25/// async fn main() {
26///     let config = Config::from_str("postgres://jiaju:jiaju@localhost:5432").unwrap();
27///     let manager = PgConnectionManager::new(config, NoTls);
28///     let pool = Pool::builder().max_open(20).build(manager);
29///     const MAX: usize = 5000;
30///
31///     let now = Instant::now();
32///     let (tx, mut rx) = tokio::sync::mpsc::channel::<usize>(16);
33///     for i in 0..MAX {
34///         let pool = pool.clone();
35///         let mut tx_c = tx.clone();
36///         tokio::spawn(async move {
37///             let client = pool.get().await.unwrap();
38///             let rows = client.query("SELECT 1 + 2", &[]).await.unwrap();
39///             let value: i32 = rows[0].get(0);
40///             assert_eq!(value, 3);
41///             tx_c.send(i).await.unwrap();
42///         });
43///     }
44///     for _ in 0..MAX {
45///         rx.recv().await.unwrap();
46///     }
47///
48///     println!("cost: {:?}", now.elapsed());
49/// }
50/// ```
51pub struct PgConnectionManager<Tls> {
52    config: Config,
53    tls: Tls,
54}
55
56impl<Tls> PgConnectionManager<Tls> {
57    pub fn new(config: Config, tls: Tls) -> Self {
58        Self { config, tls }
59    }
60}
61
62#[async_trait]
63impl<Tls> Manager for PgConnectionManager<Tls>
64where
65    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
66    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
67    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
68    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
69{
70    type Connection = Client;
71    type Error = Error;
72
73    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
74        let tls = self.tls.clone();
75        let (client, conn) = self.config.connect(tls).await?;
76        mobc::spawn(conn);
77        Ok(client)
78    }
79
80    async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
81        conn.simple_query("").await?;
82        Ok(conn)
83    }
84}