// datafusion-dist-cluster-postgres 0.1.0
//
// A PostgreSQL cluster implementation for datafusion-dist.
// Documentation
use std::time::Duration;

use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use bb8_postgres::tokio_postgres::NoTls;
use datafusion_dist::DistError;

use crate::{PostgresCluster, PostgresClusterError};

/// Builder that collects PostgreSQL connection settings and connection-pool
/// options, then produces a [`PostgresCluster`] via [`PostgresClusterBuilder::build`].
pub struct PostgresClusterBuilder {
    /// Host handed to the PostgreSQL connection config.
    host: String,
    /// Port handed to the PostgreSQL connection config.
    port: u16,
    /// User name handed to the PostgreSQL connection config.
    user: String,
    /// Password handed to the PostgreSQL connection config.
    password: String,
    /// Optional database name; when `None`, no database name is set on the config.
    dbname: Option<String>,
    /// Value passed to the pool builder's `max_size`.
    pool_max_size: u32,
    /// Value passed to the pool builder's `min_idle`.
    pool_min_idle: Option<u32>,
    /// Value passed to the pool builder's `idle_timeout`.
    pool_idle_timeout: Option<Duration>,
}

impl PostgresClusterBuilder {
    /// Creates a builder for the given server address and credentials.
    ///
    /// Defaults applied here:
    /// - no database name,
    /// - pool maximum size of `100`,
    /// - pool minimum idle count of `Some(5)`,
    /// - pool idle timeout of `Some(60 seconds)`.
    pub fn new(
        host: impl Into<String>,
        port: u16,
        user: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        Self {
            host: host.into(),
            port,
            user: user.into(),
            password: password.into(),
            dbname: None,
            pool_max_size: 100,
            pool_min_idle: Some(5),
            pool_idle_timeout: Some(Duration::from_secs(60)),
        }
    }

    /// Sets the database name to connect to.
    pub fn dbname(mut self, dbname: impl Into<String>) -> Self {
        self.dbname = Some(dbname.into());
        self
    }

    /// Sets the maximum number of connections the pool may hold.
    ///
    /// If the configured minimum idle count is larger than this value,
    /// [`build`](Self::build) lowers the effective minimum idle count to match.
    ///
    /// NOTE(review): bb8's `Builder::max_size` is understood to panic on `0` —
    /// confirm against the pinned bb8 version.
    pub fn pool_max_size(mut self, pool_max_size: u32) -> Self {
        self.pool_max_size = pool_max_size;
        self
    }

    /// Sets the minimum number of idle connections the pool should keep, or
    /// `None` to leave it unset on the pool builder.
    ///
    /// Values above the configured maximum size are clamped to the maximum
    /// size when the pool is built.
    pub fn pool_min_idle(mut self, pool_min_idle: Option<u32>) -> Self {
        self.pool_min_idle = pool_min_idle;
        self
    }

    /// Sets the idle timeout passed to the pool builder, or `None` to pass no timeout.
    ///
    /// NOTE(review): bb8's `Builder::idle_timeout` is understood to panic on a
    /// zero duration — confirm against the pinned bb8 version.
    pub fn pool_idle_timeout(mut self, pool_idle_timeout: Option<Duration>) -> Self {
        self.pool_idle_timeout = pool_idle_timeout;
        self
    }

    /// Builds the connection pool and returns a ready [`PostgresCluster`].
    ///
    /// Steps performed:
    /// 1. Assemble a `tokio_postgres` config from host, port, user, password
    ///    and (if set) database name.
    /// 2. Build a bb8 pool over a `NoTls` connection manager using the
    ///    configured pool options.
    /// 3. Wrap the pool in a [`PostgresCluster`] and call its `ensure_schema`.
    ///
    /// # Errors
    ///
    /// Returns a [`DistError`] wrapping `PostgresClusterError::Connection` if the
    /// pool cannot be built, or whatever error `ensure_schema` reports.
    pub async fn build(self) -> Result<PostgresCluster, DistError> {
        let mut config = bb8_postgres::tokio_postgres::config::Config::new();
        config
            .host(&self.host)
            .port(self.port)
            .user(&self.user)
            .password(&self.password);
        if let Some(dbname) = self.dbname {
            config.dbname(dbname);
        }

        // The default `min_idle` is 5, so a caller who only lowers `pool_max_size`
        // below that would otherwise hand bb8 `min_idle > max_size`, which bb8
        // rejects by panicking. Clamp so that combination stays usable.
        let min_idle = self
            .pool_min_idle
            .map(|idle| idle.min(self.pool_max_size));

        let manager = PostgresConnectionManager::new(config, NoTls);
        let pool = Pool::builder()
            .max_size(self.pool_max_size)
            .min_idle(min_idle)
            .idle_timeout(self.pool_idle_timeout)
            .build(manager)
            .await
            .map_err(|e| DistError::cluster(Box::new(PostgresClusterError::Connection(e))))?;

        let cluster = PostgresCluster::new(pool);

        cluster.ensure_schema().await?;

        Ok(cluster)
    }
}