use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use crate::{StarConfig, StarError, StarResult};
use super::StarStore;
/// A pool of shared [`StarStore`] handles that are created on demand and
/// reused once they are handed back.
///
/// The queue, the condition variable and the creation counter are each held
/// behind an `Arc`, so a clone of the pool operates on the same state.
pub struct ConnectionPool {
// Idle stores waiting to be handed out; consumed from the front and
// refilled at the back.
available_connections: Arc<Mutex<VecDeque<Arc<StarStore>>>>,
// Notified each time a store is pushed back onto `available_connections`.
connection_available: Arc<Condvar>,
// Upper bound on the number of stores this pool will create.
max_connections: usize,
// Number of stores created so far; the visible code only ever increments it.
active_connections: Arc<Mutex<usize>>,
// Configuration cloned into every store the pool creates.
config: StarConfig,
}
impl ConnectionPool {
    /// Creates an empty pool that lazily builds at most `max_connections`
    /// stores, each configured from a clone of `config`.
    pub fn new(max_connections: usize, config: StarConfig) -> Self {
        Self {
            available_connections: Arc::new(Mutex::new(VecDeque::new())),
            connection_available: Arc::new(Condvar::new()),
            max_connections,
            active_connections: Arc::new(Mutex::new(0)),
            config,
        }
    }

    /// Checks out a connection, blocking until one can be provided.
    ///
    /// An idle store is reused when one is queued; otherwise a new store is
    /// created while the pool is below `max_connections`; otherwise the call
    /// waits until a checked-out connection is returned.
    ///
    /// # Errors
    ///
    /// Returns a query error when no idle store is queued and
    /// `max_connections` is zero, because such a pool can never create a
    /// store to satisfy the request.
    pub fn get_connection(&self) -> StarResult<PooledConnection> {
        let mut available = self
            .available_connections
            .lock()
            .unwrap_or_else(|e| e.into_inner());

        if let Some(store) = available.pop_front() {
            return Ok(PooledConnection::new(store, self.clone()));
        }

        if self.try_reserve_slot() {
            // Release the queue before constructing the store so other
            // threads are not blocked on it in the meantime.
            drop(available);
            return Ok(self.create_connection());
        }

        if self.max_connections == 0 {
            // Waiting would never end: this pool cannot create any store.
            return Err(StarError::query_error(
                "No connections available".to_string(),
            ));
        }

        // A condition variable may wake spuriously, and another thread may
        // take the returned store first, so re-check the queue after every
        // wakeup instead of assuming a store is present.
        loop {
            available = self
                .connection_available
                .wait(available)
                .unwrap_or_else(|e| e.into_inner());
            if let Some(store) = available.pop_front() {
                return Ok(PooledConnection::new(store, self.clone()));
            }
        }
    }

    /// Checks out a connection without blocking.
    ///
    /// Returns `None` when no idle store is queued and the pool has already
    /// created `max_connections` stores. A poisoned lock is recovered from,
    /// matching the other methods of the pool.
    pub fn try_get_connection(&self) -> Option<PooledConnection> {
        let mut available = self
            .available_connections
            .lock()
            .unwrap_or_else(|e| e.into_inner());

        if let Some(store) = available.pop_front() {
            return Some(PooledConnection::new(store, self.clone()));
        }

        if !self.try_reserve_slot() {
            return None;
        }
        drop(available);
        Some(self.create_connection())
    }

    /// Puts `store` back at the end of the idle queue and wakes one waiter.
    pub(super) fn return_connection(&self, store: Arc<StarStore>) {
        let mut available = self
            .available_connections
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        available.push_back(store);
        self.connection_available.notify_one();
    }

    /// Takes a snapshot of the pool's counters.
    ///
    /// `active_connections` is the number of stores created so far, and
    /// `utilization` is that number as a percentage of `max_connections`
    /// (`0.0` for a pool whose capacity is zero).
    pub fn get_statistics(&self) -> PoolStatistics {
        // Both locks are held together so the two counters describe the same
        // moment; the order matches the checkout paths.
        let available = self
            .available_connections
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        let active_count = self
            .active_connections
            .lock()
            .unwrap_or_else(|e| e.into_inner());

        let created = *active_count;
        let utilization = if self.max_connections == 0 {
            // Avoid 0.0 / 0.0, which would report NaN.
            0.0
        } else {
            (created as f64 / self.max_connections as f64) * 100.0
        };

        PoolStatistics {
            available_connections: available.len(),
            active_connections: created,
            max_connections: self.max_connections,
            utilization,
        }
    }

    /// Claims one creation slot if fewer than `max_connections` stores have
    /// been created, returning whether the claim succeeded.
    fn try_reserve_slot(&self) -> bool {
        let mut created = self
            .active_connections
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        if *created < self.max_connections {
            *created += 1;
            true
        } else {
            false
        }
    }

    /// Builds a new store from the pool's configuration and wraps it in a
    /// guard that hands it back to this pool when dropped.
    fn create_connection(&self) -> PooledConnection {
        let store = Arc::new(StarStore::with_config(self.config.clone()));
        PooledConnection::new(store, self.clone())
    }
}
impl Clone for ConnectionPool {
    /// Produces a second handle onto the same pool: the queue, condition
    /// variable and counter are shared, while the configuration is copied.
    fn clone(&self) -> Self {
        // Destructure exhaustively so a newly added field cannot be
        // forgotten here without a compile error.
        let Self {
            available_connections,
            connection_available,
            max_connections,
            active_connections,
            config,
        } = self;

        Self {
            available_connections: Arc::clone(available_connections),
            connection_available: Arc::clone(connection_available),
            max_connections: *max_connections,
            active_connections: Arc::clone(active_connections),
            config: config.clone(),
        }
    }
}
/// A checked-out store that is handed back to its pool when dropped.
pub struct PooledConnection {
// `Some` while the connection is held; taken out on drop so the store can
// be moved back into the pool.
store: Option<Arc<StarStore>>,
// Handle to the pool that receives the store on drop.
pool: ConnectionPool,
}
impl PooledConnection {
    /// Wraps `store` so that it is given back to `pool` once the wrapper is
    /// dropped.
    fn new(store: Arc<StarStore>, pool: ConnectionPool) -> Self {
        let store = Some(store);
        Self { store, pool }
    }

    /// Borrows the underlying store.
    ///
    /// # Panics
    ///
    /// Panics if the store has already been taken out of this wrapper.
    pub fn store(&self) -> &StarStore {
        match self.store.as_deref() {
            Some(inner) => inner,
            None => panic!("Connection has been dropped"),
        }
    }
}
impl Drop for PooledConnection {
    /// Hands the wrapped store back to the owning pool, if it is still held.
    fn drop(&mut self) {
        let Some(store) = self.store.take() else {
            return;
        };
        self.pool.return_connection(store);
    }
}
/// Point-in-time counters describing a [`ConnectionPool`].
#[derive(Debug, Clone)]
pub struct PoolStatistics {
/// Number of idle stores queued in the pool.
pub available_connections: usize,
/// Value of the pool's `active_connections` counter, which is incremented
/// each time the pool creates a store.
pub active_connections: usize,
/// Maximum number of stores the pool will create.
pub max_connections: usize,
/// `active_connections` expressed as a percentage of `max_connections`.
pub utilization: f64,
}