mod manager;
pub use manager::*;
use crate::{
connector::{ConnectionInfo, PostgresFlavour},
error::{Error, ErrorKind},
};
use mobc::Pool;
use std::{sync::Arc, time::Duration};
#[cfg(feature = "sqlite")]
use std::convert::TryFrom;
/// A pool of database connections.
///
/// Bundles the underlying `mobc` pool with the connection details of the
/// database it talks to and the optional timeout used when a connection is
/// checked out. Instances are produced by `Builder::build`.
#[derive(Clone)]
pub struct Sqlint {
/// The underlying `mobc` pool that hands out connections.
pub(crate) inner: Pool<SqlintManager>,
/// Connection details for the configured database, exposed through
/// `Sqlint::connection_info`.
connection_info: Arc<ConnectionInfo>,
/// Timeout used by `Sqlint::check_out` when fetching a connection from the
/// pool; when `None`, `check_out` calls the pool's plain `get()` instead.
pool_timeout: Option<Duration>,
}
/// Collects the settings for a `Sqlint` pool before it is built.
///
/// Obtained from `Sqlint::builder`; settings are changed through the setter
/// methods and the pool is created with `Builder::build`.
pub struct Builder {
/// Connection manager handed to the `mobc` pool when it is built.
manager: SqlintManager,
/// Connection details parsed from the database URL in `Builder::new`.
connection_info: ConnectionInfo,
/// Maximum number of open connections (passed to the pool as `max_open`).
/// `Builder::new` initialises it to `physical cores * 2 + 1`.
connection_limit: usize,
/// Maximum number of idle connections; when `None`, `build` falls back to
/// `connection_limit`.
max_idle: Option<u64>,
/// Forwarded to the pool builder's `max_idle_lifetime` option.
max_idle_lifetime: Option<Duration>,
/// Forwarded to the pool builder's `max_lifetime` option.
max_lifetime: Option<Duration>,
/// Forwarded to the pool builder's `health_check_interval` option.
health_check_interval: Option<Duration>,
/// Forwarded to the pool builder's `test_on_check_out` option; `false` by
/// default.
test_on_check_out: bool,
/// Timeout stored on the resulting `Sqlint` and used by its `check_out`.
pool_timeout: Option<Duration>,
}
impl Builder {
    /// Creates a builder for the database at `url`, using `manager` to open
    /// connections.
    ///
    /// The connection limit starts at `physical cores * 2 + 1`,
    /// `test_on_check_out` starts as `false`, and every optional setting
    /// starts unset.
    ///
    /// # Errors
    ///
    /// Propagates the error from `ConnectionInfo::from_url` when `url` is
    /// rejected.
    fn new(url: &str, manager: SqlintManager) -> crate::Result<Self> {
        let default_limit = num_cpus::get_physical() * 2 + 1;
        let info = ConnectionInfo::from_url(url)?;

        let builder = Self {
            manager,
            connection_info: info,
            connection_limit: default_limit,
            max_idle: None,
            max_idle_lifetime: None,
            max_lifetime: None,
            health_check_interval: None,
            test_on_check_out: false,
            pool_timeout: None,
        };

        Ok(builder)
    }

    /// Sets the maximum number of open connections in the pool.
    pub fn connection_limit(&mut self, connection_limit: usize) {
        self.connection_limit = connection_limit;
    }

    /// Sets the maximum number of idle connections kept by the pool.
    ///
    /// When never called, `build` uses the connection limit instead.
    pub fn max_idle(&mut self, max_idle: u64) {
        self.max_idle = Some(max_idle);
    }

    /// Sets the timeout used when checking a connection out of the pool.
    ///
    /// # Panics
    ///
    /// Panics if `pool_timeout` is zero.
    pub fn pool_timeout(&mut self, pool_timeout: Duration) {
        assert_ne!(pool_timeout, Duration::ZERO, "pool_timeout must be positive");
        self.pool_timeout = Some(pool_timeout);
    }

    /// Sets the value forwarded to the pool's `max_lifetime` option.
    pub fn max_lifetime(&mut self, max_lifetime: Duration) {
        self.max_lifetime = Some(max_lifetime);
    }

    /// Sets the value forwarded to the pool's `max_idle_lifetime` option.
    pub fn max_idle_lifetime(&mut self, max_idle_lifetime: Duration) {
        self.max_idle_lifetime = Some(max_idle_lifetime);
    }

    /// Sets the value forwarded to the pool's `test_on_check_out` option.
    pub fn test_on_check_out(&mut self, test_on_check_out: bool) {
        self.test_on_check_out = test_on_check_out;
    }

    /// Sets the value forwarded to the pool's `health_check_interval` option.
    pub fn health_check_interval(&mut self, health_check_interval: Duration) {
        self.health_check_interval = Some(health_check_interval);
    }

    /// Applies `flavour` to the PostgreSQL URL held by both the connection
    /// info and the connection manager.
    ///
    /// Does nothing for the parts that are not PostgreSQL variants.
    pub fn set_postgres_flavour(&mut self, flavour: PostgresFlavour) {
        if let ConnectionInfo::Postgres(info_url) = &mut self.connection_info {
            info_url.set_flavour(flavour);
        }

        if let SqlintManager::Postgres { url: manager_url } = &mut self.manager {
            manager_url.set_flavour(flavour);
        }
    }

    /// Consumes the builder and creates the pool.
    ///
    /// Logs a start-up message, then configures the `mobc` pool from the
    /// collected settings. The idle limit falls back to the connection limit
    /// when it was never set.
    pub fn build(self) -> Sqlint {
        let Self {
            manager,
            connection_info,
            connection_limit,
            max_idle,
            max_idle_lifetime,
            max_lifetime,
            health_check_interval,
            test_on_check_out,
            pool_timeout,
        } = self;

        let connection_info = Arc::new(connection_info);
        Self::log_start(&connection_info, connection_limit);

        let max_open = connection_limit as u64;
        let inner = Pool::builder()
            .max_open(max_open)
            .max_idle(max_idle.unwrap_or(max_open))
            .max_idle_lifetime(max_idle_lifetime)
            .max_lifetime(max_lifetime)
            // The pool-level get timeout is left unset; the configured
            // `pool_timeout` is stored on `Sqlint` instead.
            .get_timeout(None)
            .health_check_interval(health_check_interval)
            .test_on_check_out(test_on_check_out)
            .build(manager);

        Sqlint {
            inner,
            connection_info,
            pool_timeout,
        }
    }

    /// Emits an info-level log line naming the SQL family, the connection
    /// limit and, when applicable, PgBouncer mode.
    fn log_start(info: &ConnectionInfo, connection_limit: usize) {
        let sql_family = info.sql_family();
        let bouncer_suffix = if info.pg_bouncer() {
            " in PgBouncer mode"
        } else {
            ""
        };

        tracing::info!(
            "Starting a {} pool with {} connections{}.",
            sql_family,
            connection_limit,
            bouncer_suffix
        );
    }
}
impl Sqlint {
pub fn builder(url_str: &str) -> crate::Result<Builder> {
match url_str {
#[cfg(feature = "sqlite")]
s if s.starts_with("file") => {
let params = crate::connector::SqliteParams::try_from(s)?;
let manager = SqlintManager::Sqlite { url: s.to_string(), db_name: params.db_name };
let mut builder = Builder::new(s, manager)?;
if let Some(limit) = params.connection_limit {
builder.connection_limit(limit);
}
if let Some(max_lifetime) = params.max_connection_lifetime {
builder.max_lifetime(max_lifetime);
}
if let Some(max_idle_lifetime) = params.max_idle_connection_lifetime {
builder.max_idle_lifetime(max_idle_lifetime);
}
Ok(builder)
}
#[cfg(feature = "mysql")]
s if s.starts_with("mysql") => {
let url = crate::connector::MysqlUrl::new(url::Url::parse(s)?)?;
let connection_limit = url.connection_limit();
let pool_timeout = url.pool_timeout();
let max_connection_lifetime = url.max_connection_lifetime();
let max_idle_connection_lifetime = url.max_idle_connection_lifetime();
let manager = SqlintManager::Mysql { url };
let mut builder = Builder::new(s, manager)?;
if let Some(limit) = connection_limit {
builder.connection_limit(limit);
}
if let Some(timeout) = pool_timeout {
builder.pool_timeout(timeout);
}
if let Some(max_lifetime) = max_connection_lifetime {
builder.max_lifetime(max_lifetime);
}
if let Some(max_idle_lifetime) = max_idle_connection_lifetime {
builder.max_idle_lifetime(max_idle_lifetime);
}
Ok(builder)
}
#[cfg(feature = "postgresql")]
s if s.starts_with("postgres") || s.starts_with("postgresql") => {
let url = crate::connector::PostgresUrl::new(url::Url::parse(s)?)?;
let connection_limit = url.connection_limit();
let pool_timeout = url.pool_timeout();
let max_connection_lifetime = url.max_connection_lifetime();
let max_idle_connection_lifetime = url.max_idle_connection_lifetime();
let manager = SqlintManager::Postgres { url };
let mut builder = Builder::new(s, manager)?;
if let Some(limit) = connection_limit {
builder.connection_limit(limit);
}
if let Some(timeout) = pool_timeout {
builder.pool_timeout(timeout);
}
if let Some(max_lifetime) = max_connection_lifetime {
builder.max_lifetime(max_lifetime);
}
if let Some(max_idle_lifetime) = max_idle_connection_lifetime {
builder.max_idle_lifetime(max_idle_lifetime);
}
Ok(builder)
}
#[cfg(feature = "mssql")]
s if s.starts_with("jdbc:sqlserver") || s.starts_with("sqlserver") => {
let url = crate::connector::MssqlUrl::new(s)?;
let connection_limit = url.connection_limit();
let pool_timeout = url.pool_timeout();
let max_connection_lifetime = url.max_connection_lifetime();
let max_idle_connection_lifetime = url.max_idle_connection_lifetime();
let manager = SqlintManager::Mssql { url };
let mut builder = Builder::new(s, manager)?;
if let Some(limit) = connection_limit {
builder.connection_limit(limit);
}
if let Some(timeout) = pool_timeout {
builder.pool_timeout(timeout);
}
if let Some(max_lifetime) = max_connection_lifetime {
builder.max_lifetime(max_lifetime);
}
if let Some(max_idle_lifetime) = max_idle_connection_lifetime {
builder.max_idle_lifetime(max_idle_lifetime);
}
Ok(builder)
}
_ => unimplemented!("Supported url schemes: file or sqlite, mysql, postgres or postgresql."),
}
}
pub async fn capacity(&self) -> u32 {
self.inner.state().await.max_open as u32
}
pub async fn check_out(&self) -> crate::Result<PooledConnection> {
let res = match self.pool_timeout {
Some(duration) => crate::connector::metrics::check_out(self.inner.get_timeout(duration)).await,
None => crate::connector::metrics::check_out(self.inner.get()).await,
};
let inner = match res {
Ok(conn) => conn,
Err(mobc::Error::PoolClosed) => return Err(Error::builder(ErrorKind::PoolClosed {}).build()),
Err(mobc::Error::Timeout) => {
let state = self.inner.state().await;
let timeout_duration = self.pool_timeout.unwrap();
return Err(
Error::builder(ErrorKind::pool_timeout(state.max_open, state.in_use, timeout_duration)).build()
);
}
Err(mobc::Error::Inner(e)) => return Err(e),
Err(e @ mobc::Error::BadConn) => {
let error = Error::builder(ErrorKind::ConnectionError(Box::new(e))).build();
return Err(error);
}
};
Ok(PooledConnection { inner })
}
pub fn connection_info(&self) -> &ConnectionInfo {
&self.connection_info
}
}