datafusion_dist_cluster_postgres/
builder.rs1use std::time::Duration;
2
3use bb8::Pool;
4use bb8_postgres::PostgresConnectionManager;
5use bb8_postgres::tokio_postgres::NoTls;
6use datafusion_dist::DistError;
7
8use crate::{PostgresCluster, PostgresClusterError};
9
10#[derive(Debug, derive_with::With)]
11pub struct PostgresClusterBuilder {
12 host: String,
13 port: u16,
14 user: String,
15 password: String,
16 dbname: Option<String>,
17 table: String,
18 pool_max_size: u32,
19 pool_min_idle: Option<u32>,
20 pool_idle_timeout: Option<Duration>,
21 heartbeat_timeout_seconds: i32,
22}
23
24impl PostgresClusterBuilder {
25 pub fn new(
26 host: impl Into<String>,
27 port: u16,
28 user: impl Into<String>,
29 password: impl Into<String>,
30 ) -> Self {
31 Self {
32 host: host.into(),
33 port,
34 user: user.into(),
35 password: password.into(),
36 dbname: None,
37 table: "cluster_nodes".to_string(),
38 pool_max_size: 100,
39 pool_min_idle: Some(5),
40 pool_idle_timeout: Some(Duration::from_secs(60)),
41 heartbeat_timeout_seconds: 60,
42 }
43 }
44
45 pub async fn build(self) -> Result<PostgresCluster, DistError> {
46 let mut config = bb8_postgres::tokio_postgres::config::Config::new();
47 config
48 .host(&self.host)
49 .port(self.port)
50 .user(&self.user)
51 .password(&self.password);
52 if let Some(dbname) = self.dbname {
53 config.dbname(dbname);
54 }
55 let manager = PostgresConnectionManager::new(config, NoTls);
56 let pool = Pool::builder()
57 .max_size(self.pool_max_size)
58 .min_idle(self.pool_min_idle)
59 .idle_timeout(self.pool_idle_timeout)
60 .build(manager)
61 .await
62 .map_err(|e| DistError::cluster(Box::new(PostgresClusterError::Connection(e))))?;
63
64 let cluster = PostgresCluster {
65 table: self.table,
66 pool,
67 heartbeat_timeout_seconds: self.heartbeat_timeout_seconds,
68 };
69
70 cluster.create_table_if_not_exists().await?;
71
72 Ok(cluster)
73 }
74}