orma_mobc/lib.rs
1use async_trait::async_trait;
2use mobc::Manager;
3use orma::tls::{MakeTlsConnect, TlsConnect};
4use orma::{Config, Connection, DbError, Socket};
5
6#[allow(clippy::needless_doctest_main)]
7/// A `mobc::Manager` for `orma::Connection`s.
8/// This crate is a strict derivation of [mobc-postgres](https://github.com/importcjj/mobc-postgres/)
9///
10/// ## Example
11///
12/// ```no_run
13/// use mobc::Pool;
14/// use orma_mobc::PgConnectionManager;
15/// use orma::{Config, NoTls};
16/// use std::time::Instant;
17/// use std::str::FromStr;
18/// #[tokio::main]
19/// async fn main() {
20/// let config = Config::from_str("postgres://user:passwd@localhost:5432").unwrap();
21/// let manager = PgConnectionManager::new(config, NoTls);
22/// let pool = Pool::builder().max_open(20).build(manager);
23/// const MAX: usize = 5000;
24///
25/// let now = Instant::now();
26/// let (tx, mut rx) = tokio::sync::mpsc::channel::<usize>(16);
27/// for i in 0..MAX {
28/// let pool = pool.clone();
29/// let mut tx_c = tx.clone();
30/// tokio::spawn(async move {
31/// let connection = pool.get().await.unwrap();
32/// let rows = connection.query("SELECT 1 + 2", &[]).await.unwrap();
33/// let value: i32 = rows[0].get(0);
34/// assert_eq!(value, 3);
35/// tx_c.send(i).await.unwrap();
36/// });
37/// }
38/// for _ in 0..MAX {
39/// rx.recv().await.unwrap();
40/// }
41///
42/// println!("cost: {:?}", now.elapsed());
43/// }
44/// ```
45pub struct PgConnectionManager<Tls> {
46 config: Config,
47 tls: Tls,
48}
49
50impl<Tls> PgConnectionManager<Tls> {
51 pub fn new(config: Config, tls: Tls) -> Self {
52 Self { config, tls }
53 }
54}
55
56#[async_trait]
57impl<Tls> Manager for PgConnectionManager<Tls>
58where
59 Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
60 <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
61 <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
62 <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
63{
64 type Connection = Connection;
65 type Error = DbError;
66
67 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
68 let tls = self.tls.clone();
69 let (client, conn) = self.config.connect(tls).await?;
70 mobc::spawn(conn);
71 Ok(client.into())
72 }
73
74 async fn check(&self, conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
75 conn.simple_query("").await?;
76 Ok(conn)
77 }
78}