datafusion_dist_cluster_postgres/
builder.rs

1use std::time::Duration;
2
3use bb8::Pool;
4use bb8_postgres::PostgresConnectionManager;
5use bb8_postgres::tokio_postgres::NoTls;
6use datafusion_dist::DistError;
7
8use crate::{PostgresCluster, PostgresClusterError};
9
10#[derive(Debug, derive_with::With)]
11pub struct PostgresClusterBuilder {
12    host: String,
13    port: u16,
14    user: String,
15    password: String,
16    dbname: Option<String>,
17    table: String,
18    pool_max_size: u32,
19    pool_min_idle: Option<u32>,
20    pool_idle_timeout: Option<Duration>,
21    heartbeat_timeout_seconds: i32,
22}
23
24impl PostgresClusterBuilder {
25    pub fn new(
26        host: impl Into<String>,
27        port: u16,
28        user: impl Into<String>,
29        password: impl Into<String>,
30    ) -> Self {
31        Self {
32            host: host.into(),
33            port,
34            user: user.into(),
35            password: password.into(),
36            dbname: None,
37            table: "cluster_nodes".to_string(),
38            pool_max_size: 100,
39            pool_min_idle: Some(5),
40            pool_idle_timeout: Some(Duration::from_secs(60)),
41            heartbeat_timeout_seconds: 60,
42        }
43    }
44
45    pub async fn build(self) -> Result<PostgresCluster, DistError> {
46        let mut config = bb8_postgres::tokio_postgres::config::Config::new();
47        config
48            .host(&self.host)
49            .port(self.port)
50            .user(&self.user)
51            .password(&self.password);
52        if let Some(dbname) = self.dbname {
53            config.dbname(dbname);
54        }
55        let manager = PostgresConnectionManager::new(config, NoTls);
56        let pool = Pool::builder()
57            .max_size(self.pool_max_size)
58            .min_idle(self.pool_min_idle)
59            .idle_timeout(self.pool_idle_timeout)
60            .build(manager)
61            .await
62            .map_err(|e| DistError::cluster(Box::new(PostgresClusterError::Connection(e))))?;
63
64        let cluster = PostgresCluster {
65            table: self.table,
66            pool,
67            heartbeat_timeout_seconds: self.heartbeat_timeout_seconds,
68        };
69
70        cluster.create_table_if_not_exists().await?;
71
72        Ok(cluster)
73    }
74}