datafusion_dist_cluster_postgres/
builder.rs1use std::time::Duration;
2
3use bb8::Pool;
4use bb8_postgres::PostgresConnectionManager;
5use bb8_postgres::tokio_postgres::NoTls;
6use datafusion_dist::DistError;
7
8use crate::{PostgresCluster, PostgresClusterError};
9
/// Builder that collects connection parameters and pool settings for a
/// [`PostgresCluster`].
///
/// All fields are private; they are populated by `new` and the chained setter
/// methods, and consumed by `build`.
pub struct PostgresClusterBuilder {
    // --- Connection parameters -------------------------------------------

    /// Server host handed to the Postgres connection config.
    host: String,
    /// Server TCP port.
    port: u16,
    /// User name used to authenticate.
    user: String,
    /// Password used to authenticate.
    password: String,
    /// Database name; when `None`, no database name is set on the config.
    dbname: Option<String>,

    // --- Connection pool settings ----------------------------------------

    /// Upper bound on the number of pooled connections.
    pool_max_size: u32,
    /// Minimum number of idle connections requested from the pool, if any.
    pool_min_idle: Option<u32>,
    /// Idle timeout forwarded to the pool, if any.
    pool_idle_timeout: Option<Duration>,
}
20
21impl PostgresClusterBuilder {
22 pub fn new(
23 host: impl Into<String>,
24 port: u16,
25 user: impl Into<String>,
26 password: impl Into<String>,
27 ) -> Self {
28 Self {
29 host: host.into(),
30 port,
31 user: user.into(),
32 password: password.into(),
33 dbname: None,
34 pool_max_size: 100,
35 pool_min_idle: Some(5),
36 pool_idle_timeout: Some(Duration::from_secs(60)),
37 }
38 }
39
40 pub fn dbname(mut self, dbname: impl Into<String>) -> Self {
41 self.dbname = Some(dbname.into());
42 self
43 }
44
45 pub fn pool_max_size(mut self, pool_max_size: u32) -> Self {
46 self.pool_max_size = pool_max_size;
47 self
48 }
49
50 pub fn pool_min_idle(mut self, pool_min_idle: Option<u32>) -> Self {
51 self.pool_min_idle = pool_min_idle;
52 self
53 }
54
55 pub fn pool_idle_timeout(mut self, pool_idle_timeout: Option<Duration>) -> Self {
56 self.pool_idle_timeout = pool_idle_timeout;
57 self
58 }
59
60 pub async fn build(self) -> Result<PostgresCluster, DistError> {
61 let mut config = bb8_postgres::tokio_postgres::config::Config::new();
62 config
63 .host(&self.host)
64 .port(self.port)
65 .user(&self.user)
66 .password(&self.password);
67 if let Some(dbname) = self.dbname {
68 config.dbname(dbname);
69 }
70 let manager = PostgresConnectionManager::new(config, NoTls);
71 let pool = Pool::builder()
72 .max_size(self.pool_max_size)
73 .min_idle(self.pool_min_idle)
74 .idle_timeout(self.pool_idle_timeout)
75 .build(manager)
76 .await
77 .map_err(|e| DistError::cluster(Box::new(PostgresClusterError::Connection(e))))?;
78
79 let cluster = PostgresCluster::new(pool);
80
81 cluster.ensure_schema().await?;
82
83 Ok(cluster)
84 }
85}