1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
use crate::error::Result; use deadpool::managed::Object; use deadpool_postgres::{ClientWrapper, Transaction as PoolTransaction}; use tokio_postgres::types::ToSql; use tokio_postgres::{Client, NoTls, ToStatement, Transaction as ClientTransaction}; pub async fn connect<'a>() -> Result<Connection<'a>> { let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set!"); let (client, connection) = tokio_postgres::connect(&database_url, NoTls).await?; tokio::spawn(async move { if let Err(e) = connection.await { eprintln!("connection error: {}", e); } }); #[cfg(debug_assertions)] client.execute("SET log_statement = 'all'", &[]).await?; client.execute("SET TIME ZONE 'Japan'", &[]).await?; Ok(Connection::ClientConnection(client)) } pub enum Connection<'a> { ClientConnection(Client), ClientTransaction(ClientTransaction<'a>), PoolConnection(Object<ClientWrapper, tokio_postgres::Error>), PoolTransaction(PoolTransaction<'a>), } impl<'a> Connection<'a> { pub async fn commit(self) -> Result<()> { match self { Connection::ClientConnection(_) => { unimplemented!(); } Connection::ClientTransaction(x) => { x.commit().await?; } Connection::PoolConnection(_) => { unimplemented!(); } Connection::PoolTransaction(x) => { x.commit().await?; } } Ok(()) } pub async fn execute<T>( &self, statement: &T, params: &[&(dyn ToSql + Sync)], ) -> std::result::Result<u64, tokio_postgres::Error> where T: ?Sized + ToStatement, { match self { Connection::ClientConnection(x) => x.execute(statement, params).await, Connection::ClientTransaction(x) => x.execute(statement, params).await, Connection::PoolConnection(x) => x.execute(statement, params).await, Connection::PoolTransaction(x) => x.execute(statement, params).await, } } pub async fn query<T>( &self, statement: &T, params: &[&(dyn ToSql + Sync)], ) -> std::result::Result<Vec<tokio_postgres::Row>, tokio_postgres::Error> where T: ?Sized + ToStatement, { match self { Connection::ClientConnection(x) => x.query(statement, params).await, Connection::ClientTransaction(x) => x.query(statement, params).await, Connection::PoolConnection(x) => x.query(statement, params).await, Connection::PoolTransaction(x) => x.query(statement, params).await, } } pub async fn query_one<T>( &self, statement: &T, params: &[&(dyn ToSql + Sync)], ) -> std::result::Result<tokio_postgres::Row, tokio_postgres::Error> where T: ?Sized + ToStatement, { match self { Connection::ClientConnection(x) => x.query_one(statement, params).await, Connection::ClientTransaction(x) => x.query_one(statement, params).await, Connection::PoolConnection(x) => x.query_one(statement, params).await, Connection::PoolTransaction(x) => x.query_one(statement, params).await, } } pub async fn query_opt<T>( &self, statement: &T, params: &[&(dyn ToSql + Sync)], ) -> std::result::Result<Option<tokio_postgres::Row>, tokio_postgres::Error> where T: ?Sized + ToStatement, { match self { Connection::ClientConnection(x) => x.query_opt(statement, params).await, Connection::ClientTransaction(x) => x.query_opt(statement, params).await, Connection::PoolConnection(x) => x.query_opt(statement, params).await, Connection::PoolTransaction(x) => x.query_opt(statement, params).await, } } pub async fn transaction<'b>(&'b mut self) -> Result<Connection<'b>> { let transaction = match self { Connection::ClientConnection(x) => { Connection::ClientTransaction(x.transaction().await?) } Connection::ClientTransaction(x) => { Connection::ClientTransaction(x.transaction().await?) } Connection::PoolConnection(x) => Connection::PoolTransaction(x.transaction().await?), Connection::PoolTransaction(x) => Connection::PoolTransaction(x.transaction().await?), }; Ok(transaction) } }