mod manager;
pub use manager::*;
#[cfg(native)]
use crate::error::NativeErrorKind;
use crate::{
connector::ConnectionInfo,
error::{Error, ErrorKind},
};
use mobc::Pool;
use std::{sync::Arc, time::Duration};
#[cfg(feature = "sqlite")]
use std::convert::TryFrom;
#[derive(Clone)]
pub struct Quaint {
pub(crate) inner: Pool<QuaintManager>,
connection_info: Arc<ConnectionInfo>,
pool_timeout: Option<Duration>,
}
pub struct Builder {
manager: QuaintManager,
connection_info: ConnectionInfo,
connection_limit: usize,
max_idle: Option<u64>,
max_idle_lifetime: Option<Duration>,
max_lifetime: Option<Duration>,
health_check_interval: Option<Duration>,
test_on_check_out: bool,
pool_timeout: Option<Duration>,
}
impl Builder {
fn new(url: &str, manager: QuaintManager) -> crate::Result<Self> {
let connection_limit = num_cpus::get_physical() * 2 + 1;
let connection_info = ConnectionInfo::from_url(url)?;
Ok(Self {
manager,
connection_info,
connection_limit,
max_idle: None,
max_idle_lifetime: None,
max_lifetime: None,
health_check_interval: None,
test_on_check_out: false,
pool_timeout: None,
})
}
pub fn connection_limit(&mut self, connection_limit: usize) {
self.connection_limit = connection_limit;
}
pub fn max_idle(&mut self, max_idle: u64) {
self.max_idle = Some(max_idle);
}
pub fn pool_timeout(&mut self, pool_timeout: Duration) {
assert_ne!(pool_timeout, Duration::from_secs(0), "pool_timeout must be positive");
self.pool_timeout = Some(pool_timeout);
}
pub fn max_lifetime(&mut self, max_lifetime: Duration) {
self.max_lifetime = Some(max_lifetime);
}
pub fn max_idle_lifetime(&mut self, max_idle_lifetime: Duration) {
self.max_idle_lifetime = Some(max_idle_lifetime);
}
pub fn test_on_check_out(&mut self, test_on_check_out: bool) {
self.test_on_check_out = test_on_check_out;
}
pub fn health_check_interval(&mut self, health_check_interval: Duration) {
self.health_check_interval = Some(health_check_interval);
}
#[cfg(feature = "postgresql-native")]
pub fn set_postgres_flavour(&mut self, flavour: crate::connector::PostgresFlavour) {
use crate::connector::{NativeConnectionInfo, PostgresUrl};
if let ConnectionInfo::Native(NativeConnectionInfo::Postgres(PostgresUrl::Native(ref mut url))) =
self.connection_info
{
url.set_flavour(flavour);
}
if let QuaintManager::Postgres { ref mut url, .. } = self.manager {
url.set_flavour(flavour);
}
}
pub fn build(self) -> Quaint {
let connection_info = Arc::new(self.connection_info);
Self::log_start(&connection_info, self.connection_limit);
let inner = Pool::builder()
.max_open(self.connection_limit as u64)
.max_idle(self.max_idle.unwrap_or(self.connection_limit as u64))
.max_idle_lifetime(self.max_idle_lifetime)
.max_lifetime(self.max_lifetime)
.get_timeout(None) .health_check_interval(self.health_check_interval)
.test_on_check_out(self.test_on_check_out)
.build(self.manager);
Quaint {
inner,
connection_info,
pool_timeout: self.pool_timeout,
}
}
fn log_start(info: &ConnectionInfo, connection_limit: usize) {
let family = info.sql_family();
let pg_bouncer = if info.pg_bouncer() { " in PgBouncer mode" } else { "" };
tracing::info!(
"Starting a {} pool with {} connections{}.",
family,
connection_limit,
pg_bouncer
);
}
}
impl Quaint {
pub fn builder_with_tracing(
url_str: &str,
#[allow(unused_variables)] is_tracing_enabled: bool,
) -> crate::Result<Builder> {
match url_str {
#[cfg(feature = "sqlite")]
s if s.starts_with("file") => {
let params = crate::connector::SqliteParams::try_from(s)?;
let manager = QuaintManager::Sqlite {
url: s.to_string(),
db_name: params.db_name,
};
let mut builder = Builder::new(s, manager)?;
if let Some(limit) = params.connection_limit {
builder.connection_limit(limit);
}
if let Some(max_lifetime) = params.max_connection_lifetime {
builder.max_lifetime(max_lifetime);
}
if let Some(max_idle_lifetime) = params.max_idle_connection_lifetime {
builder.max_idle_lifetime(max_idle_lifetime);
}
Ok(builder)
}
#[cfg(feature = "mysql")]
s if s.starts_with("mysql") => {
let mut url = crate::connector::MysqlUrl::new(url::Url::parse(s)?)?;
let connection_limit = url.connection_limit();
let pool_timeout = url.pool_timeout();
let max_connection_lifetime = url.max_connection_lifetime();
let max_idle_connection_lifetime = url.max_idle_connection_lifetime();
if is_tracing_enabled {
url.query_params.statement_cache_size = 0;
}
let manager = QuaintManager::Mysql { url };
let mut builder = Builder::new(s, manager)?;
if let Some(limit) = connection_limit {
builder.connection_limit(limit);
}
if let Some(timeout) = pool_timeout {
builder.pool_timeout(timeout);
}
if let Some(max_lifetime) = max_connection_lifetime {
builder.max_lifetime(max_lifetime);
}
if let Some(max_idle_lifetime) = max_idle_connection_lifetime {
builder.max_idle_lifetime(max_idle_lifetime);
}
Ok(builder)
}
#[cfg(feature = "postgresql")]
s if s.starts_with("postgres") || s.starts_with("postgresql") => {
let url = crate::connector::PostgresNativeUrl::new(url::Url::parse(s)?)?;
let connection_limit = url.connection_limit();
let pool_timeout = url.pool_timeout();
let max_connection_lifetime = url.max_connection_lifetime();
let max_idle_connection_lifetime = url.max_idle_connection_lifetime();
let tls_manager = crate::connector::MakeTlsConnectorManager::new(url.clone()).into();
let manager = QuaintManager::Postgres {
url,
tls_manager,
is_tracing_enabled,
};
let mut builder = Builder::new(s, manager)?;
if let Some(limit) = connection_limit {
builder.connection_limit(limit);
}
if let Some(timeout) = pool_timeout {
builder.pool_timeout(timeout);
}
if let Some(max_lifetime) = max_connection_lifetime {
builder.max_lifetime(max_lifetime);
}
if let Some(max_idle_lifetime) = max_idle_connection_lifetime {
builder.max_idle_lifetime(max_idle_lifetime);
}
Ok(builder)
}
#[cfg(feature = "mssql")]
s if s.starts_with("jdbc:sqlserver") || s.starts_with("sqlserver") => {
let url = crate::connector::MssqlUrl::new(s)?;
let connection_limit = url.connection_limit();
let pool_timeout = url.pool_timeout();
let max_connection_lifetime = url.max_connection_lifetime();
let max_idle_connection_lifetime = url.max_idle_connection_lifetime();
let manager = QuaintManager::Mssql { url };
let mut builder = Builder::new(s, manager)?;
if let Some(limit) = connection_limit {
builder.connection_limit(limit);
}
if let Some(timeout) = pool_timeout {
builder.pool_timeout(timeout);
}
if let Some(max_lifetime) = max_connection_lifetime {
builder.max_lifetime(max_lifetime);
}
if let Some(max_idle_lifetime) = max_idle_connection_lifetime {
builder.max_idle_lifetime(max_idle_lifetime);
}
Ok(builder)
}
_ => unimplemented!("Supported url schemes: file or sqlite, mysql, postgres or postgresql."),
}
}
pub fn builder(url_str: &str) -> crate::Result<Builder> {
Self::builder_with_tracing(url_str, false)
}
pub async fn capacity(&self) -> u32 {
self.inner.state().await.max_open as u32
}
pub async fn check_out(&self) -> crate::Result<PooledConnection> {
let res = match self.pool_timeout {
Some(duration) => crate::connector::metrics::check_out(self.inner.get_timeout(duration)).await,
None => crate::connector::metrics::check_out(self.inner.get()).await,
};
let inner = match res {
Ok(conn) => conn,
Err(mobc::Error::PoolClosed) => {
return Err(Error::builder(ErrorKind::Native(NativeErrorKind::PoolClosed {})).build());
}
Err(mobc::Error::Timeout) => {
let state = self.inner.state().await;
let timeout_duration = self.pool_timeout.unwrap();
return Err(
Error::builder(ErrorKind::pool_timeout(state.max_open, state.in_use, timeout_duration)).build(),
);
}
Err(mobc::Error::Inner(e)) => return Err(e),
Err(e @ mobc::Error::BadConn) => {
let error = Error::builder(ErrorKind::Native(NativeErrorKind::ConnectionError(Box::new(e)))).build();
return Err(error);
}
};
Ok(PooledConnection { inner })
}
pub fn connection_info(&self) -> &ConnectionInfo {
&self.connection_info
}
}