use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use cheetah_string::CheetahString;
use dashmap::DashMap;
use tokio::time;
use tracing::debug;
use tracing::info;
use tracing::warn;
use crate::clients::Client;
use crate::request_processor::default_request_processor::DefaultRemotingRequestProcessor;
/// A pooled [`Client`] bundled with its usage metrics and creation timestamp.
///
/// The metrics live behind an `Arc`, so every clone of a `PooledConnection`
/// records into and reads from the same [`ConnectionMetrics`].
#[derive(Clone)]
pub struct PooledConnection<PR = DefaultRemotingRequestProcessor> {
    client: Client<PR>,
    metrics: Arc<ConnectionMetrics>,
    created_at: Instant,
}

impl<PR> PooledConnection<PR> {
    /// Wraps `client` with fresh metrics and stamps the creation time.
    pub fn new(client: Client<PR>) -> Self {
        let metrics = Arc::new(ConnectionMetrics::new());
        let created_at = Instant::now();
        Self {
            client,
            metrics,
            created_at,
        }
    }

    /// Borrows the underlying client.
    pub fn client(&self) -> &Client<PR> {
        &self.client
    }

    /// Borrows the shared metrics for this connection.
    pub fn metrics(&self) -> &ConnectionMetrics {
        &*self.metrics
    }

    /// Health check placeholder: no probe is implemented, so this always
    /// reports `true` and callers' "unhealthy" paths never trigger.
    pub fn is_healthy(&self) -> bool {
        true
    }

    /// Returns `true` when the last recorded success/error is strictly older
    /// than `max_idle`.
    pub fn is_idle(&self, max_idle: Duration) -> bool {
        let since_last_use = self.metrics.last_used().elapsed();
        since_last_use > max_idle
    }

    /// Forwards a successful request and its latency to the shared metrics.
    pub fn record_success(&self, latency_ms: u64) {
        self.metrics.record_success(latency_ms)
    }

    /// Forwards a failed request to the shared metrics.
    pub fn record_error(&self) {
        self.metrics.record_error()
    }

    /// Time elapsed since this wrapper was created.
    pub fn age(&self) -> Duration {
        self.created_at.elapsed()
    }
}
/// Usage counters for a single pooled connection.
///
/// Every counter is updated and read with `Ordering::Relaxed`. Each value is
/// individually accurate, but concurrent readers do not get a consistent
/// snapshot across counters. The derived ratios below guard against that.
#[derive(Debug)]
pub struct ConnectionMetrics {
    // Time of the most recent `record_success`/`record_error` (initially creation time).
    last_used: parking_lot::Mutex<Instant>,
    // All recorded requests, successful and failed.
    request_count: AtomicU64,
    // Failures since the last success; reset by `record_success`.
    consecutive_errors: AtomicU64,
    // Sum of latencies passed to `record_success` (failed requests add nothing).
    latency_sum: AtomicU64,
    // Total failed requests ever recorded.
    total_errors: AtomicU64,
}

impl ConnectionMetrics {
    /// Creates zeroed counters with `last_used` set to now.
    pub fn new() -> Self {
        Self {
            last_used: parking_lot::Mutex::new(Instant::now()),
            request_count: AtomicU64::new(0),
            consecutive_errors: AtomicU64::new(0),
            latency_sum: AtomicU64::new(0),
            total_errors: AtomicU64::new(0),
        }
    }

    /// Records a successful request.
    ///
    /// Updates the last-use time, counts the request, adds `latency_ms` to the
    /// latency sum and resets the consecutive-error streak. The unit is whatever
    /// the caller measured (milliseconds per the parameter name, presumably).
    pub fn record_success(&self, latency_ms: u64) {
        *self.last_used.lock() = Instant::now();
        self.request_count.fetch_add(1, Ordering::Relaxed);
        self.latency_sum.fetch_add(latency_ms, Ordering::Relaxed);
        self.consecutive_errors.store(0, Ordering::Relaxed);
    }

    /// Records a failed request.
    ///
    /// Updates the last-use time and bumps the request, consecutive-error and
    /// total-error counters. No latency sample is recorded.
    pub fn record_error(&self) {
        *self.last_used.lock() = Instant::now();
        self.request_count.fetch_add(1, Ordering::Relaxed);
        self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
        self.total_errors.fetch_add(1, Ordering::Relaxed);
    }

    /// Mean latency of *successful* requests, or `0.0` if none succeeded.
    ///
    /// Only `record_success` contributes to the latency sum. Dividing by the
    /// total request count (which includes failures) would bias the mean
    /// toward zero, so failures are excluded from the denominator.
    pub fn avg_latency(&self) -> f64 {
        let successes = self.successful_requests();
        if successes == 0 {
            return 0.0;
        }
        let sum = self.latency_sum.load(Ordering::Relaxed);
        sum as f64 / successes as f64
    }

    /// Fraction of recorded requests that failed, in `[0.0, 1.0]`.
    ///
    /// Returns `0.0` when no requests have been recorded.
    pub fn error_rate(&self) -> f64 {
        let count = self.request_count.load(Ordering::Relaxed);
        if count == 0 {
            return 0.0;
        }
        let errors = self.total_errors.load(Ordering::Relaxed);
        // Relaxed loads of two counters can observe the error increment before
        // the matching request increment; clamp so the ratio never exceeds 1.
        (errors as f64 / count as f64).min(1.0)
    }

    /// Failures recorded since the most recent success.
    pub fn consecutive_errors(&self) -> u64 {
        self.consecutive_errors.load(Ordering::Relaxed)
    }

    /// Total requests recorded, successful and failed.
    pub fn request_count(&self) -> u64 {
        self.request_count.load(Ordering::Relaxed)
    }

    /// Total failed requests recorded.
    pub fn total_errors(&self) -> u64 {
        self.total_errors.load(Ordering::Relaxed)
    }

    /// Time of the most recent recorded success or error (creation time if none).
    pub fn last_used(&self) -> Instant {
        *self.last_used.lock()
    }

    // Requests that completed successfully. Saturating because the two relaxed
    // loads are not an atomic snapshot and a concurrent `record_error` could
    // otherwise make the subtraction underflow.
    fn successful_requests(&self) -> u64 {
        let count = self.request_count.load(Ordering::Relaxed);
        let errors = self.total_errors.load(Ordering::Relaxed);
        count.saturating_sub(errors)
    }
}

impl Default for ConnectionMetrics {
    fn default() -> Self {
        Self::new()
    }
}
/// Address-keyed pool of [`PooledConnection`]s.
///
/// The map is held behind an `Arc`. Clones of the pool and the background
/// cleanup task therefore all operate on the same set of connections.
pub struct ConnectionPool<PR = DefaultRemotingRequestProcessor> {
    /// Capacity limit enforced by `insert`; `0` disables the limit.
    max_connections: usize,
    /// Connections unused for longer than this are treated as idle.
    max_idle_duration: Duration,
    /// Live connections keyed by remote address.
    connections: Arc<DashMap<CheetahString, PooledConnection<PR>>>,
}
impl<PR> ConnectionPool<PR> {
pub fn new(max_connections: usize, max_idle_duration: Duration) -> Self {
Self {
connections: Arc::new(DashMap::with_capacity(64)),
max_connections,
max_idle_duration,
}
}
pub fn get(&self, addr: &CheetahString) -> Option<PooledConnection<PR>>
where
PR: Clone,
{
if let Some(entry) = self.connections.get(addr) {
let conn = entry.value().clone();
if conn.is_healthy() {
debug!("Reusing pooled connection to {}", addr);
return Some(conn);
} else {
debug!("Removing unhealthy connection to {}", addr);
drop(entry); self.connections.remove(addr);
}
}
None
}
pub fn insert(&self, addr: CheetahString, client: Client<PR>) -> bool {
if self.max_connections > 0 && self.connections.len() >= self.max_connections {
warn!(
"Connection pool at capacity ({}/{}), rejecting connection to {}",
self.connections.len(),
self.max_connections,
addr
);
return false;
}
let pooled = PooledConnection::new(client);
self.connections.insert(addr.clone(), pooled);
info!(
"Added connection to pool: {} (pool size: {})",
addr,
self.connections.len()
);
true
}
pub fn remove(&self, addr: &CheetahString) -> Option<PooledConnection<PR>> {
self.connections.remove(addr).map(|(_, conn)| {
debug!("Removed connection from pool: {}", addr);
conn
})
}
pub fn get_metrics(&self, addr: &CheetahString) -> Option<Arc<ConnectionMetrics>> {
self.connections.get(addr).map(|entry| entry.value().metrics.clone())
}
pub fn record_success(&self, addr: &CheetahString, latency_ms: u64) {
if let Some(entry) = self.connections.get(addr) {
entry.value().record_success(latency_ms);
}
}
pub fn record_error(&self, addr: &CheetahString) {
if let Some(entry) = self.connections.get(addr) {
entry.value().record_error();
}
}
pub async fn evict_idle(&self) -> usize {
let mut to_remove = Vec::new();
for entry in self.connections.iter() {
if entry.value().is_idle(self.max_idle_duration) {
to_remove.push(entry.key().clone());
}
}
let count = to_remove.len();
if count > 0 {
info!("Evicting {} idle connections", count);
for addr in to_remove {
self.connections.remove(&addr);
}
}
count
}
pub async fn evict_unhealthy(&self) -> usize {
let mut to_remove = Vec::new();
for entry in self.connections.iter() {
if !entry.value().is_healthy() {
to_remove.push(entry.key().clone());
}
}
let count = to_remove.len();
if count > 0 {
warn!("Evicting {} unhealthy connections", count);
for addr in to_remove {
self.connections.remove(&addr);
}
}
count
}
pub fn stats(&self) -> PoolStats {
let size = self.connections.len();
let mut healthy = 0;
let mut idle = 0;
let mut total_requests = 0u64;
let mut total_errors = 0u64;
for entry in self.connections.iter() {
let conn = entry.value();
if conn.is_healthy() {
healthy += 1;
}
if conn.is_idle(self.max_idle_duration) {
idle += 1;
}
total_requests += conn.metrics().request_count();
total_errors += conn.metrics().total_errors.load(Ordering::Relaxed);
}
PoolStats {
total: size,
healthy,
idle,
max_connections: self.max_connections,
total_requests,
total_errors,
}
}
pub fn start_cleanup_task(&self, interval: Duration) -> tokio::task::JoinHandle<()>
where
PR: Send + Sync + 'static,
{
let connections = self.connections.clone();
let max_idle = self.max_idle_duration;
tokio::spawn(async move {
let mut ticker = time::interval(interval);
loop {
ticker.tick().await;
let mut idle_count = 0;
let mut unhealthy_count = 0;
let mut to_remove = Vec::new();
for entry in connections.iter() {
let conn = entry.value();
if !conn.is_healthy() {
to_remove.push((entry.key().clone(), "unhealthy"));
unhealthy_count += 1;
} else if conn.is_idle(max_idle) {
to_remove.push((entry.key().clone(), "idle"));
idle_count += 1;
}
}
if !to_remove.is_empty() {
info!(
"Cleanup: evicting {} idle and {} unhealthy connections",
idle_count, unhealthy_count
);
for (addr, reason) in to_remove {
connections.remove(&addr);
debug!("Evicted connection to {} (reason: {})", addr, reason);
}
}
debug!("Connection pool size: {} (after cleanup)", connections.len());
}
})
}
}
impl<PR> Clone for ConnectionPool<PR> {
fn clone(&self) -> Self {
Self {
connections: self.connections.clone(),
max_connections: self.max_connections,
max_idle_duration: self.max_idle_duration,
}
}
}
/// Point-in-time summary of a connection pool.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Number of pooled connections.
    pub total: usize,
    /// Connections reporting themselves healthy.
    pub healthy: usize,
    /// Connections idle longer than the pool's max idle duration.
    pub idle: usize,
    /// Configured capacity; `0` means unlimited.
    pub max_connections: usize,
    /// Requests recorded across all pooled connections.
    pub total_requests: u64,
    /// Errors recorded across all pooled connections.
    pub total_errors: u64,
}

impl PoolStats {
    /// Fraction of capacity in use, or `0.0` for an unlimited pool (`max_connections == 0`).
    pub fn utilization(&self) -> f64 {
        if self.max_connections == 0 {
            return 0.0;
        }
        self.total as f64 / self.max_connections as f64
    }

    /// Fraction of recorded requests that failed, or `0.0` if none were recorded.
    pub fn error_rate(&self) -> f64 {
        if self.total_requests == 0 {
            return 0.0;
        }
        self.total_errors as f64 / self.total_requests as f64
    }

    /// Number of non-idle connections.
    ///
    /// The fields are public and may come from a non-atomic snapshot, so `idle`
    /// can exceed `total`. The result saturates at 0 instead of underflowing,
    /// which would panic in debug builds.
    pub fn active(&self) -> usize {
        self.total.saturating_sub(self.idle)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks a metrics instance through successes, an error, and a recovery.
    #[test]
    fn test_connection_metrics() {
        let m = ConnectionMetrics::new();

        for latency in [10, 20, 30] {
            m.record_success(latency);
        }
        assert_eq!(m.request_count(), 3);
        assert_eq!(m.avg_latency(), 20.0);
        assert_eq!(m.error_rate(), 0.0);

        // One failure: counted as a request and starts an error streak.
        m.record_error();
        assert_eq!(m.request_count(), 4);
        assert_eq!(m.consecutive_errors(), 1);
        assert_eq!(m.error_rate(), 0.25);

        // A subsequent success resets the streak.
        m.record_success(15);
        assert_eq!(m.consecutive_errors(), 0);
    }

    /// Checks the derived ratios on a hand-built stats snapshot.
    #[test]
    fn test_pool_stats() {
        let s = PoolStats {
            total: 50,
            healthy: 45,
            idle: 10,
            max_connections: 100,
            total_requests: 10000,
            total_errors: 100,
        };

        assert_eq!(s.utilization(), 0.5);
        assert_eq!(s.error_rate(), 0.01);
        assert_eq!(s.active(), 40);
    }
}