Skip to main content

transact/state/merkle/sql/backend/
postgres.rs

1/*
2 * Copyright 2021 Cargill Incorporated
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 * -----------------------------------------------------------------------------
16 */
17
18//! Postgres Database Backend
19//!
20//! Available if the feature "postgres" is enabled.
21
22use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
23
24use crate::error::{InternalError, InvalidStateError};
25
26use super::{Backend, Connection, Execute};
27
28/// A connection to a Postgres database.
29///
30/// Available if the feature "postgres" is enabled.
31pub struct PostgresConnection(
32    pub(in crate::state::merkle::sql) PooledConnection<ConnectionManager<diesel::pg::PgConnection>>,
33);
34
35impl Connection for PostgresConnection {
36    type ConnectionType = diesel::pg::PgConnection;
37
38    fn as_inner(&self) -> &Self::ConnectionType {
39        &self.0
40    }
41}
42
43/// The Postgres Backend
44///
45/// This struct provides the [Backend] implementation details for Postgres databases.
46///
47/// Available if the feature "postgres" is enabled.
48#[derive(Clone)]
49pub struct PostgresBackend {
50    connection_pool: Pool<ConnectionManager<diesel::pg::PgConnection>>,
51}
52
53impl Backend for PostgresBackend {
54    type Connection = PostgresConnection;
55
56    fn connection(&self) -> Result<Self::Connection, InternalError> {
57        self.connection_pool
58            .get()
59            .map(PostgresConnection)
60            .map_err(|err| InternalError::from_source(Box::new(err)))
61    }
62}
63
64impl Execute for PostgresBackend {
65    fn execute<F, T>(&self, f: F) -> Result<T, InternalError>
66    where
67        F: Fn(&Self::Connection) -> Result<T, InternalError>,
68    {
69        let conn = self
70            .connection_pool
71            .get()
72            .map(PostgresConnection)
73            .map_err(|err| InternalError::from_source(Box::new(err)))?;
74
75        f(&conn)
76    }
77}
78
79impl From<Pool<ConnectionManager<diesel::pg::PgConnection>>> for PostgresBackend {
80    fn from(pool: Pool<ConnectionManager<diesel::pg::PgConnection>>) -> Self {
81        Self {
82            connection_pool: pool,
83        }
84    }
85}
86
87impl From<PostgresBackend> for Pool<ConnectionManager<diesel::pg::PgConnection>> {
88    fn from(backend: PostgresBackend) -> Self {
89        backend.connection_pool
90    }
91}
92
/// A Postgres connection that is borrowed rather than owned.
///
/// Available if the features "state-merkle-sql-in-transaction" and "postgres" are enabled.
#[cfg(feature = "state-merkle-sql-in-transaction")]
pub struct BorrowedPostgresConnection<'a>(&'a diesel::pg::PgConnection);

#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> Connection for BorrowedPostgresConnection<'a> {
    type ConnectionType = diesel::pg::PgConnection;

    /// Returns the wrapped connection reference.
    fn as_inner(&self) -> &Self::ConnectionType {
        let Self(borrowed) = self;
        borrowed
    }
}
107
/// A Postgres Backend built on top of a borrowed connection.
///
/// This backend is neither `Sync` nor `Send`.
///
/// Available if the features "state-merkle-sql-in-transaction" and "postgres" are enabled.
#[cfg(feature = "state-merkle-sql-in-transaction")]
pub struct InTransactionPostgresBackend<'a> {
    connection: &'a diesel::pg::PgConnection,
}

#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> InTransactionPostgresBackend<'a> {
    /// Creates a backend around a reference to a [`diesel::pg::PgConnection`].
    pub fn new(connection: &'a diesel::pg::PgConnection) -> Self {
        InTransactionPostgresBackend { connection }
    }
}
125
#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> Backend for InTransactionPostgresBackend<'a> {
    type Connection = BorrowedPostgresConnection<'a>;

    /// Hands out a wrapper around the borrowed connection; this implementation never fails.
    fn connection(&self) -> Result<Self::Connection, InternalError> {
        let borrowed = BorrowedPostgresConnection(self.connection);
        Ok(borrowed)
    }
}
134
#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> Execute for InTransactionPostgresBackend<'a> {
    /// Runs `f` against the borrowed connection and returns its result unchanged.
    fn execute<F, T>(&self, f: F) -> Result<T, InternalError>
    where
        F: Fn(&Self::Connection) -> Result<T, InternalError>,
    {
        let borrowed = BorrowedPostgresConnection(self.connection);
        f(&borrowed)
    }
}
144
#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> From<&'a diesel::pg::PgConnection> for InTransactionPostgresBackend<'a> {
    /// Builds a backend around the given connection reference.
    fn from(connection: &'a diesel::pg::PgConnection) -> Self {
        InTransactionPostgresBackend { connection }
    }
}
151
152/// A Builder for the PostgresBackend.
153///
154/// Available if the feature "postgres" is enabled.
155#[derive(Default)]
156pub struct PostgresBackendBuilder {
157    url: Option<String>,
158}
159
160impl PostgresBackendBuilder {
161    /// Constructs a new builder.
162    pub fn new() -> Self {
163        Self::default()
164    }
165
166    /// Sets the URL for the postgres database instance.
167    ///
168    /// This URL should follow the format as documented at
169    /// [https://www.postgresql.org/docs/9.4/libpq-connect.html#LIBPQ-CONNSTRING](https://www.postgresql.org/docs/9.4/libpq-connect.html#LIBPQ-CONNSTRING).
170    ///
171    /// This is a required field.
172    pub fn with_url<S: Into<String>>(mut self, url: S) -> Self {
173        self.url = Some(url.into());
174        self
175    }
176
177    /// Constructs the [PostgresBackend] instance.
178    ///
179    /// # Errors
180    ///
181    /// This may return a [InvalidStateError] for a variety of reasons:
182    ///
183    /// * No URL provided
184    /// * Unable to connect to the database.
185    pub fn build(self) -> Result<PostgresBackend, InvalidStateError> {
186        let url = self.url.ok_or_else(|| {
187            InvalidStateError::with_message("must provide a postgres connection URL".into())
188        })?;
189
190        let connection_manager = ConnectionManager::<diesel::pg::PgConnection>::new(url);
191        let pool = Pool::builder()
192            .build(connection_manager)
193            .map_err(|err| InvalidStateError::with_message(err.to_string()))?;
194
195        // Validate that connections can be made
196        let _conn = pool
197            .get()
198            .map_err(|err| InvalidStateError::with_message(err.to_string()))?;
199
200        Ok(PostgresBackend {
201            connection_pool: pool,
202        })
203    }
204}
205
#[cfg(feature = "state-merkle-sql-postgres-tests")]
pub mod test {
    use std::env;
    use std::error::Error;
    use std::panic;
    use std::path::PathBuf;
    use std::sync::{Arc, Mutex};

    use diesel::prelude::*;
    use diesel_migrations::{
        revert_latest_migration_in_directory, MigrationError, RunMigrationsError,
    };
    use lazy_static::lazy_static;

    use crate::state::merkle::sql::migration;

    lazy_static! {
        // Held for the duration of each test so that tests never overlap.
        static ref SERIAL_TEST_LOCK: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
    }

    /// Execute a test against a postgres database.
    ///
    /// This function will run the migrations against the database provided by the environment
    /// variable STATE_MERKLE_SQL_POSTGRES_TEST_URL (falling back to
    /// `postgres://postgres:test@localhost:5432/transact` when it is unset or unreadable). After
    /// the test completes, it will revert the migrations to a clean state. In other words, each
    /// test can assume a clean state, after the migrations have been applied.
    ///
    /// Additionally, tests will be run serially. This does not guarantee any order to the
    /// tests, but does guarantee that they will not overlap and interfere with the test data.
    ///
    /// # Errors
    ///
    /// Returns an error if the serialization lock cannot be acquired, if connecting or applying
    /// the migrations fails before the test runs, if rolling the migrations back fails, or if the
    /// test itself returns an error. A rollback error takes precedence over the test's own
    /// error.
    ///
    /// # Panics
    ///
    /// A panic raised by `test` is caught so that the rollback can still be attempted, and is
    /// then resumed, even if the rollback itself failed.
    pub fn run_postgres_test<T>(test: T) -> Result<(), Box<dyn Error>>
    where
        T: FnOnce(&str) -> Result<(), Box<dyn Error>> + panic::UnwindSafe,
    {
        let (migration_result, test_result) = {
            // The guard lives only inside this block, so it is released before any captured
            // panic is resumed below.
            let _guard = SERIAL_TEST_LOCK.lock()?;

            let url = env::var("STATE_MERKLE_SQL_POSTGRES_TEST_URL")
                .unwrap_or_else(|_| "postgres://postgres:test@localhost:5432/transact".into());
            {
                let conn = PgConnection::establish(&url)?;
                migration::postgres::run_migrations(&conn)?;
            }

            let test_url = url.clone();
            let test_result = panic::catch_unwind(move || test(&test_url));

            // rollback
            //
            // A failure to reconnect is folded into the migration result instead of being
            // returned early; an early return here would discard a captured test panic and
            // replace it with an unrelated connection error.
            let migration_result: Result<(), Box<dyn Error>> =
                match PgConnection::establish(&url) {
                    Ok(conn) => revert_all_migrations(&conn),
                    Err(err) => Err(err.into()),
                };

            (migration_result, test_result)
        };

        match test_result {
            Ok(res) => migration_result.and(res),
            Err(payload) => panic::resume_unwind(payload),
        }
    }

    /// Reverts migrations one at a time until none remain applied.
    fn revert_all_migrations(conn: &PgConnection) -> Result<(), Box<dyn Error>> {
        let migrations_dir = PathBuf::from("./src/state/merkle/sql/migration/postgres/migrations");

        loop {
            match revert_latest_migration_in_directory(conn, &migrations_dir) {
                Ok(_reverted) => (),
                // Nothing left to revert: the database is back to a clean state.
                Err(RunMigrationsError::MigrationError(MigrationError::NoMigrationRun)) => {
                    return Ok(())
                }
                Err(err) => return Err(Box::new(err)),
            }
        }
    }
}