use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions};
use super::backend::{
classify, BackendError, BackendKind, CatalogBackend, IsolationLevel, Transaction, TxOptions,
};
pub struct PostgresBackend {
pool: PgPool,
}
impl PostgresBackend {
pub async fn open_with_options(
url: &str,
pool_size: u32,
max_lifetime_secs: Option<u32>,
) -> Result<Arc<Self>, BackendError> {
let opts: PgConnectOptions = url.parse().map_err(classify)?;
let mut builder = PgPoolOptions::new().max_connections(pool_size);
if let Some(secs) = max_lifetime_secs {
builder = builder.max_lifetime(Duration::from_secs(secs as u64));
}
let pool = builder.connect_with(opts).await.map_err(classify)?;
Ok(Arc::new(Self { pool }))
}
}
impl CatalogBackend for PostgresBackend {
fn transaction<'a, F, R>(
&'a self,
opts: TxOptions,
f: F,
) -> Pin<Box<dyn Future<Output = Result<R, BackendError>> + Send + 'a>>
where
F: for<'tx> FnOnce(
&'tx mut Transaction<'tx>,
)
-> Pin<Box<dyn Future<Output = Result<R, BackendError>> + Send + 'tx>>
+ Send
+ 'a,
R: Send + 'a,
{
Box::pin(async move {
let mut tx = self.pool.begin().await.map_err(classify)?;
let iso_sql = match opts.isolation {
IsolationLevel::ReadCommitted => "SET TRANSACTION ISOLATION LEVEL READ COMMITTED",
IsolationLevel::RepeatableRead => "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ",
IsolationLevel::Serializable => "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE",
};
sqlx::query(iso_sql)
.execute(&mut *tx)
.await
.map_err(classify)?;
if opts.read_only {
sqlx::query("SET TRANSACTION READ ONLY")
.execute(&mut *tx)
.await
.map_err(classify)?;
}
let outcome = {
let mut wrapper = Transaction::new_postgres(&mut tx);
f(&mut wrapper).await
};
match outcome {
Ok(value) => {
tx.commit().await.map_err(classify)?;
Ok(value)
}
Err(err) => {
let _ = tx.rollback().await;
Err(err)
}
}
})
}
fn migrate(&self) -> Pin<Box<dyn Future<Output = Result<(), BackendError>> + Send + '_>> {
Box::pin(async move { super::migrations::run(self).await })
}
fn ping(&self) -> Pin<Box<dyn Future<Output = Result<(), BackendError>> + Send + '_>> {
Box::pin(async move {
sqlx::query("SELECT 1")
.execute(&self.pool)
.await
.map_err(classify)?;
Ok(())
})
}
fn backend_kind(&self) -> BackendKind {
BackendKind::Postgres
}
}