opentalk_database/
db.rs

1// SPDX-FileCopyrightText: OpenTalk GmbH <mail@opentalk.eu>
2//
3// SPDX-License-Identifier: EUPL-1.2
4
5use std::{sync::Arc, time::Duration};
6
7use deadpool_runtime::Runtime;
8use diesel_async::{
9    pooled_connection::{deadpool::Pool, AsyncDieselConnectionManager},
10    AsyncPgConnection,
11};
12use snafu::Report;
13
14use crate::{
15    metrics::{DatabaseMetrics, MetricsConnection},
16    DatabaseError, DbConnection,
17};
18
19type DbPool = Pool<AsyncPgConnection>;
20
21/// Db container that uses a connection pool to hand out connections
22///
23/// Uses an deadpool connection pool to manage multiple established connections.
24pub struct Db {
25    metrics: Option<Arc<DatabaseMetrics>>,
26    pool: DbPool,
27}
28
29impl Db {
30    /// Creates a new Db instance from the specified database settings.
31    #[tracing::instrument(skip(db_settings))]
32    pub fn connect(db_settings: &opentalk_controller_settings::Database) -> crate::Result<Self> {
33        Self::connect_url(&db_settings.url, db_settings.max_connections)
34    }
35
36    /// Creates a new Db instance from the specified database url.
37    pub fn connect_url(db_url: &str, max_conns: u32) -> crate::Result<Self> {
38        let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(db_url);
39
40        let pool = Pool::builder(manager)
41            .max_size(max_conns as usize)
42            .create_timeout(Some(Duration::from_secs(10)))
43            .runtime(Runtime::Tokio1)
44            .build()?;
45
46        Ok(Self {
47            metrics: None,
48            pool,
49        })
50    }
51
52    /// Set the metrics to use for this database pool
53    pub fn set_metrics(&mut self, metrics: Arc<DatabaseMetrics>) {
54        self.metrics = Some(metrics);
55    }
56
57    /// Returns an established connection from the connection pool
58    #[tracing::instrument(skip_all)]
59    pub async fn get_conn(&self) -> crate::Result<DbConnection> {
60        let res = self.pool.get().await;
61        let state = self.pool.status();
62
63        if let Some(metrics) = &self.metrics {
64            metrics.dbpool_connections.record(state.size as u64, &[]);
65
66            metrics
67                .dbpool_connections_idle
68                .record(u64::try_from(state.available).unwrap_or_default(), &[]);
69        }
70
71        match res {
72            Ok(conn) => {
73                let conn = MetricsConnection {
74                    metrics: self.metrics.clone(),
75                    conn,
76                    instrumentation: Arc::new(std::sync::Mutex::new(
77                        diesel::connection::get_default_instrumentation(),
78                    )),
79                };
80
81                Ok(conn)
82            }
83            Err(e) => {
84                let state = self.pool.status();
85                log::error!(
86                    "Unable to get connection from connection pool.
87                                Error: {}
88                                Pool State:
89                                    {state:?}",
90                    Report::from_error(&e)
91                );
92                Err(DatabaseError::DeadpoolError { source: e })
93            }
94        }
95    }
96}