1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#![deny(unused_crate_dependencies)]

pub mod queries;
/// Re-exports every public item of `fuel_indexer_database_types` under
/// `types`, so callers can reach them through this crate.
pub mod types {
    pub use fuel_indexer_database_types::*;
}

pub use fuel_indexer_database_types::DbType;
use fuel_indexer_lib::{
    defaults,
    utils::{attempt_database_connection, ServiceStatus},
};
use fuel_indexer_postgres as postgres;
use sqlx::{
    pool::PoolConnection, postgres::PgConnectOptions, ConnectOptions, Error as SqlxError,
};
use std::{cmp::Ordering, collections::HashMap, str::FromStr};
use thiserror::Error;

/// Errors surfaced by the indexer database layer.
///
/// Display messages are generated by `thiserror` from the `#[error(...)]`
/// attributes on each variant.
#[derive(Debug, Error)]
pub enum IndexerDatabaseError {
    /// The supplied connection string was rejected; carries the offending
    /// string.
    #[error("Invalid connection string: {0:?}")]
    InvalidConnectionString(String),
    /// The connection string names a backend this crate does not handle;
    /// carries the unsupported backend identifier.
    #[error("Database backend not supported: {0:?}")]
    BackendNotSupported(String),
    /// An operation required an open transaction but none was open.
    #[error("No transaction is open.")]
    NoTransactionError,
    /// An error raised by `sqlx`. The `#[from]` attribute lets `?` convert a
    /// `sqlx::Error` into this variant automatically.
    #[error("Error from sqlx: {0:#?}")]
    SqlxError(#[from] SqlxError),
    /// A failure that does not fit any other variant.
    #[error("Unknown error")]
    Unknown,
}

/// A single database connection checked out of an [`IndexerConnectionPool`],
/// with one variant per supported backend.
#[derive(Debug)]
pub enum IndexerConnection {
    /// A boxed pooled connection to a Postgres database.
    Postgres(Box<PoolConnection<sqlx::Postgres>>),
}

/// A pool of database connections, with one variant per supported backend.
#[derive(Clone, Debug)]
pub enum IndexerConnectionPool {
    /// An `sqlx` connection pool for a Postgres database.
    Postgres(sqlx::Pool<sqlx::Postgres>),
}

impl IndexerConnectionPool {
    /// Returns the [`DbType`] of the backend this pool talks to.
    pub fn database_type(&self) -> DbType {
        match self {
            Self::Postgres(_) => DbType::Postgres,
        }
    }

    /// Builds a connection pool from a database URL.
    ///
    /// The URL may carry a `verbose` query parameter (`"true"` or `"false"`)
    /// that controls whether sqlx statement logging stays enabled. When the
    /// parameter is absent, `defaults::VERBOSE_DB_LOGGING` is used. The
    /// parameter is stripped from the URL before it is handed to sqlx; all
    /// other query parameters are forwarded, re-encoded.
    ///
    /// Establishing the pool is delegated to `attempt_database_connection`;
    /// any retry or failure policy is defined by that helper.
    ///
    /// # Errors
    ///
    /// * [`IndexerDatabaseError::InvalidConnectionString`] if `database_url`
    ///   cannot be parsed as a URL, or if `verbose` is neither `"true"` nor
    ///   `"false"`.
    /// * [`IndexerDatabaseError::BackendNotSupported`] if the URL scheme is
    ///   not `postgres`.
    /// * [`IndexerDatabaseError::SqlxError`] if sqlx rejects the connection
    ///   options.
    pub async fn connect(
        database_url: &str,
    ) -> Result<IndexerConnectionPool, IndexerDatabaseError> {
        let mut url = url::Url::parse(database_url).map_err(|_| {
            IndexerDatabaseError::InvalidConnectionString(database_url.into())
        })?;

        // Decoded, owned copies of the query pairs, so `url` can be mutated
        // below without an outstanding borrow.
        let mut query: HashMap<String, String> =
            url.query_pairs().into_owned().collect();

        let verbose = query
            .remove("verbose")
            .unwrap_or_else(|| defaults::VERBOSE_DB_LOGGING.to_string());

        // Write the remaining parameters back through the form-urlencoded
        // serializer so that decoded values containing `&`, `=`, `+` or `%`
        // are escaped again instead of corrupting the query string.
        if query.is_empty() {
            // Avoid leaving a dangling `?` on the URL.
            url.set_query(None);
        } else {
            url.query_pairs_mut().clear().extend_pairs(&query);
        }

        match url.scheme() {
            "postgres" => {
                // `verbose` is user input: reject unknown values with an
                // error rather than panicking.
                let log_statements = match verbose.as_str() {
                    "true" => true,
                    "false" => false,
                    _ => {
                        return Err(IndexerDatabaseError::InvalidConnectionString(
                            database_url.into(),
                        ))
                    }
                };

                let parsed = PgConnectOptions::from_str(url.as_str())?;
                let opts = if log_statements {
                    parsed
                } else {
                    let mut quiet = parsed;
                    quiet.disable_statement_logging().clone()
                };

                // The closure may be invoked more than once by the helper,
                // hence the clone of the options on each call.
                let pool = attempt_database_connection(|| {
                    sqlx::postgres::PgPoolOptions::new().connect_with(opts.clone())
                })
                .await;

                Ok(IndexerConnectionPool::Postgres(pool))
            }
            unsupported => Err(IndexerDatabaseError::BackendNotSupported(
                unsupported.into(),
            )),
        }
    }

    /// Probes the database by acquiring a connection and running
    /// `SELECT true;`.
    ///
    /// Returns `ServiceStatus::OK` when the value returned by
    /// `postgres::execute_query` equals `1`, and `ServiceStatus::NotOk`
    /// otherwise.
    ///
    /// # Errors
    ///
    /// Propagates any `sqlx` error from acquiring the connection or running
    /// the query.
    pub async fn is_connected(&self) -> sqlx::Result<ServiceStatus> {
        match self {
            Self::Postgres(pool) => {
                let mut conn = pool.acquire().await?;
                let outcome =
                    postgres::execute_query(&mut conn, "SELECT true;".to_string())
                        .await?;

                Ok(match outcome.cmp(&1) {
                    Ordering::Equal => ServiceStatus::OK,
                    Ordering::Less | Ordering::Greater => ServiceStatus::NotOk,
                })
            }
        }
    }

    /// Checks a connection out of the pool and wraps it in the matching
    /// [`IndexerConnection`] variant.
    ///
    /// # Errors
    ///
    /// Propagates the `sqlx` error if a connection cannot be acquired.
    pub async fn acquire(&self) -> sqlx::Result<IndexerConnection> {
        match self {
            Self::Postgres(pool) => {
                let conn = pool.acquire().await?;
                Ok(IndexerConnection::Postgres(Box::new(conn)))
            }
        }
    }
}