use crate::yahoo_error::YahooError;
use reqwest::{Client, ClientBuilder};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::{Mutex, RwLock, Semaphore};
use tokio::time::sleep;
#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
pub min_connections: usize,
pub max_connections: usize,
pub connection_timeout: Duration,
pub request_timeout: Duration,
pub idle_timeout: Duration,
pub health_check_interval: Duration,
pub max_connection_lifetime: Duration,
pub enable_analytics: bool,
pub keep_alive_config: KeepAliveConfig,
}
#[derive(Debug, Clone)]
pub struct KeepAliveConfig {
pub enabled: bool,
pub timeout: Duration,
pub interval: Duration,
pub probe_count: u32,
}
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionHealth {
Healthy,
Degraded,
Unhealthy,
Testing,
}
#[derive(Debug)]
pub struct PooledConnection {
pub client: Client,
pub id: u64,
pub created_at: Instant,
pub last_used: Instant,
pub request_count: AtomicU64,
pub health: Arc<RwLock<ConnectionHealth>>,
pub config: ConnectionConfig,
}
#[derive(Debug, Clone)]
pub struct ConnectionConfig {
pub user_agent: String,
pub headers: Vec<(String, String)>,
pub proxy: Option<String>,
pub tls_config: TlsConfig,
}
#[derive(Debug, Clone)]
pub struct TlsConfig {
pub accept_invalid_certs: bool,
pub accept_invalid_hostnames: bool,
pub min_tls_version: TlsVersion,
}
#[derive(Debug, Clone, PartialEq)]
pub enum TlsVersion {
TlsV1_0,
TlsV1_1,
TlsV1_2,
TlsV1_3,
}
#[derive(Debug, Clone)]
pub struct PoolStats {
pub active_connections: usize,
pub idle_connections: usize,
pub total_requests: u64,
pub failed_requests: u64,
pub avg_request_duration_ms: f64,
pub pool_efficiency: f64,
pub health_distribution: HealthDistribution,
pub utilization_history: VecDeque<UtilizationSnapshot>,
}
#[derive(Debug, Clone)]
pub struct HealthDistribution {
pub healthy_count: usize,
pub degraded_count: usize,
pub unhealthy_count: usize,
pub testing_count: usize,
}
#[derive(Debug, Clone)]
pub struct UtilizationSnapshot {
pub timestamp: SystemTime,
pub active_connections: usize,
pub pending_requests: usize,
pub throughput_rps: f64,
}
pub struct ConnectionPool {
config: ConnectionPoolConfig,
connections: Arc<Mutex<VecDeque<Arc<PooledConnection>>>>,
active_connections: Arc<AtomicUsize>,
connection_semaphore: Arc<Semaphore>,
connection_id_counter: Arc<AtomicU64>,
stats: Arc<RwLock<PoolStats>>,
health_monitor: Arc<HealthMonitor>,
is_shutdown: Arc<AtomicUsize>, }
pub struct HealthMonitor {
monitored_connections: Arc<RwLock<Vec<Weak<PooledConnection>>>>,
check_interval: Duration,
health_stats: Arc<RwLock<HealthStats>>,
}
#[derive(Debug, Clone)]
pub struct HealthStats {
pub total_checks: u64,
pub passed_checks: u64,
pub failed_checks: u64,
pub avg_check_duration_ms: f64,
pub recent_failures: VecDeque<HealthFailure>,
}
#[derive(Debug, Clone)]
pub struct HealthFailure {
pub connection_id: u64,
pub timestamp: SystemTime,
pub error_message: String,
pub failure_type: HealthFailureType,
}
#[derive(Debug, Clone, PartialEq)]
pub enum HealthFailureType {
Timeout,
NetworkError,
DnsFailure,
TlsFailure,
HttpError(u16),
}
impl Default for ConnectionPoolConfig {
fn default() -> Self {
Self {
min_connections: 5,
max_connections: 50,
connection_timeout: Duration::from_secs(30),
request_timeout: Duration::from_secs(60),
idle_timeout: Duration::from_secs(300), health_check_interval: Duration::from_secs(30),
max_connection_lifetime: Duration::from_secs(3600), enable_analytics: true,
keep_alive_config: KeepAliveConfig {
enabled: true,
timeout: Duration::from_secs(60),
interval: Duration::from_secs(10),
probe_count: 3,
},
}
}
}
impl Default for ConnectionConfig {
fn default() -> Self {
Self {
user_agent: "EEYF/1.0".to_string(),
headers: vec![
("Accept".to_string(), "application/json".to_string()),
("Accept-Encoding".to_string(), "gzip, deflate".to_string()),
],
proxy: None,
tls_config: TlsConfig {
accept_invalid_certs: false,
accept_invalid_hostnames: false,
min_tls_version: TlsVersion::TlsV1_2,
},
}
}
}
impl ConnectionPool {
pub async fn new(config: ConnectionPoolConfig) -> Result<Self, YahooError> {
let semaphore = Arc::new(Semaphore::new(config.max_connections));
let health_monitor = Arc::new(HealthMonitor::new(config.health_check_interval));
let pool = Self {
connections: Arc::new(Mutex::new(VecDeque::new())),
active_connections: Arc::new(AtomicUsize::new(0)),
connection_semaphore: semaphore,
connection_id_counter: Arc::new(AtomicU64::new(1)),
stats: Arc::new(RwLock::new(PoolStats::default())),
health_monitor,
is_shutdown: Arc::new(AtomicUsize::new(0)),
config,
};
pool.ensure_min_connections().await?;
pool.update_pool_statistics().await?;
pool.start_health_monitoring().await;
Ok(pool)
}
pub async fn get_connection(&self) -> Result<Arc<PooledConnection>, YahooError> {
if self.is_shutdown.load(Ordering::Relaxed) > 0 {
return Err(YahooError::ConnectionFailed(
"Pool is shutting down".to_string(),
));
}
if let Some(conn) = self.try_get_existing_connection().await? {
return Ok(conn);
}
let _permit = self.connection_semaphore.acquire().await.map_err(|_| {
YahooError::ConnectionFailed("Failed to acquire connection permit".to_string())
})?;
if self.active_connections.load(Ordering::Relaxed) < self.config.max_connections {
let conn = self.create_connection().await?;
self.active_connections.fetch_add(1, Ordering::Relaxed);
return Ok(conn);
}
Err(YahooError::ConnectionFailed(
"Connection pool exhausted".to_string(),
))
}
pub async fn return_connection(
&self,
connection: Arc<PooledConnection>,
) -> Result<(), YahooError> {
let health = connection.health.read().await;
if *health == ConnectionHealth::Healthy {
drop(health);
let mut connections = self.connections.lock().await;
connections.push_back(connection);
self.active_connections.fetch_sub(1, Ordering::Relaxed);
} else {
self.active_connections.fetch_sub(1, Ordering::Relaxed);
}
Ok(())
}
pub async fn get_stats(&self) -> PoolStats {
self.stats.read().await.clone()
}
pub async fn maintenance(&self) -> Result<(), YahooError> {
self.cleanup_idle_connections().await?;
self.replace_old_connections().await?;
self.ensure_min_connections().await?;
self.update_pool_statistics().await?;
Ok(())
}
pub async fn shutdown(&self) -> Result<(), YahooError> {
self.is_shutdown.store(1, Ordering::Relaxed);
let mut retries = 0;
while self.active_connections.load(Ordering::Relaxed) > 0 && retries < 30 {
sleep(Duration::from_secs(1)).await;
retries += 1;
}
let mut connections = self.connections.lock().await;
connections.clear();
self.is_shutdown.store(2, Ordering::Relaxed);
Ok(())
}
async fn try_get_existing_connection(
&self,
) -> Result<Option<Arc<PooledConnection>>, YahooError> {
let mut connections = self.connections.lock().await;
while let Some(conn) = connections.pop_front() {
let health = conn.health.read().await.clone();
if health == ConnectionHealth::Healthy {
if conn.created_at.elapsed() < self.config.max_connection_lifetime {
self.active_connections.fetch_add(1, Ordering::Relaxed);
return Ok(Some(conn));
}
}
}
Ok(None)
}
async fn create_connection(&self) -> Result<Arc<PooledConnection>, YahooError> {
let connection_id = self.connection_id_counter.fetch_add(1, Ordering::Relaxed);
let conn_config = ConnectionConfig::default();
let mut client_builder = ClientBuilder::new()
.timeout(self.config.request_timeout)
.connect_timeout(self.config.connection_timeout)
.tcp_keepalive(self.config.keep_alive_config.timeout)
.user_agent(&conn_config.user_agent);
match conn_config.tls_config.min_tls_version {
TlsVersion::TlsV1_2 => {
client_builder = client_builder.min_tls_version(reqwest::tls::Version::TLS_1_2);
}
TlsVersion::TlsV1_3 => {
client_builder = client_builder.min_tls_version(reqwest::tls::Version::TLS_1_3);
}
_ => {} }
let client = client_builder
.build()
.map_err(|e| YahooError::ConnectionFailed(format!("Failed to create client: {}", e)))?;
let connection = Arc::new(PooledConnection {
client,
id: connection_id,
created_at: Instant::now(),
last_used: Instant::now(),
request_count: AtomicU64::new(0),
health: Arc::new(RwLock::new(ConnectionHealth::Healthy)),
config: conn_config,
});
self.health_monitor
.register_connection(Arc::downgrade(&connection))
.await;
Ok(connection)
}
async fn cleanup_idle_connections(&self) -> Result<(), YahooError> {
let mut connections = self.connections.lock().await;
let now = Instant::now();
connections.retain(|conn| {
let idle_time = now.duration_since(conn.last_used);
if idle_time > self.config.idle_timeout {
false
} else {
true
}
});
Ok(())
}
async fn replace_old_connections(&self) -> Result<(), YahooError> {
let mut connections = self.connections.lock().await;
let now = Instant::now();
let old_connections: Vec<_> = connections
.iter()
.enumerate()
.filter(|(_, conn)| {
now.duration_since(conn.created_at) > self.config.max_connection_lifetime
})
.map(|(i, _)| i)
.collect();
for &index in old_connections.iter().rev() {
connections.remove(index);
}
Ok(())
}
async fn ensure_min_connections(&self) -> Result<(), YahooError> {
let connections_count = self.connections.lock().await.len();
let needed = self
.config
.min_connections
.saturating_sub(connections_count);
for _ in 0..needed {
if let Ok(conn) = self.create_connection().await {
self.connections.lock().await.push_back(conn);
}
}
Ok(())
}
async fn start_health_monitoring(&self) {
let health_monitor = Arc::clone(&self.health_monitor);
let pool_weak = Arc::downgrade(&self.stats);
tokio::spawn(async move {
while let Some(_stats) = pool_weak.upgrade() {
if let Err(e) = health_monitor.perform_health_checks().await {
eprintln!("Health check failed: {}", e);
}
sleep(health_monitor.check_interval).await;
}
});
}
async fn update_pool_statistics(&self) -> Result<(), YahooError> {
let mut stats = self.stats.write().await;
let connections_guard = self.connections.lock().await;
stats.active_connections = self.active_connections.load(Ordering::Relaxed);
stats.idle_connections = connections_guard.len();
let mut healthy = 0;
let mut degraded = 0;
let mut unhealthy = 0;
let mut testing = 0;
for conn in connections_guard.iter() {
let health = futures::executor::block_on(conn.health.read());
match *health {
ConnectionHealth::Healthy => healthy += 1,
ConnectionHealth::Degraded => degraded += 1,
ConnectionHealth::Unhealthy => unhealthy += 1,
ConnectionHealth::Testing => testing += 1,
}
}
stats.health_distribution = HealthDistribution {
healthy_count: healthy,
degraded_count: degraded,
unhealthy_count: unhealthy,
testing_count: testing,
};
let total_connections = stats.active_connections + stats.idle_connections;
stats.pool_efficiency = if total_connections > 0 {
stats.active_connections as f64 / total_connections as f64
} else {
0.0
};
Ok(())
}
}
impl HealthMonitor {
pub fn new(check_interval: Duration) -> Self {
Self {
monitored_connections: Arc::new(RwLock::new(Vec::new())),
check_interval,
health_stats: Arc::new(RwLock::new(HealthStats::default())),
}
}
pub async fn register_connection(&self, connection: Weak<PooledConnection>) {
let mut connections = self.monitored_connections.write().await;
connections.push(connection);
}
pub async fn perform_health_checks(&self) -> Result<(), YahooError> {
let connections = self.monitored_connections.read().await;
let mut active_connections = Vec::new();
for weak_conn in connections.iter() {
if let Some(conn) = weak_conn.upgrade() {
active_connections.push(conn);
}
}
drop(connections);
let mut connections = self.monitored_connections.write().await;
connections.retain(|weak| weak.strong_count() > 0);
drop(connections);
for conn in active_connections {
self.check_connection_health(conn).await?;
}
Ok(())
}
async fn check_connection_health(
&self,
connection: Arc<PooledConnection>,
) -> Result<(), YahooError> {
let start_time = Instant::now();
let mut health_stats = self.health_stats.write().await;
health_stats.total_checks += 1;
drop(health_stats);
{
let mut health = connection.health.write().await;
*health = ConnectionHealth::Testing;
}
let health_result = self.perform_lightweight_check(&connection).await;
let new_health = match health_result {
Ok(true) => {
let mut stats = self.health_stats.write().await;
stats.passed_checks += 1;
ConnectionHealth::Healthy
}
Ok(false) => {
let mut stats = self.health_stats.write().await;
stats.failed_checks += 1;
ConnectionHealth::Degraded
}
Err(_) => {
let mut stats = self.health_stats.write().await;
stats.failed_checks += 1;
ConnectionHealth::Unhealthy
}
};
{
let mut health = connection.health.write().await;
*health = new_health;
}
let check_duration = start_time.elapsed();
let mut health_stats = self.health_stats.write().await;
health_stats.avg_check_duration_ms = (health_stats.avg_check_duration_ms
* (health_stats.total_checks - 1) as f64
+ check_duration.as_secs_f64() * 1000.0)
/ health_stats.total_checks as f64;
Ok(())
}
async fn perform_lightweight_check(
&self,
_connection: &PooledConnection,
) -> Result<bool, YahooError> {
Ok(true)
}
}
impl Default for PoolStats {
fn default() -> Self {
Self {
active_connections: 0,
idle_connections: 0,
total_requests: 0,
failed_requests: 0,
avg_request_duration_ms: 0.0,
pool_efficiency: 0.0,
health_distribution: HealthDistribution {
healthy_count: 0,
degraded_count: 0,
unhealthy_count: 0,
testing_count: 0,
},
utilization_history: VecDeque::new(),
}
}
}
impl Default for HealthStats {
fn default() -> Self {
Self {
total_checks: 0,
passed_checks: 0,
failed_checks: 0,
avg_check_duration_ms: 0.0,
recent_failures: VecDeque::new(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_connection_pool_creation() {
let config = ConnectionPoolConfig::default();
let pool = ConnectionPool::new(config).await.unwrap();
let stats = pool.get_stats().await;
assert!(stats.idle_connections >= 5); }
#[tokio::test]
async fn test_get_and_return_connection() {
let config = ConnectionPoolConfig::default();
let pool = ConnectionPool::new(config).await.unwrap();
let conn = pool.get_connection().await.unwrap();
assert_eq!(conn.request_count.load(Ordering::Relaxed), 0);
pool.return_connection(conn).await.unwrap();
let stats = pool.get_stats().await;
assert!(stats.idle_connections > 0);
}
#[tokio::test]
async fn test_pool_maintenance() {
let mut config = ConnectionPoolConfig::default();
config.idle_timeout = Duration::from_millis(100);
let pool = ConnectionPool::new(config).await.unwrap();
tokio::time::sleep(Duration::from_millis(150)).await;
pool.maintenance().await.unwrap();
let stats = pool.get_stats().await;
assert!(stats.idle_connections >= 5);
}
#[tokio::test]
async fn test_pool_shutdown() {
let config = ConnectionPoolConfig::default();
let pool = ConnectionPool::new(config).await.unwrap();
let _conn = pool.get_connection().await.unwrap();
let result = pool.shutdown().await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_health_monitor() {
let monitor = HealthMonitor::new(Duration::from_millis(100));
let config = ConnectionConfig::default();
let client = ClientBuilder::new().build().unwrap();
let conn = Arc::new(PooledConnection {
client,
id: 1,
created_at: Instant::now(),
last_used: Instant::now(),
request_count: AtomicU64::new(0),
health: Arc::new(RwLock::new(ConnectionHealth::Healthy)),
config,
});
monitor.register_connection(Arc::downgrade(&conn)).await;
let result = monitor.perform_health_checks().await;
assert!(result.is_ok());
}
}