datafusion_dist_cluster_postgres/
builder.rs

1use std::time::Duration;
2
3use bb8::Pool;
4use bb8_postgres::PostgresConnectionManager;
5use bb8_postgres::tokio_postgres::NoTls;
6use datafusion_dist::DistError;
7
8use crate::{PostgresCluster, PostgresClusterError};
9
/// Builder for a [`PostgresCluster`] backed by a pooled PostgreSQL connection.
///
/// Start with [`PostgresClusterBuilder::new`], adjust the optional settings
/// through the chained setters, then call [`PostgresClusterBuilder::build`].
#[must_use = "a builder does nothing until `build` is called"]
pub struct PostgresClusterBuilder {
    /// Host name or address of the PostgreSQL server.
    host: String,
    /// TCP port of the PostgreSQL server.
    port: u16,
    /// User name used to authenticate.
    user: String,
    /// Password used to authenticate; never printed by the `Debug` impl.
    password: String,
    /// Database name to connect to; `None` leaves it unset on the connection config.
    dbname: Option<String>,
    /// Upper bound on the number of connections held by the pool.
    pool_max_size: u32,
    /// Minimum number of idle connections requested from the pool, if any.
    pool_min_idle: Option<u32>,
    /// Idle timeout handed to the pool, if any.
    pool_idle_timeout: Option<Duration>,
}

/// Hand-written so the password cannot leak into logs or panic messages,
/// which a derived `Debug` would allow.
impl std::fmt::Debug for PostgresClusterBuilder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PostgresClusterBuilder")
            .field("host", &self.host)
            .field("port", &self.port)
            .field("user", &self.user)
            .field("password", &"<redacted>")
            .field("dbname", &self.dbname)
            .field("pool_max_size", &self.pool_max_size)
            .field("pool_min_idle", &self.pool_min_idle)
            .field("pool_idle_timeout", &self.pool_idle_timeout)
            .finish()
    }
}
20
21impl PostgresClusterBuilder {
22    pub fn new(
23        host: impl Into<String>,
24        port: u16,
25        user: impl Into<String>,
26        password: impl Into<String>,
27    ) -> Self {
28        Self {
29            host: host.into(),
30            port,
31            user: user.into(),
32            password: password.into(),
33            dbname: None,
34            pool_max_size: 100,
35            pool_min_idle: Some(5),
36            pool_idle_timeout: Some(Duration::from_secs(60)),
37        }
38    }
39
40    pub fn dbname(mut self, dbname: impl Into<String>) -> Self {
41        self.dbname = Some(dbname.into());
42        self
43    }
44
45    pub fn pool_max_size(mut self, pool_max_size: u32) -> Self {
46        self.pool_max_size = pool_max_size;
47        self
48    }
49
50    pub fn pool_min_idle(mut self, pool_min_idle: Option<u32>) -> Self {
51        self.pool_min_idle = pool_min_idle;
52        self
53    }
54
55    pub fn pool_idle_timeout(mut self, pool_idle_timeout: Option<Duration>) -> Self {
56        self.pool_idle_timeout = pool_idle_timeout;
57        self
58    }
59
60    pub async fn build(self) -> Result<PostgresCluster, DistError> {
61        let mut config = bb8_postgres::tokio_postgres::config::Config::new();
62        config
63            .host(&self.host)
64            .port(self.port)
65            .user(&self.user)
66            .password(&self.password);
67        if let Some(dbname) = self.dbname {
68            config.dbname(dbname);
69        }
70        let manager = PostgresConnectionManager::new(config, NoTls);
71        let pool = Pool::builder()
72            .max_size(self.pool_max_size)
73            .min_idle(self.pool_min_idle)
74            .idle_timeout(self.pool_idle_timeout)
75            .build(manager)
76            .await
77            .map_err(|e| DistError::cluster(Box::new(PostgresClusterError::Connection(e))))?;
78
79        let cluster = PostgresCluster::new(pool);
80
81        cluster.ensure_schema().await?;
82
83        Ok(cluster)
84    }
85}