pooly 0.2.1

A protobuf to Postgres adapter + connection pooling middleware.
Documentation
use std::sync::Arc;

use dashmap::DashMap;
use deadpool::managed::{Object, PoolConfig};
use deadpool_postgres::{Config, Manager, ManagerConfig, Pool, RecyclingMethod, Runtime, SslMode};
use tokio_postgres::NoTls;

use crate::models::errors::ConnectionError;
use crate::models::query::connections::ConnectionConfig;
use crate::services::connections::config::ConnectionConfigService;
use crate::services::limits::RateLimiter;
use crate::services::updatable::UpdatableService;

pub mod config;

pub struct ConnectionService {

    pools: DashMap<String, PoolEntry>,
    config_service: Arc<ConnectionConfigService>

}

pub type Connection = Object<Manager>;


impl ConnectionService {

    pub fn new(connection_config_service: Arc<ConnectionConfigService>) -> Self {
        ConnectionService {
            pools: DashMap::new(),
            config_service: connection_config_service
        }
    }

    pub async fn get(&self,
                     connection_id: &str) -> Option<Result<Connection, ConnectionError>> {
        match self.pools.get(connection_id) {
            Some(pool_entry) => {
                let rate_result = pool_entry.rate_limiter.acquire();

                if rate_result.is_err() {
                    return Some(Err(rate_result.unwrap_err().into()));
                }

                Some(pool_entry.pool.get().await.map_err(ConnectionError::PoolError))
            },
            None => self.create_or_empty(connection_id).await
        }
    }

    async fn create_or_empty(&self,
                             connection_id: &str) -> Option<Result<Connection, ConnectionError>> {
        match self.config_service.get(connection_id) {
            Ok(Some(config)) =>
                self.add_connection_pool(config.get_value()).await,
            Ok(None) => None,
            Err(err) => Some(Err(ConnectionError::StorageError(err)))
        }
    }

    async fn add_connection_pool(&self,
                                 connection_config: &ConnectionConfig)
                                 -> Option<Result<Connection, ConnectionError>> {
        let mut config = Config::new();

        config.dbname = Some(connection_config.db_name.clone());
        config.hosts = Some(connection_config.hosts.clone());
        config.ports = Some(connection_config.ports.clone());
        config.user = Some(connection_config.user.clone());
        config.password = Some(connection_config.password.clone());
        config.manager = Some(ManagerConfig { recycling_method: RecyclingMethod::Fast });
        config.pool = Some(PoolConfig::new(connection_config.max_connections as usize));
        config.ssl_mode = Some(SslMode::Prefer);

        Some(match config.create_pool(
            Some(Runtime::Tokio1),
            NoTls
        ) {
            Ok(pool) => {
                let result =
                    pool.get().await.map_err(ConnectionError::PoolError);

                if result.is_ok() {
                    self.pools.insert(connection_config.id.clone(), PoolEntry {
                        pool,
                        rate_limiter: connection_config.rate_limit.as_ref().map_or(
                            RateLimiter::NoOp,
                            |rate_config|
                                RateLimiter::leaky_bucket(
                                    rate_config.max_requests_per_period,
                                    rate_config.period_millis as u128))
                    });
                }

                result
            },
            Err(err) => Err(ConnectionError::CreatePoolError(err))
        })
    }
}

struct PoolEntry {

    pool: Pool,
    rate_limiter: RateLimiter

}