1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
use mobc::futures::{FutureExt, TryFutureExt};
use mobc::AnyFuture;
use mobc::ConnectionManager;
use tokio_executor::DefaultExecutor;
use tokio_executor::Executor as TkExecutor;
pub use tokio_postgres;
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
use tokio_postgres::Client;
use tokio_postgres::Config;
use tokio_postgres::Error;
use tokio_postgres::Socket;

pub struct PostgresConnectionManager<Tls, U>
where
    U: TkExecutor + Send + Sync + 'static + Clone,
{
    config: Config,
    tls: Tls,
    executor: U,
}

impl<Tls> PostgresConnectionManager<Tls, DefaultExecutor> {
    pub fn new(config: Config, tls: Tls) -> Self {
        PostgresConnectionManager {
            config,
            tls,
            executor: DefaultExecutor::current(),
        }
    }
}

impl<Tls, U> PostgresConnectionManager<Tls, U>
where
    U: TkExecutor + Send + Sync + 'static + Clone,
{
    pub fn new_with_executor(config: Config, tls: Tls, executor: U) -> Self {
        PostgresConnectionManager {
            config,
            tls,
            executor,
        }
    }
}

impl<Tls, U> ConnectionManager for PostgresConnectionManager<Tls, U>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
    U: TkExecutor + Send + Sync + 'static + Clone,
{
    type Connection = Client;
    type Executor = U;
    type Error = Error;

    fn get_executor(&self) -> Self::Executor {
        self.executor.clone()
    }

    fn connect(&self) -> AnyFuture<Self::Connection, Self::Error> {
        let mut executor = self.get_executor().clone();
        let config = self.config.clone();
        let tls = self.tls.clone();
        let connect_fut = async move { config.connect(tls).await };
        Box::pin(connect_fut.map_ok(move |(client, conn)| {
            let _ = executor.spawn(Box::pin(conn.map(|_| ())));
            client
        }))
    }

    fn is_valid(&self, conn: Self::Connection) -> AnyFuture<Self::Connection, Self::Error> {
        let simple_query_fut = async move {
            conn.execute("", &[]).await?;
            Ok(conn)
        };
        Box::pin(simple_query_fut)
    }

    fn has_broken(&self, conn: &mut Option<Self::Connection>) -> bool {
        match conn {
            Some(ref raw) => raw.is_closed(),
            None => false,
        }
    }
}