1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
use mobc::futures::{FutureExt, TryFutureExt}; use mobc::AnyFuture; use mobc::ConnectionManager; use tokio_executor::DefaultExecutor; use tokio_executor::Executor as TkExecutor; pub use tokio_postgres; use tokio_postgres::tls::{MakeTlsConnect, TlsConnect}; use tokio_postgres::Client; use tokio_postgres::Config; use tokio_postgres::Error; use tokio_postgres::Socket; pub struct PostgresConnectionManager<Tls, U> where U: TkExecutor + Send + Sync + 'static + Clone, { config: Config, tls: Tls, executor: U, } impl<Tls> PostgresConnectionManager<Tls, DefaultExecutor> { pub fn new(config: Config, tls: Tls) -> Self { PostgresConnectionManager { config, tls, executor: DefaultExecutor::current(), } } } impl<Tls, U> PostgresConnectionManager<Tls, U> where U: TkExecutor + Send + Sync + 'static + Clone, { pub fn new_with_executor(config: Config, tls: Tls, executor: U) -> Self { PostgresConnectionManager { config, tls, executor, } } } impl<Tls, U> ConnectionManager for PostgresConnectionManager<Tls, U> where Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static, <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync, <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send, <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send, U: TkExecutor + Send + Sync + 'static + Clone, { type Connection = Client; type Executor = U; type Error = Error; fn get_executor(&self) -> Self::Executor { self.executor.clone() } fn connect(&self) -> AnyFuture<Self::Connection, Self::Error> { let mut executor = self.get_executor().clone(); let config = self.config.clone(); let tls = self.tls.clone(); let connect_fut = async move { config.connect(tls).await }; Box::pin(connect_fut.map_ok(move |(client, conn)| { let _ = executor.spawn(Box::pin(conn.map(|_| ()))); client })) } fn is_valid(&self, conn: Self::Connection) -> AnyFuture<Self::Connection, Self::Error> { let simple_query_fut = async move { conn.execute("", &[]).await?; Ok(conn) }; Box::pin(simple_query_fut) } fn has_broken(&self, conn: &mut Option<Self::Connection>) -> bool { match conn { Some(ref raw) => raw.is_closed(), None => false, } } }