use crate::metrics::{DatabaseMetrics, MetricsConnection};
use crate::{DatabaseError, DbConnection};
use deadpool_runtime::Runtime;
use diesel_async::pooled_connection::deadpool::Pool;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::AsyncPgConnection;
use std::sync::Arc;
use std::time::Duration;
type DbPool = Pool<AsyncPgConnection>;
pub struct Db {
metrics: Option<Arc<DatabaseMetrics>>,
pool: DbPool,
}
impl Db {
#[tracing::instrument(skip(db_settings))]
pub fn connect(db_settings: &opentalk_controller_settings::Database) -> crate::Result<Self> {
Self::connect_url(&db_settings.url, db_settings.max_connections)
}
pub fn connect_url(db_url: &str, max_conns: u32) -> crate::Result<Self> {
let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(db_url);
let pool = Pool::builder(manager)
.max_size(max_conns as usize)
.create_timeout(Some(Duration::from_secs(10)))
.runtime(Runtime::Tokio1)
.build()?;
Ok(Self {
metrics: None,
pool,
})
}
pub fn set_metrics(&mut self, metrics: Arc<DatabaseMetrics>) {
self.metrics = Some(metrics);
}
#[tracing::instrument(skip_all)]
pub async fn get_conn(&self) -> crate::Result<DbConnection> {
let res = self.pool.get().await;
let state = self.pool.status();
if let Some(metrics) = &self.metrics {
metrics.dbpool_connections.record(state.size as u64, &[]);
metrics
.dbpool_connections_idle
.record(u64::try_from(state.available).unwrap_or_default(), &[]);
}
match res {
Ok(conn) => {
let conn = MetricsConnection {
metrics: self.metrics.clone(),
conn,
};
Ok(conn)
}
Err(e) => {
let state = self.pool.status();
let msg = format!(
"Unable to get connection from connection pool.
Error: {e}
Pool State:
{state:?}",
);
log::error!("{}", &msg);
Err(DatabaseError::DeadpoolError { source: e })
}
}
}
}