orma_mobc/
lib.rs

1use async_trait::async_trait;
2use mobc::Manager;
3use orma::tls::{MakeTlsConnect, TlsConnect};
4use orma::{Config, Connection, DbError, Socket};
5
6#[allow(clippy::needless_doctest_main)]
7/// A `mobc::Manager` for `orma::Connection`s.
8/// This crate is a strict derivation of [mobc-postgres](https://github.com/importcjj/mobc-postgres/)
9///
10/// ## Example
11///
12/// ```no_run
13/// use mobc::Pool;
14/// use orma_mobc::PgConnectionManager;
15/// use orma::{Config, NoTls};
16/// use std::time::Instant;
17/// use std::str::FromStr;
18/// #[tokio::main]
19/// async fn main() {
20///     let config = Config::from_str("postgres://user:passwd@localhost:5432").unwrap();
21///     let manager = PgConnectionManager::new(config, NoTls);
22///     let pool = Pool::builder().max_open(20).build(manager);
23///     const MAX: usize = 5000;
24///
25///     let now = Instant::now();
26///     let (tx, mut rx) = tokio::sync::mpsc::channel::<usize>(16);
27///     for i in 0..MAX {
28///         let pool = pool.clone();
29///         let mut tx_c = tx.clone();
30///         tokio::spawn(async move {
31///             let connection = pool.get().await.unwrap();
32///             let rows = connection.query("SELECT 1 + 2", &[]).await.unwrap();
33///             let value: i32 = rows[0].get(0);
34///             assert_eq!(value, 3);
35///             tx_c.send(i).await.unwrap();
36///         });
37///     }
38///     for _ in 0..MAX {
39///         rx.recv().await.unwrap();
40///     }
41///
42///     println!("cost: {:?}", now.elapsed());
43/// }
44/// ```
45pub struct PgConnectionManager<Tls> {
46    config: Config,
47    tls: Tls,
48}
49
50impl<Tls> PgConnectionManager<Tls> {
51    pub fn new(config: Config, tls: Tls) -> Self {
52        Self { config, tls }
53    }
54}
55
56#[async_trait]
57impl<Tls> Manager for PgConnectionManager<Tls>
58where
59    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
60    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
61    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
62    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
63{
64    type Connection = Connection;
65    type Error = DbError;
66
67    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
68        let tls = self.tls.clone();
69        let (client, conn) = self.config.connect(tls).await?;
70        mobc::spawn(conn);
71        Ok(client.into())
72    }
73
74    async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
75        conn.simple_query("").await?;
76        Ok(conn)
77    }
78}