1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// SPDX-FileCopyrightText: OpenTalk GmbH <mail@opentalk.eu>
//
// SPDX-License-Identifier: EUPL-1.2

use crate::metrics::{DatabaseMetrics, MetricsConnection};
use crate::{DatabaseError, DbConnection};
use deadpool_runtime::Runtime;
use diesel_async::pooled_connection::deadpool::Pool;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::AsyncPgConnection;
use std::sync::Arc;
use std::time::Duration;

type DbPool = Pool<AsyncPgConnection>;

/// Db container that uses a connection pool to hand out connections
///
/// Uses an deadpool connection pool to manage multiple established connections.
pub struct Db {
    metrics: Option<Arc<DatabaseMetrics>>,
    pool: DbPool,
}

impl Db {
    /// Creates a new Db instance from the specified database settings.
    #[tracing::instrument(skip(db_settings))]
    pub fn connect(db_settings: &opentalk_controller_settings::Database) -> crate::Result<Self> {
        Self::connect_url(&db_settings.url, db_settings.max_connections)
    }

    /// Creates a new Db instance from the specified database url.
    pub fn connect_url(db_url: &str, max_conns: u32) -> crate::Result<Self> {
        let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(db_url);

        let pool = Pool::builder(manager)
            .max_size(max_conns as usize)
            .create_timeout(Some(Duration::from_secs(10)))
            .runtime(Runtime::Tokio1)
            .build()?;

        Ok(Self {
            metrics: None,
            pool,
        })
    }

    /// Set the metrics to use for this database pool
    pub fn set_metrics(&mut self, metrics: Arc<DatabaseMetrics>) {
        self.metrics = Some(metrics);
    }

    /// Returns an established connection from the connection pool
    #[tracing::instrument(skip_all)]
    pub async fn get_conn(&self) -> crate::Result<DbConnection> {
        let res = self.pool.get().await;
        let state = self.pool.status();

        if let Some(metrics) = &self.metrics {
            metrics.dbpool_connections.record(state.size as u64, &[]);

            metrics
                .dbpool_connections_idle
                .record(u64::try_from(state.available).unwrap_or_default(), &[]);
        }

        match res {
            Ok(conn) => {
                let conn = MetricsConnection {
                    metrics: self.metrics.clone(),
                    conn,
                };

                Ok(conn)
            }
            Err(e) => {
                let state = self.pool.status();
                let msg = format!(
                    "Unable to get connection from connection pool.
                                Error: {e}
                                Pool State:
                                    {state:?}",
                );
                log::error!("{}", &msg);
                Err(DatabaseError::DeadpoolError { source: e })
            }
        }
    }
}