use crate::tcp::Tcp;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
/// Tunable limits for a [`ConnectionPool`].
///
/// `max_idle` and `max_lifetime` are converted with `Duration::from_secs`
/// by the pool's health check, so both are expressed in whole seconds.
#[derive(Clone, Debug)]
pub struct PoolConfig {
    /// Upper bound on the number of connection slots the pool may hold.
    pub max_size: usize,
    /// Seconds a connection may go unused before it counts as unhealthy.
    pub max_idle: u64,
    /// Seconds a connection may exist before it counts as unhealthy.
    pub max_lifetime: u64,
    /// Health-check timeout. Nothing in this file reads it;
    /// presumably seconds like the other limits — TODO confirm.
    pub health_check_timeout: u64,
}
impl Default for PoolConfig {
fn default() -> Self {
Self {
max_size: 3,
max_idle: 600, max_lifetime: 3600, health_check_timeout: 5,
}
}
}
/// Bookkeeping record for one slot of the pool.
#[derive(Debug)]
struct PooledConnection {
    /// The socket while it rests in the pool; the pool sets this to `None`
    /// while the socket is handed out to a caller.
    tcp: Option<Tcp>,
    /// When this slot's connection was created (compared against `max_lifetime`).
    created_at: Instant,
    /// Last time the slot was checked out or returned (compared against `max_idle`).
    last_used: Instant,
    /// `true` while a caller holds the connection.
    in_use: bool,
}
impl PooledConnection {
    /// Opens a new `Tcp` and wraps it in an idle slot stamped with the
    /// current time.
    ///
    /// # Errors
    /// Propagates the error returned by `Tcp::new`.
    fn new() -> Result<Self, std::io::Error> {
        let socket = Tcp::new()?;
        let slot = Self {
            tcp: Some(socket),
            created_at: Instant::now(),
            last_used: Instant::now(),
            in_use: false,
        };
        Ok(slot)
    }

    /// Returns `true` while the slot is within both `config.max_lifetime`
    /// (measured from creation) and `config.max_idle` (measured from the
    /// last use), each taken as seconds.
    fn is_healthy(&self, config: &PoolConfig) -> bool {
        let lifetime_limit = Duration::from_secs(config.max_lifetime);
        let idle_limit = Duration::from_secs(config.max_idle);
        // `&&` short-circuits, so the idle clock is only consulted when the
        // lifetime check passes.
        self.created_at.elapsed() <= lifetime_limit && self.last_used.elapsed() <= idle_limit
    }

    /// Flags the slot as checked out and refreshes its last-use timestamp.
    fn mark_used(&mut self) {
        self.set_in_use(true);
    }

    /// Flags the slot as available and refreshes its last-use timestamp.
    fn mark_idle(&mut self) {
        self.set_in_use(false);
    }

    /// Shared body of `mark_used` / `mark_idle`: both record the new state
    /// and then stamp the slot with the current time.
    fn set_in_use(&mut self, in_use: bool) {
        self.in_use = in_use;
        self.last_used = Instant::now();
    }
}
/// Handle to a pool of `Tcp` connections.
///
/// The slot list sits behind `Arc<Mutex<..>>`, so cloning a `ConnectionPool`
/// produces another handle to the same slots; only the configuration is
/// copied.
#[derive(Clone, Debug)]
pub struct ConnectionPool {
    /// Slot list shared by every clone of this handle.
    connections: Arc<Mutex<Vec<PooledConnection>>>,
    /// Limits applied when handing out and expiring connections.
    config: PoolConfig,
}
impl ConnectionPool {
pub fn new(max_size: usize) -> Result<Self, std::io::Error> {
let config = PoolConfig {
max_size,
..Default::default()
};
Ok(Self {
connections: Arc::new(Mutex::new(Vec::new())),
config,
})
}
pub fn with_config(config: PoolConfig) -> Result<Self, std::io::Error> {
Ok(Self {
connections: Arc::new(Mutex::new(Vec::new())),
config,
})
}
pub fn get_connection(&self) -> Result<PooledConn, std::io::Error> {
let mut connections = self.connections.lock().unwrap();
self.cleanup_idle_connections(&mut connections);
for (i, conn) in connections.iter_mut().enumerate() {
if !conn.in_use && conn.is_healthy(&self.config) {
conn.mark_used();
let tcp = conn.tcp.take().unwrap();
return Ok(PooledConn {
pool: self.clone(),
index: i,
tcp: Some(tcp),
});
}
}
if connections.len() < self.config.max_size {
let tcp = Tcp::new()?;
let mut new_conn = PooledConnection {
tcp: None, created_at: Instant::now(),
last_used: Instant::now(),
in_use: true,
};
connections.push(new_conn);
let index = connections.len() - 1;
return Ok(PooledConn {
pool: self.clone(),
index,
tcp: Some(tcp),
});
}
Err(std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"Connection pool exhausted",
))
}
fn cleanup_idle_connections(&self, connections: &mut Vec<PooledConnection>) {
connections.retain(|conn| {
if conn.in_use {
return true;
}
conn.is_healthy(&self.config)
});
}
pub fn stats(&self) -> PoolStats {
let connections = self.connections.lock().unwrap();
let total = connections.len();
let active = connections.iter().filter(|c| c.in_use).count();
let idle = total - active;
PoolStats {
total,
active,
idle,
max_size: self.config.max_size,
}
}
pub fn close(&self) {
let mut connections = self.connections.lock().unwrap();
connections.clear();
}
}
/// A connection checked out of a [`ConnectionPool`].
///
/// The handle owns the socket for as long as it lives; its `Drop`
/// implementation hands the socket back to the slot it was taken from.
pub struct PooledConn {
    /// Handle to the pool the connection came from.
    pool: ConnectionPool,
    /// Position of this connection's slot in the pool's slot list.
    index: usize,
    /// The socket itself; taken out again when the handle is dropped.
    tcp: Option<Tcp>,
}
impl PooledConn {
    /// Gives mutable access to the underlying socket.
    ///
    /// # Panics
    /// Panics if the handle no longer holds a socket. Within this file the
    /// socket is only removed while the handle is being dropped.
    pub fn get_mut(&mut self) -> &mut Tcp {
        let held: Option<&mut Tcp> = self.tcp.as_mut();
        held.unwrap()
    }

    /// Runs `f` against the underlying socket and returns whatever `f`
    /// returns.
    pub fn execute<F, R>(&mut self, f: F) -> R
    where
        F: FnOnce(&mut Tcp) -> R,
    {
        let socket = self.get_mut();
        f(socket)
    }
}
/// Returns the socket to the pool when the handle goes out of scope.
impl Drop for PooledConn {
    fn drop(&mut self) {
        // A destructor must not panic: if this runs while the thread is
        // already unwinding, a second panic aborts the process. Recover the
        // guard from a poisoned mutex instead of unwrapping.
        let mut connections = self
            .pool
            .connections
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        if let Some(slot) = connections.get_mut(self.index) {
            // Only hand the socket back to a slot that is still waiting for
            // one (checked out and empty). The pool can clear or rebuild its
            // slot list while handles are outstanding, and an idle slot must
            // not have its own socket replaced by a stale handle's.
            if slot.in_use && slot.tcp.is_none() {
                slot.tcp = self.tcp.take();
                slot.mark_idle();
            }
        }
        // If the socket was not handed back it is still in `self.tcp` and is
        // dropped together with the handle.
    }
}
/// Point-in-time counters describing a [`ConnectionPool`].
#[derive(Clone, Debug)]
pub struct PoolStats {
    /// Number of connections the pool is tracking.
    pub total: usize,
    /// Connections currently checked out.
    pub active: usize,
    /// Connections not currently checked out.
    pub idle: usize,
    /// Configured upper bound on the pool size.
    pub max_size: usize,
}
#[cfg(test)]
mod tests {
    //! Behavioural tests for the pool. Each test opens real `Tcp` values
    //! through the pool, so they rely on `Tcp::new` succeeding in the test
    //! environment.

    use super::*;

    /// A new pool tracks nothing and reports the requested capacity.
    #[test]
    fn test_pool_creation() {
        let pool = ConnectionPool::new(3).unwrap();
        let snapshot = pool.stats();
        assert_eq!(snapshot.total, 0);
        assert_eq!(snapshot.active, 0);
        assert_eq!(snapshot.max_size, 3);
    }

    /// The first checkout creates one slot and marks it active.
    #[test]
    fn test_pool_get_connection() {
        let pool = ConnectionPool::new(3).unwrap();
        // Keep the result alive so the connection stays checked out.
        let checkout = pool.get_connection();
        assert!(checkout.is_ok());
        let snapshot = pool.stats();
        assert_eq!(snapshot.total, 1);
        assert_eq!(snapshot.active, 1);
        assert_eq!(snapshot.idle, 0);
    }

    /// Dropping a handle turns its slot from active into idle.
    #[test]
    fn test_pool_connection_return() {
        let pool = ConnectionPool::new(3).unwrap();
        {
            let _held = pool.get_connection().unwrap();
            let while_held = pool.stats();
            assert_eq!(while_held.active, 1);
        }
        let after_return = pool.stats();
        assert_eq!(after_return.active, 0);
        assert_eq!(after_return.idle, 1);
    }

    /// Once `max_size` handles are out, a further checkout fails.
    #[test]
    fn test_pool_max_size() {
        let pool = ConnectionPool::new(2).unwrap();
        let _first = pool.get_connection().unwrap();
        let after_first = pool.stats();
        assert_eq!(after_first.total, 1);
        let _second = pool.get_connection().unwrap();
        let after_second = pool.stats();
        assert_eq!(after_second.total, 2);
        let overflow = pool.get_connection();
        assert!(overflow.is_err());
    }

    /// A returned connection is handed out again instead of opening a new one.
    #[test]
    fn test_pool_connection_reuse() {
        let pool = ConnectionPool::new(2).unwrap();
        {
            let _short_lived = pool.get_connection().unwrap();
        }
        let after_return = pool.stats();
        assert_eq!(after_return.total, 1);
        let _reused = pool.get_connection().unwrap();
        let after_reuse = pool.stats();
        assert_eq!(after_reuse.total, 1);
        assert_eq!(after_reuse.active, 1);
    }

    /// `close` empties the pool even while handles are still outstanding.
    #[test]
    fn test_pool_close() {
        let pool = ConnectionPool::new(3).unwrap();
        let _first = pool.get_connection().unwrap();
        let _second = pool.get_connection().unwrap();
        let before_close = pool.stats();
        assert_eq!(before_close.total, 2);
        pool.close();
        let after_close = pool.stats();
        assert_eq!(after_close.total, 0);
    }

    /// A custom configuration is reflected in the reported capacity.
    #[test]
    fn test_pool_config() {
        let custom = PoolConfig {
            max_size: 5,
            max_idle: 300,
            max_lifetime: 1800,
            health_check_timeout: 10,
        };
        let pool = ConnectionPool::with_config(custom).unwrap();
        let snapshot = pool.stats();
        assert_eq!(snapshot.max_size, 5);
    }
}