transact 0.4.7

Transact is a transaction execution platform designed to be used as a library or component when implementing distributed ledgers, including blockchains.
Documentation
/*
 * Copyright 2021 Cargill Incorporated
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * -----------------------------------------------------------------------------
 */

//! Postgres Database Backend
//!
//! Available if the feature "postgres" is enabled.

use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};

use crate::error::{InternalError, InvalidStateError};

use super::{Backend, Connection, Execute};

/// A connection to a Postgres database.
///
/// Available if the feature "postgres" is enabled.
pub struct PostgresConnection(
    pub(in crate::state::merkle::sql) PooledConnection<ConnectionManager<diesel::pg::PgConnection>>,
);

impl Connection for PostgresConnection {
    type ConnectionType = diesel::pg::PgConnection;

    fn as_inner(&self) -> &Self::ConnectionType {
        &self.0
    }
}

/// The Postgres Backend
///
/// This struct provides the [Backend] implementation details for Postgres databases.
///
/// Available if the feature "postgres" is enabled.
#[derive(Clone)]
pub struct PostgresBackend {
    connection_pool: Pool<ConnectionManager<diesel::pg::PgConnection>>,
}

impl Backend for PostgresBackend {
    type Connection = PostgresConnection;

    fn connection(&self) -> Result<Self::Connection, InternalError> {
        self.connection_pool
            .get()
            .map(PostgresConnection)
            .map_err(|err| InternalError::from_source(Box::new(err)))
    }
}

impl Execute for PostgresBackend {
    fn execute<F, T>(&self, f: F) -> Result<T, InternalError>
    where
        F: Fn(&Self::Connection) -> Result<T, InternalError>,
    {
        let conn = self
            .connection_pool
            .get()
            .map(PostgresConnection)
            .map_err(|err| InternalError::from_source(Box::new(err)))?;

        f(&conn)
    }
}

impl From<Pool<ConnectionManager<diesel::pg::PgConnection>>> for PostgresBackend {
    fn from(pool: Pool<ConnectionManager<diesel::pg::PgConnection>>) -> Self {
        Self {
            connection_pool: pool,
        }
    }
}

impl From<PostgresBackend> for Pool<ConnectionManager<diesel::pg::PgConnection>> {
    fn from(backend: PostgresBackend) -> Self {
        backend.connection_pool
    }
}

/// A borrowed Postgres connection.
///
/// Available if the features "state-merkle-sql-in-transaction" "postgres" are enabled.
#[cfg(feature = "state-merkle-sql-in-transaction")]
pub struct BorrowedPostgresConnection<'a>(&'a diesel::pg::PgConnection);

#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> Connection for BorrowedPostgresConnection<'a> {
    type ConnectionType = diesel::pg::PgConnection;

    fn as_inner(&self) -> &Self::ConnectionType {
        self.0
    }
}

/// A Postgres Backend that wraps a borrowed connection.
///
/// This backend is neither `Sync` nor `Send`.
///
/// Available if the features "state-merkle-sql-in-transaction" "postgres" are enabled.
#[cfg(feature = "state-merkle-sql-in-transaction")]
pub struct InTransactionPostgresBackend<'a> {
    connection: &'a diesel::pg::PgConnection,
}

#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> InTransactionPostgresBackend<'a> {
    /// Wrap a reference to a [`diesel::pg::PgConnection`].
    pub fn new(connection: &'a diesel::pg::PgConnection) -> Self {
        Self { connection }
    }
}

#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> Backend for InTransactionPostgresBackend<'a> {
    type Connection = BorrowedPostgresConnection<'a>;

    fn connection(&self) -> Result<Self::Connection, InternalError> {
        Ok(BorrowedPostgresConnection(self.connection))
    }
}

#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> Execute for InTransactionPostgresBackend<'a> {
    fn execute<F, T>(&self, f: F) -> Result<T, InternalError>
    where
        F: Fn(&Self::Connection) -> Result<T, InternalError>,
    {
        f(&BorrowedPostgresConnection(self.connection))
    }
}

#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> From<&'a diesel::pg::PgConnection> for InTransactionPostgresBackend<'a> {
    fn from(conn: &'a diesel::pg::PgConnection) -> Self {
        Self::new(conn)
    }
}

/// A Builder for the PostgresBackend.
///
/// Available if the feature "postgres" is enabled.
#[derive(Default)]
pub struct PostgresBackendBuilder {
    url: Option<String>,
}

impl PostgresBackendBuilder {
    /// Constructs a new builder.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the URL for the postgres database instance.
    ///
    /// This URL should follow the format as documented at
    /// [https://www.postgresql.org/docs/9.4/libpq-connect.html#LIBPQ-CONNSTRING](https://www.postgresql.org/docs/9.4/libpq-connect.html#LIBPQ-CONNSTRING).
    ///
    /// This is a required field.
    pub fn with_url<S: Into<String>>(mut self, url: S) -> Self {
        self.url = Some(url.into());
        self
    }

    /// Constructs the [PostgresBackend] instance.
    ///
    /// # Errors
    ///
    /// This may return a [InvalidStateError] for a variety of reasons:
    ///
    /// * No URL provided
    /// * Unable to connect to the database.
    pub fn build(self) -> Result<PostgresBackend, InvalidStateError> {
        let url = self.url.ok_or_else(|| {
            InvalidStateError::with_message("must provide a postgres connection URL".into())
        })?;

        let connection_manager = ConnectionManager::<diesel::pg::PgConnection>::new(url);
        let pool = Pool::builder()
            .build(connection_manager)
            .map_err(|err| InvalidStateError::with_message(err.to_string()))?;

        // Validate that connections can be made
        let _conn = pool
            .get()
            .map_err(|err| InvalidStateError::with_message(err.to_string()))?;

        Ok(PostgresBackend {
            connection_pool: pool,
        })
    }
}

#[cfg(feature = "state-merkle-sql-postgres-tests")]
pub mod test {
    use std::env;
    use std::error::Error;
    use std::panic;
    use std::path::PathBuf;
    use std::sync::{Arc, Mutex};

    use diesel::prelude::*;
    use diesel_migrations::{
        revert_latest_migration_in_directory, MigrationError, RunMigrationsError,
    };
    use lazy_static::lazy_static;

    use crate::state::merkle::sql::migration;

    lazy_static! {
        static ref SERIAL_TEST_LOCK: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
    }

    /// Execute a test against a postgres database.
    ///
    /// This function will run the migrations against the database provided by the environment
    /// variable STATE_MERKLE_SQL_POSTGRES_TEST_URL.  After the test completes, it will revert the
    /// migrations to a clean state. In other words, each test can assume a clean state, after the
    /// migrations have been applied.
    ///
    /// Additionally, tests will be run serially. This does not guarantee any order to the
    /// tests, but does guarantee that they will not overlap and interfere with the test data.
    pub fn run_postgres_test<T>(test: T) -> Result<(), Box<dyn Error>>
    where
        T: FnOnce(&str) -> Result<(), Box<dyn Error>> + panic::UnwindSafe,
    {
        let (migration_result, test_result) = {
            let _guard = SERIAL_TEST_LOCK.lock()?;

            let url = env::var("STATE_MERKLE_SQL_POSTGRES_TEST_URL")
                .ok()
                .unwrap_or_else(|| "postgres://postgres:test@localhost:5432/transact".into());
            {
                let conn = PgConnection::establish(&url)?;
                migration::postgres::run_migrations(&conn)?;
            }

            let test_url = url.clone();
            let result = panic::catch_unwind(move || test(&test_url));

            // rollback
            let conn = PgConnection::establish(&url)?;
            let migration_result: Result<(), Box<dyn Error>> = loop {
                match revert_latest_migration_in_directory(
                    &conn,
                    &PathBuf::from("./src/state/merkle/sql/migration/postgres/migrations"),
                ) {
                    Ok(_s) => (),
                    Err(RunMigrationsError::MigrationError(MigrationError::NoMigrationRun)) => {
                        break Ok(())
                    }
                    Err(err) => break Err(Box::new(err)),
                }
            };

            (migration_result, result)
        };

        match test_result {
            Ok(res) => migration_result.and(res),
            Err(err) => {
                panic::resume_unwind(err);
            }
        }
    }
}