use core::time::Duration;
use std::{convert::TryFrom, fmt::Display, path::PathBuf};
use diesel::{
SqliteConnection,
r2d2::{ConnectionManager, Pool, PooledConnection},
};
use log::*;
use crate::{connection_options::ConnectionOptions, error::SqliteStorageError};
const LOG_TARGET: &str = "common_sqlite::sqlite_connection_pool";
pub const R2D2_POOL_CONNECTION_DELTA: Duration = Duration::from_secs(5);
pub const R2D2_POOL_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30);
#[derive(Clone)]
pub struct SqliteConnectionPool {
pool: Option<Pool<ConnectionManager<SqliteConnection>>>,
db_path: String,
pool_size: usize,
connection_options: ConnectionOptions,
}
impl SqliteConnectionPool {
pub fn new(
db_path: String,
pool_size: usize,
enable_wal: bool,
enable_foreign_keys: bool,
busy_timeout: Duration,
) -> Self {
Self {
pool: None,
db_path,
pool_size,
connection_options: ConnectionOptions::new(enable_wal, enable_foreign_keys, busy_timeout),
}
}
pub fn create_pool(&mut self) -> Result<(), SqliteStorageError> {
if self.pool.is_none() {
let mut builder = Pool::builder()
.max_size(u32::try_from(self.pool_size)?)
.connection_customizer(Box::new(self.connection_options.clone()));
if let Some(timeout) = self.connection_options.get_busy_timeout() {
builder = builder.connection_timeout(timeout + R2D2_POOL_CONNECTION_DELTA);
} else {
builder = builder.connection_timeout(R2D2_POOL_CONNECTION_TIMEOUT);
}
let pool = builder
.build(ConnectionManager::<SqliteConnection>::new(self.db_path.as_str()))
.map_err(|e| SqliteStorageError::DieselR2d2Error(e.to_string()))?;
self.pool = Some(pool);
} else {
warn!(target: LOG_TARGET, "Connection pool for {} already exists", self.db_path);
}
Ok(())
}
pub fn get_pooled_connection(
&self,
) -> Result<PooledConnection<ConnectionManager<SqliteConnection>>, SqliteStorageError> {
if let Some(pool) = self.pool.as_ref() {
let start = std::time::Instant::now();
let connection = pool.get().map_err(|e| {
warn!(target: LOG_TARGET, "Connection pool state {:?}: {}", pool.state(), e);
SqliteStorageError::DieselR2d2Error(e.to_string())
});
let timing = start.elapsed();
if timing > Duration::from_millis(100) {
debug!(target: LOG_TARGET, "Acquired 'get_pooled_connection' from pool in {:.2?}", timing);
}
connection
} else {
Err(SqliteStorageError::DieselR2d2Error("Pool does not exist".to_string()))
}
}
pub fn get_pooled_connection_timeout(
&self,
timeout: Duration,
) -> Result<PooledConnection<ConnectionManager<SqliteConnection>>, SqliteStorageError> {
if let Some(pool) = self.pool.clone() {
let start = std::time::Instant::now();
let connection = pool.get_timeout(timeout).map_err(|e| {
warn!(target: LOG_TARGET, "Connection pool state {:?}: {}", pool.state(), e);
SqliteStorageError::DieselR2d2Error(e.to_string())
});
let timing = start.elapsed();
if timing > Duration::from_millis(100) {
debug!(target: LOG_TARGET, "Acquired 'get_pooled_connection_timeout' from pool in {:.2?}", timing);
}
connection
} else {
Err(SqliteStorageError::DieselR2d2Error("Pool does not exist".to_string()))
}
}
pub fn try_get_pooled_connection(
&self,
) -> Result<Option<PooledConnection<ConnectionManager<SqliteConnection>>>, SqliteStorageError> {
if let Some(pool) = self.pool.clone() {
let start = std::time::Instant::now();
let connection = pool.try_get();
if connection.is_none() {
warn!(target: LOG_TARGET, "No connections available, pool state {:?}", pool.state());
} else {
let timing = start.elapsed();
if timing > Duration::from_millis(100) {
debug!(target: LOG_TARGET, "Acquired 'try_get_pooled_connection' from pool in {:.2?}", timing);
}
}
Ok(connection)
} else {
Err(SqliteStorageError::DieselR2d2Error("Pool does not exist".to_string()))
}
}
pub fn db_path(&self) -> PathBuf {
PathBuf::from(&self.db_path)
}
pub fn cleanup(&mut self) -> Option<String> {
if let Some(pool) = self.pool.take() {
let state = format!("{:?}", pool.state());
drop(pool);
return Some(state);
}
None
}
}
impl Display for SqliteConnectionPool {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let pool_state = if let Some(pool) = self.pool.clone() {
format!("{:?}", pool.state())
} else {
"None".to_string()
};
write!(
f,
"SqliteConnectionPool {{ pool state: {}, db_path: {}, pool_size: {}, connection_options: {:?} }}",
pool_state, self.db_path, self.pool_size, self.connection_options
)
}
}
pub trait PooledDbConnection: Send + Sync + Clone {
type Error;
fn get_pooled_connection(&self) -> Result<PooledConnection<ConnectionManager<SqliteConnection>>, Self::Error>;
}