mobc_postgres/lib.rs
1pub use tokio_postgres;
2pub use mobc;
3use mobc::Manager;
4use mobc::async_trait;
5use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
6use tokio_postgres::Client;
7use tokio_postgres::Config;
8use tokio_postgres::Error;
9use tokio_postgres::Socket;
10
11
12/// An `mobc::Manager` for `tokio_postgres::Client`s.
13///
14/// ## Example
15///
16/// ```no_run
17/// use mobc::Pool;
18/// use std::str::FromStr;
19/// use std::time::Instant;
20/// use mobc_postgres::PgConnectionManager;
21/// use tokio_postgres::Config;
22/// use tokio_postgres::NoTls;
23///
24/// #[tokio::main]
25/// async fn main() {
26/// let config = Config::from_str("postgres://jiaju:jiaju@localhost:5432").unwrap();
27/// let manager = PgConnectionManager::new(config, NoTls);
28/// let pool = Pool::builder().max_open(20).build(manager);
29/// const MAX: usize = 5000;
30///
31/// let now = Instant::now();
32/// let (tx, mut rx) = tokio::sync::mpsc::channel::<usize>(16);
33/// for i in 0..MAX {
34/// let pool = pool.clone();
35/// let mut tx_c = tx.clone();
36/// tokio::spawn(async move {
37/// let client = pool.get().await.unwrap();
38/// let rows = client.query("SELECT 1 + 2", &[]).await.unwrap();
39/// let value: i32 = rows[0].get(0);
40/// assert_eq!(value, 3);
41/// tx_c.send(i).await.unwrap();
42/// });
43/// }
44/// for _ in 0..MAX {
45/// rx.recv().await.unwrap();
46/// }
47///
48/// println!("cost: {:?}", now.elapsed());
49/// }
50/// ```
51pub struct PgConnectionManager<Tls> {
52 config: Config,
53 tls: Tls,
54}
55
56impl<Tls> PgConnectionManager<Tls> {
57 pub fn new(config: Config, tls: Tls) -> Self {
58 Self { config, tls }
59 }
60}
61
62#[async_trait]
63impl<Tls> Manager for PgConnectionManager<Tls>
64where
65 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
66 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
67 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
68 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
69{
70 type Connection = Client;
71 type Error = Error;
72
73 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
74 let tls = self.tls.clone();
75 let (client, conn) = self.config.connect(tls).await?;
76 mobc::spawn(conn);
77 Ok(client)
78 }
79
80 async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
81 conn.simple_query("").await?;
82 Ok(conn)
83 }
84}