transact/state/merkle/sql/backend/
postgres.rs1use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
23
24use crate::error::{InternalError, InvalidStateError};
25
26use super::{Backend, Connection, Execute};
27
28pub struct PostgresConnection(
32 pub(in crate::state::merkle::sql) PooledConnection<ConnectionManager<diesel::pg::PgConnection>>,
33);
34
35impl Connection for PostgresConnection {
36 type ConnectionType = diesel::pg::PgConnection;
37
38 fn as_inner(&self) -> &Self::ConnectionType {
39 &self.0
40 }
41}
42
43#[derive(Clone)]
49pub struct PostgresBackend {
50 connection_pool: Pool<ConnectionManager<diesel::pg::PgConnection>>,
51}
52
53impl Backend for PostgresBackend {
54 type Connection = PostgresConnection;
55
56 fn connection(&self) -> Result<Self::Connection, InternalError> {
57 self.connection_pool
58 .get()
59 .map(PostgresConnection)
60 .map_err(|err| InternalError::from_source(Box::new(err)))
61 }
62}
63
64impl Execute for PostgresBackend {
65 fn execute<F, T>(&self, f: F) -> Result<T, InternalError>
66 where
67 F: Fn(&Self::Connection) -> Result<T, InternalError>,
68 {
69 let conn = self
70 .connection_pool
71 .get()
72 .map(PostgresConnection)
73 .map_err(|err| InternalError::from_source(Box::new(err)))?;
74
75 f(&conn)
76 }
77}
78
79impl From<Pool<ConnectionManager<diesel::pg::PgConnection>>> for PostgresBackend {
80 fn from(pool: Pool<ConnectionManager<diesel::pg::PgConnection>>) -> Self {
81 Self {
82 connection_pool: pool,
83 }
84 }
85}
86
87impl From<PostgresBackend> for Pool<ConnectionManager<diesel::pg::PgConnection>> {
88 fn from(backend: PostgresBackend) -> Self {
89 backend.connection_pool
90 }
91}
92
/// A connection wrapper around a borrowed diesel Postgres connection.
///
/// Only compiled when the `state-merkle-sql-in-transaction` feature is enabled.
#[cfg(feature = "state-merkle-sql-in-transaction")]
pub struct BorrowedPostgresConnection<'a>(&'a diesel::pg::PgConnection);
98
#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> Connection for BorrowedPostgresConnection<'a> {
    type ConnectionType = diesel::pg::PgConnection;

    /// Returns the borrowed diesel Postgres connection.
    fn as_inner(&self) -> &Self::ConnectionType {
        let inner: &Self::ConnectionType = self.0;
        inner
    }
}
107
/// A backend that operates on a single borrowed Postgres connection instead
/// of drawing connections from a pool.
///
/// Only compiled when the `state-merkle-sql-in-transaction` feature is enabled.
#[cfg(feature = "state-merkle-sql-in-transaction")]
pub struct InTransactionPostgresBackend<'a> {
    /// The borrowed connection every operation is executed against.
    connection: &'a diesel::pg::PgConnection,
}
117
#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> InTransactionPostgresBackend<'a> {
    /// Constructs a backend that borrows the given connection.
    pub fn new(connection: &'a diesel::pg::PgConnection) -> Self {
        InTransactionPostgresBackend { connection }
    }
}
125
#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> Backend for InTransactionPostgresBackend<'a> {
    type Connection = BorrowedPostgresConnection<'a>;

    /// Wraps the borrowed connection in a `BorrowedPostgresConnection`.
    ///
    /// This implementation always returns `Ok`.
    fn connection(&self) -> Result<Self::Connection, InternalError> {
        let borrowed = BorrowedPostgresConnection(self.connection);
        Ok(borrowed)
    }
}
134
#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> Execute for InTransactionPostgresBackend<'a> {
    /// Runs `f` against the borrowed connection and returns its result unchanged.
    fn execute<F, T>(&self, f: F) -> Result<T, InternalError>
    where
        F: Fn(&Self::Connection) -> Result<T, InternalError>,
    {
        let borrowed = BorrowedPostgresConnection(self.connection);
        f(&borrowed)
    }
}
144
#[cfg(feature = "state-merkle-sql-in-transaction")]
impl<'a> From<&'a diesel::pg::PgConnection> for InTransactionPostgresBackend<'a> {
    /// Builds an in-transaction backend that borrows `conn`.
    fn from(conn: &'a diesel::pg::PgConnection) -> Self {
        InTransactionPostgresBackend { connection: conn }
    }
}
151
/// Builder for a `PostgresBackend`.
///
/// The default builder has no connection URL set.
#[derive(Default)]
pub struct PostgresBackendBuilder {
    /// Connection URL handed to the connection manager; required by `build`.
    url: Option<String>,
}
159
160impl PostgresBackendBuilder {
161 pub fn new() -> Self {
163 Self::default()
164 }
165
166 pub fn with_url<S: Into<String>>(mut self, url: S) -> Self {
173 self.url = Some(url.into());
174 self
175 }
176
177 pub fn build(self) -> Result<PostgresBackend, InvalidStateError> {
186 let url = self.url.ok_or_else(|| {
187 InvalidStateError::with_message("must provide a postgres connection URL".into())
188 })?;
189
190 let connection_manager = ConnectionManager::<diesel::pg::PgConnection>::new(url);
191 let pool = Pool::builder()
192 .build(connection_manager)
193 .map_err(|err| InvalidStateError::with_message(err.to_string()))?;
194
195 let _conn = pool
197 .get()
198 .map_err(|err| InvalidStateError::with_message(err.to_string()))?;
199
200 Ok(PostgresBackend {
201 connection_pool: pool,
202 })
203 }
204}
205
/// Test support for running tests against a live Postgres database.
///
/// Only compiled when the `state-merkle-sql-postgres-tests` feature is enabled.
#[cfg(feature = "state-merkle-sql-postgres-tests")]
pub mod test {
    use std::env;
    use std::error::Error;
    use std::panic;
    use std::path::PathBuf;
    use std::sync::{Arc, Mutex};

    use diesel::prelude::*;
    use diesel_migrations::{
        revert_latest_migration_in_directory, MigrationError, RunMigrationsError,
    };
    use lazy_static::lazy_static;

    use crate::state::merkle::sql::migration;

    lazy_static! {
        // Held for the duration of each test so that tests sharing the same
        // database run one at a time.
        static ref SERIAL_TEST_LOCK: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
    }

    /// Connects to `url` and reverts migrations one at a time until the
    /// migration harness reports that there is nothing left to revert.
    fn revert_all_migrations(url: &str) -> Result<(), Box<dyn Error>> {
        let conn = PgConnection::establish(url)?;
        let migrations_dir = PathBuf::from("./src/state/merkle/sql/migration/postgres/migrations");

        loop {
            match revert_latest_migration_in_directory(&conn, &migrations_dir) {
                // One migration was reverted; keep going.
                Ok(_) => continue,
                // Nothing left to revert: cleanup is complete.
                Err(RunMigrationsError::MigrationError(MigrationError::NoMigrationRun)) => {
                    return Ok(())
                }
                Err(err) => return Err(Box::new(err)),
            }
        }
    }

    /// Runs `test` against a migrated Postgres database, then reverts the
    /// migrations.
    ///
    /// The database URL is read from the `STATE_MERKLE_SQL_POSTGRES_TEST_URL`
    /// environment variable, falling back to
    /// `postgres://postgres:test@localhost:5432/transact`. The URL is passed
    /// to `test` as its only argument. Invocations are serialized through a
    /// process-wide lock.
    ///
    /// # Errors
    ///
    /// Returns an error if the lock cannot be taken, the initial connection or
    /// migration run fails, the cleanup (reconnect + revert) fails, or `test`
    /// itself returns an error. A cleanup error takes precedence over the
    /// test's own result.
    ///
    /// # Panics
    ///
    /// If `test` panics, the panic is resumed after cleanup has been
    /// attempted, even when the cleanup itself failed.
    pub fn run_postgres_test<T>(test: T) -> Result<(), Box<dyn Error>>
    where
        T: FnOnce(&str) -> Result<(), Box<dyn Error>> + panic::UnwindSafe,
    {
        let (migration_result, test_result) = {
            let _guard = SERIAL_TEST_LOCK.lock()?;

            let url = env::var("STATE_MERKLE_SQL_POSTGRES_TEST_URL")
                .ok()
                .unwrap_or_else(|| "postgres://postgres:test@localhost:5432/transact".into());
            {
                let conn = PgConnection::establish(&url)?;
                migration::postgres::run_migrations(&conn)?;
            }

            let test_url = url.clone();
            let result = panic::catch_unwind(move || test(&test_url));

            // Cleanup failures are captured as a value rather than returned
            // early with `?`: an early return here would discard a panic
            // caught above and hide the real test failure.
            (revert_all_migrations(&url), result)
        };

        match test_result {
            Ok(res) => migration_result.and(res),
            Err(panic_payload) => panic::resume_unwind(panic_payload),
        }
    }
}