use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
use crate::error::{InternalError, InvalidStateError};
use super::{Backend, Connection, Execute};
/// A PostgreSQL connection checked out of an r2d2 connection pool.
///
/// This is a newtype around the pooled Diesel connection; the underlying
/// `PgConnection` is reachable through this type's [`Connection`] implementation.
pub struct PostgresConnection(
// The wrapped pooled connection. Its visibility is limited to the
// `crate::state::merkle::sql` module tree.
pub(in crate::state::merkle::sql) PooledConnection<ConnectionManager<diesel::pg::PgConnection>>,
);
impl Connection for PostgresConnection {
type ConnectionType = diesel::pg::PgConnection;
fn as_inner(&self) -> &Self::ConnectionType {
&self.0
}
}
/// A backend that supplies PostgreSQL connections from an r2d2 connection pool.
#[derive(Clone)]
pub struct PostgresBackend {
// Pool from which connections are checked out.
connection_pool: Pool<ConnectionManager<diesel::pg::PgConnection>>,
}
impl Backend for PostgresBackend {
type Connection = PostgresConnection;
fn connection(&self) -> Result<Self::Connection, InternalError> {
self.connection_pool
.get()
.map(PostgresConnection)
.map_err(|err| InternalError::from_source(Box::new(err)))
}
}
impl Execute for PostgresBackend {
    /// Checks a connection out of the pool and runs `f` against it.
    ///
    /// # Errors
    ///
    /// Returns an [`InternalError`] if a connection cannot be obtained from the
    /// pool; otherwise returns whatever `f` returns.
    fn execute<F, T>(&self, f: F) -> Result<T, InternalError>
    where
        F: Fn(&Self::Connection) -> Result<T, InternalError>,
    {
        // `Backend::connection` performs the same pool checkout and error
        // mapping, so it is reused here instead of being repeated.
        let pooled = self.connection()?;
        f(&pooled)
    }
}
impl From<Pool<ConnectionManager<diesel::pg::PgConnection>>> for PostgresBackend {
fn from(pool: Pool<ConnectionManager<diesel::pg::PgConnection>>) -> Self {
Self {
connection_pool: pool,
}
}
}
impl From<PostgresBackend> for Pool<ConnectionManager<diesel::pg::PgConnection>> {
fn from(backend: PostgresBackend) -> Self {
backend.connection_pool
}
}
/// A wrapper around a borrowed PostgreSQL connection.
///
/// Unlike [`PostgresConnection`], this holds a reference to a `PgConnection`
/// rather than a pooled connection. Only compiled when the
/// `state-merkle-sql-in-transaction` feature is enabled.
#[cfg(feature = "state-merkle-sql-in-transaction")]
pub struct BorrowedPostgresConnection<'a>(&'a diesel::pg::PgConnection);
#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> Connection for BorrowedPostgresConnection<'a> {
    type ConnectionType = diesel::pg::PgConnection;

    /// Returns the borrowed Diesel PostgreSQL connection.
    fn as_inner(&self) -> &Self::ConnectionType {
        let BorrowedPostgresConnection(borrowed) = self;
        // `borrowed` is a reference to the stored reference; copy the inner one out.
        *borrowed
    }
}
/// A backend that wraps a single borrowed PostgreSQL connection instead of a pool.
///
/// Only compiled when the `state-merkle-sql-in-transaction` feature is enabled.
// NOTE(review): the name suggests the borrowed connection already has a
// transaction open on it; nothing in this type enforces that — confirm with callers.
#[cfg(feature = "state-merkle-sql-in-transaction")]
pub struct InTransactionPostgresBackend<'a> {
// The borrowed connection handed to every caller of this backend.
connection: &'a diesel::pg::PgConnection,
}
#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> InTransactionPostgresBackend<'a> {
    /// Creates a backend that operates on the given borrowed connection.
    pub fn new(connection: &'a diesel::pg::PgConnection) -> Self {
        InTransactionPostgresBackend { connection }
    }
}
#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> Backend for InTransactionPostgresBackend<'a> {
    type Connection = BorrowedPostgresConnection<'a>;

    /// Wraps the borrowed connection in a [`BorrowedPostgresConnection`].
    ///
    /// This implementation never returns an error.
    fn connection(&self) -> Result<Self::Connection, InternalError> {
        let borrowed = BorrowedPostgresConnection(self.connection);
        Ok(borrowed)
    }
}
#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> Execute for InTransactionPostgresBackend<'a> {
    /// Runs `f` against the borrowed connection and returns its result unchanged.
    fn execute<F, T>(&self, f: F) -> Result<T, InternalError>
    where
        F: Fn(&Self::Connection) -> Result<T, InternalError>,
    {
        let borrowed = BorrowedPostgresConnection(self.connection);
        f(&borrowed)
    }
}
#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> From<&'a diesel::pg::PgConnection> for InTransactionPostgresBackend<'a> {
    /// Builds a backend directly from a borrowed connection.
    fn from(conn: &'a diesel::pg::PgConnection) -> Self {
        InTransactionPostgresBackend { connection: conn }
    }
}
/// Builder for a [`PostgresBackend`].
///
/// The default value has no URL configured.
#[derive(Default)]
pub struct PostgresBackendBuilder {
// Connection URL for the database; `None` until `with_url` is called.
url: Option<String>,
}
impl PostgresBackendBuilder {
    /// Creates a builder with no connection URL configured.
    pub fn new() -> Self {
        Self { url: None }
    }

    /// Sets the PostgreSQL connection URL, replacing any previously set value.
    pub fn with_url<S: Into<String>>(self, url: S) -> Self {
        Self {
            url: Some(url.into()),
        }
    }

    /// Builds the backend: creates a connection pool for the configured URL and
    /// checks out one connection to verify that the pool can provide one.
    ///
    /// # Errors
    ///
    /// Returns an [`InvalidStateError`] if no URL was provided, if the pool
    /// cannot be built, or if the verification connection cannot be obtained.
    pub fn build(self) -> Result<PostgresBackend, InvalidStateError> {
        let url = match self.url {
            Some(url) => url,
            None => {
                return Err(InvalidStateError::with_message(
                    "must provide a postgres connection URL".into(),
                ))
            }
        };

        let manager = ConnectionManager::<diesel::pg::PgConnection>::new(url);
        let connection_pool = match Pool::builder().build(manager) {
            Ok(pool) => pool,
            Err(err) => return Err(InvalidStateError::with_message(err.to_string())),
        };

        // Check out one connection to validate the pool; it is held until the
        // end of the function and then returned to the pool.
        let _probe = match connection_pool.get() {
            Ok(conn) => conn,
            Err(err) => return Err(InvalidStateError::with_message(err.to_string())),
        };

        Ok(PostgresBackend { connection_pool })
    }
}
/// Test support for running tests against a PostgreSQL database.
///
/// Only compiled when the `state-merkle-sql-postgres-tests` feature is enabled.
#[cfg(feature = "state-merkle-sql-postgres-tests")]
pub mod test {
use std::env;
use std::error::Error;
use std::panic;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use diesel::prelude::*;
use diesel_migrations::{
revert_latest_migration_in_directory, MigrationError, RunMigrationsError,
};
use lazy_static::lazy_static;
use crate::state::merkle::sql::migration;
// Global lock held by `run_postgres_test` for the whole migrate/test/revert
// sequence, so that calls to it run one at a time.
lazy_static! {
static ref SERIAL_TEST_LOCK: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
}
/// Runs `test` against a freshly migrated PostgreSQL database, then reverts
/// the migrations.
///
/// The database URL is read from the `STATE_MERKLE_SQL_POSTGRES_TEST_URL`
/// environment variable, falling back to a hard-coded localhost URL when the
/// variable cannot be read. The URL is passed to `test` as its only argument.
///
/// # Errors
///
/// Returns an error if the lock cannot be acquired, if a connection cannot be
/// established, or if running the migrations fails. When `test` returns
/// without panicking, an error from reverting the migrations takes precedence
/// over the test's own result.
///
/// # Panics
///
/// If `test` panics, the panic is caught, the migrations are reverted, and the
/// panic is then resumed once the lock has been released.
pub fn run_postgres_test<T>(test: T) -> Result<(), Box<dyn Error>>
where
T: FnOnce(&str) -> Result<(), Box<dyn Error>> + panic::UnwindSafe,
{
// Everything that touches the database happens inside this block, so the
// lock guard is dropped before the outcome is examined below.
let (migration_result, test_result) = {
let _guard = SERIAL_TEST_LOCK.lock()?;
let url = env::var("STATE_MERKLE_SQL_POSTGRES_TEST_URL")
.ok()
.unwrap_or_else(|| "postgres://postgres:test@localhost:5432/transact".into());
// Apply the migrations on a short-lived connection that is dropped before
// the test runs.
{
let conn = PgConnection::establish(&url)?;
migration::postgres::run_migrations(&conn)?;
}
let test_url = url.clone();
// Capture a panic from the test instead of unwinding immediately, so the
// migrations below are still reverted.
let result = panic::catch_unwind(move || test(&test_url));
// NOTE: an early return from this `?` drops `result`, including any
// captured panic payload.
let conn = PgConnection::establish(&url)?;
// Revert migrations one at a time until none remain; `NoMigrationRun` is
// the signal that everything has been reverted.
let migration_result: Result<(), Box<dyn Error>> = loop {
match revert_latest_migration_in_directory(
&conn,
// Relative path, resolved against the process's working directory.
&PathBuf::from("./src/state/merkle/sql/migration/postgres/migrations"),
) {
Ok(_s) => (),
Err(RunMigrationsError::MigrationError(MigrationError::NoMigrationRun)) => {
break Ok(())
}
Err(err) => break Err(Box::new(err)),
}
};
(migration_result, result)
};
match test_result {
// The test returned normally: report a revert error first, otherwise the
// test's own result.
Ok(res) => migration_result.and(res),
// The test panicked: propagate the original panic. Any revert error is
// discarded in this case.
Err(err) => {
panic::resume_unwind(err);
}
}
}
}