use crate::balancer::{BalancingStrategy, EndpointId, LoadBalancer};
use crate::circuit_breaker::CircuitBreaker;
use crate::error::{NetError, NetResult};
use parking_lot::RwLock;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::time;
use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
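/// Configuration for a [`ConnectionPool`].
///
/// Connections are created lazily when checked out and are retired once they
/// exceed `idle_timeout` or `max_lifetime`. `max_size` seeds the effective
/// connection cap (adaptive sizing may raise or lower it at runtime), and
/// `min_size` is the scale-down floor when no adaptive configuration is supplied.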
#[derive(Debug, Clone)]
pub struct PoolConfig {
pub min_size: usize,
pub max_size: usize,
pub idle_timeout: Duration,
pub max_lifetime: Duration,
pub connect_timeout: Duration,
pub health_check_interval: Duration,
pub balancing_strategy: BalancingStrategy,
pub enable_circuit_breaker: bool,
}
impl Default for PoolConfig {
fn default() -> Self {
Self {
min_size: 2,
max_size: 10,
            idle_timeout: Duration::from_secs(300),
            max_lifetime: Duration::from_secs(1800),
            connect_timeout: Duration::from_secs(10),
health_check_interval: Duration::from_secs(30),
balancing_strategy: BalancingStrategy::LeastConnections,
enable_circuit_breaker: true,
}
}
}
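/// Settings for the background health-check loop; `unhealthy_threshold` is the
/// number of consecutive failures after which a connection is marked
/// [`ConnectionHealth::Unhealthy`].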
#[derive(Debug, Clone)]
pub struct HealthCheckConfig {
pub interval: Duration,
pub timeout: Duration,
pub unhealthy_threshold: u32,
}
impl Default for HealthCheckConfig {
fn default() -> Self {
Self {
interval: Duration::from_secs(30),
timeout: Duration::from_secs(5),
unhealthy_threshold: 3,
}
}
}
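/// Health state tracked per pooled connection.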
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionHealth {
Healthy,
Degraded,
Unhealthy,
}
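/// Parameters for adaptive pool sizing: the effective cap grows by `scale_step`
/// when utilization reaches `scale_up_threshold` and shrinks when it drops to
/// `scale_down_threshold`, bounded by `min_size`/`max_size` and rate-limited by
/// `cooldown`.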
#[derive(Debug, Clone)]
pub struct AdaptiveConfig {
pub min_size: usize,
pub max_size: usize,
pub scale_up_threshold: f64,
pub scale_down_threshold: f64,
pub scale_step: usize,
pub cooldown: Duration,
}
impl Default for AdaptiveConfig {
fn default() -> Self {
Self {
min_size: 2,
max_size: 20,
scale_up_threshold: 0.8,
scale_down_threshold: 0.2,
scale_step: 2,
cooldown: Duration::from_secs(60),
}
}
}
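/// Point-in-time snapshot of pool counters, as returned by [`ConnectionPool::metrics`].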
#[derive(Debug, Clone)]
pub struct PoolMetrics {
pub total_connections: usize,
pub active_connections: usize,
pub idle_connections: usize,
pub total_checkouts: u64,
pub total_checkins: u64,
pub total_timeouts: u64,
pub total_health_check_failures: u64,
pub avg_checkout_duration_us: u64,
pub utilization: f64,
}
impl Default for PoolMetrics {
fn default() -> Self {
Self {
total_connections: 0,
active_connections: 0,
idle_connections: 0,
total_checkouts: 0,
total_checkins: 0,
total_timeouts: 0,
total_health_check_failures: 0,
avg_checkout_duration_us: 0,
utilization: 0.0,
}
}
}
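/// Aggregate connection statistics, as returned by [`ConnectionPool::stats`].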
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
pub total_connections: usize,
pub active_connections: usize,
pub idle_connections: usize,
pub failed_connections: u64,
pub total_created: u64,
pub total_closed: u64,
pub pool_exhausted_count: u64,
pub avg_wait_time_ms: u64,
}
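/// Internal bookkeeping attached to each pooled channel: which endpoint it
/// belongs to, when it was created and last used, and its health-check state.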
#[derive(Debug)]
struct ConnectionMeta {
channel: Channel,
endpoint_id: EndpointId,
created_at: Instant,
last_used: Instant,
health: ConnectionHealth,
health_check_failures: u32,
last_health_check: Option<Instant>,
}
impl ConnectionMeta {
fn new(channel: Channel, endpoint_id: EndpointId) -> Self {
let now = Instant::now();
Self {
channel,
endpoint_id,
created_at: now,
last_used: now,
health: ConnectionHealth::Healthy,
health_check_failures: 0,
last_health_check: None,
}
}
fn is_idle_expired(&self, idle_timeout: Duration) -> bool {
self.last_used.elapsed() > idle_timeout
}
fn is_lifetime_expired(&self, max_lifetime: Duration) -> bool {
self.created_at.elapsed() > max_lifetime
}
fn touch(&mut self) {
self.last_used = Instant::now();
}
fn record_health_success(&mut self) {
self.health_check_failures = 0;
self.health = ConnectionHealth::Healthy;
self.last_health_check = Some(Instant::now());
}
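    /// Records a failed health check: the connection becomes `Degraded` one
    /// failure short of the threshold and `Unhealthy` once the threshold is reached.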
fn record_health_failure(&mut self, unhealthy_threshold: u32) -> ConnectionHealth {
self.health_check_failures += 1;
self.last_health_check = Some(Instant::now());
if self.health_check_failures >= unhealthy_threshold {
self.health = ConnectionHealth::Unhealthy;
} else if self.health_check_failures >= unhealthy_threshold.saturating_sub(1).max(1) {
self.health = ConnectionHealth::Degraded;
}
self.health
}
}
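/// RAII guard for a checked-out channel. Dropping the guard returns the
/// connection to the pool (or closes it if it is unhealthy or expired).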
pub struct PooledConnection {
meta: Option<ConnectionMeta>,
pool: Arc<ConnectionPoolInner>,
}
impl PooledConnection {
pub fn channel(&self) -> &Channel {
&self.meta.as_ref().expect("connection should exist").channel
}
pub fn endpoint_id(&self) -> &str {
&self
.meta
.as_ref()
.expect("connection should exist")
.endpoint_id
}
}
impl Drop for PooledConnection {
fn drop(&mut self) {
if let Some(mut meta) = self.meta.take() {
meta.touch();
self.pool.return_connection(meta);
}
}
}
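/// State shared between [`ConnectionPool`] handles, the background health-check
/// task, and outstanding [`PooledConnection`] guards.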
struct ConnectionPoolInner {
config: PoolConfig,
health_check_config: HealthCheckConfig,
adaptive_config: Option<AdaptiveConfig>,
tls_config: Option<ClientTlsConfig>,
idle_connections: RwLock<VecDeque<ConnectionMeta>>,
active_count: std::sync::Mutex<usize>,
stats: RwLock<PoolStats>,
load_balancer: LoadBalancer,
circuit_breaker: Option<CircuitBreaker>,
total_checkouts: AtomicU64,
total_checkins: AtomicU64,
total_timeouts: AtomicU64,
total_health_check_failures: AtomicU64,
checkout_duration_sum_us: AtomicU64,
checkout_count_for_avg: AtomicU64,
last_scale_time: RwLock<Option<Instant>>,
effective_max_size: std::sync::Mutex<usize>,
}
impl ConnectionPoolInner {
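    /// Called from `PooledConnection::drop`: unhealthy or expired connections are
    /// closed, everything else is pushed back onto the idle queue.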
fn return_connection(&self, meta: ConnectionMeta) {
self.total_checkins.fetch_add(1, Ordering::Relaxed);
if meta.health == ConnectionHealth::Unhealthy {
self.stats.write().total_closed += 1;
let mut active = self
.active_count
.lock()
.expect("active count lock poisoned");
*active = active.saturating_sub(1);
return;
}
if meta.is_idle_expired(self.config.idle_timeout)
|| meta.is_lifetime_expired(self.config.max_lifetime)
{
self.stats.write().total_closed += 1;
let mut active = self
.active_count
.lock()
.expect("active count lock poisoned");
*active = active.saturating_sub(1);
return;
}
self.idle_connections.write().push_back(meta);
let mut active = self
.active_count
.lock()
.expect("active count lock poisoned");
*active = active.saturating_sub(1);
}
fn get_stats(&self) -> PoolStats {
let mut stats = self.stats.read().clone();
let idle = self.idle_connections.read().len();
let active = *self
.active_count
.lock()
.expect("active count lock poisoned");
stats.total_connections = idle + active;
stats.active_connections = active;
stats.idle_connections = idle;
stats
}
fn utilization(&self) -> f64 {
let active = *self
.active_count
.lock()
.expect("active count lock poisoned");
let max_size = *self
.effective_max_size
.lock()
.expect("effective max size lock poisoned");
if max_size == 0 {
return 0.0;
}
active as f64 / max_size as f64
}
fn get_metrics(&self) -> PoolMetrics {
let idle = self.idle_connections.read().len();
let active = *self
.active_count
.lock()
.expect("active count lock poisoned");
let max_size = *self
.effective_max_size
.lock()
.expect("effective max size lock poisoned");
let total_checkouts = self.total_checkouts.load(Ordering::Relaxed);
let total_checkins = self.total_checkins.load(Ordering::Relaxed);
let total_timeouts = self.total_timeouts.load(Ordering::Relaxed);
let total_health_check_failures = self.total_health_check_failures.load(Ordering::Relaxed);
let checkout_count = self.checkout_count_for_avg.load(Ordering::Relaxed);
let checkout_sum = self.checkout_duration_sum_us.load(Ordering::Relaxed);
let avg_checkout_duration_us = checkout_sum.checked_div(checkout_count).unwrap_or(0);
let utilization = if max_size > 0 {
active as f64 / max_size as f64
} else {
0.0
};
PoolMetrics {
total_connections: idle + active,
active_connections: active,
idle_connections: idle,
total_checkouts,
total_checkins,
total_timeouts,
total_health_check_failures,
avg_checkout_duration_us,
utilization,
}
}
}
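/// A gRPC channel pool with load balancing, optional TLS and circuit breaking,
/// background health checks, and adaptive sizing.
///
/// The constructors spawn the health-check task with `tokio::spawn`, so a pool
/// must be created from within a Tokio runtime.
///
/// Illustrative usage (not compiled as a doc-test; the address is a placeholder
/// and imports are elided):
///
/// ```ignore
/// let pool = ConnectionPool::new(PoolConfig::default());
/// pool.add_endpoint("ep1".to_string(), "localhost:50051".to_string());
/// let conn = pool.get_connection().await?;
/// let channel = conn.channel().clone();
/// // `conn` is returned to the pool when dropped.
/// ```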
pub struct ConnectionPool {
inner: Arc<ConnectionPoolInner>,
shutdown_tx: tokio::sync::watch::Sender<bool>,
}
impl ConnectionPool {
pub fn new(config: PoolConfig) -> Self {
Self::with_health_and_adaptive(config, HealthCheckConfig::default(), None)
}
pub fn with_tls(config: PoolConfig, tls_config: ClientTlsConfig) -> Self {
Self::with_full_config(config, HealthCheckConfig::default(), None, Some(tls_config))
}
pub fn with_health_and_adaptive(
config: PoolConfig,
health_check_config: HealthCheckConfig,
adaptive_config: Option<AdaptiveConfig>,
) -> Self {
Self::with_full_config(config, health_check_config, adaptive_config, None)
}
pub fn with_full_config(
config: PoolConfig,
health_check_config: HealthCheckConfig,
adaptive_config: Option<AdaptiveConfig>,
tls_config: Option<ClientTlsConfig>,
) -> Self {
let load_balancer = LoadBalancer::new(config.balancing_strategy);
let circuit_breaker = if config.enable_circuit_breaker {
Some(CircuitBreaker::new())
} else {
None
};
let effective_max = config.max_size;
let inner = Arc::new(ConnectionPoolInner {
config: config.clone(),
health_check_config,
adaptive_config,
tls_config,
idle_connections: RwLock::new(VecDeque::new()),
active_count: std::sync::Mutex::new(0),
stats: RwLock::new(PoolStats::default()),
load_balancer,
circuit_breaker,
total_checkouts: AtomicU64::new(0),
total_checkins: AtomicU64::new(0),
total_timeouts: AtomicU64::new(0),
total_health_check_failures: AtomicU64::new(0),
checkout_duration_sum_us: AtomicU64::new(0),
checkout_count_for_avg: AtomicU64::new(0),
last_scale_time: RwLock::new(None),
effective_max_size: std::sync::Mutex::new(effective_max),
});
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
let health_check_inner = Arc::clone(&inner);
tokio::spawn(async move {
Self::health_check_loop(health_check_inner, shutdown_rx).await;
});
Self { inner, shutdown_tx }
}
pub fn add_endpoint(&self, id: EndpointId, address: String) {
self.add_endpoint_with_weight(id, address, 1);
}
pub fn add_endpoint_with_weight(&self, id: EndpointId, address: String, weight: u32) {
let endpoint = crate::balancer::Endpoint::with_weight(id, address, weight);
self.inner.load_balancer.add_endpoint(endpoint);
}
pub fn remove_endpoint(&self, endpoint_id: &str) -> bool {
let removed = self.inner.load_balancer.remove_endpoint(endpoint_id);
if removed {
let mut idle = self.inner.idle_connections.write();
idle.retain(|conn| conn.endpoint_id != endpoint_id);
}
removed
}
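    /// Checks out a connection. When a circuit breaker is enabled the request is
    /// gated first; then the pool reuses a healthy idle connection if one exists,
    /// dials a new one while capacity remains, and otherwise waits up to 30
    /// seconds for a connection to be returned before failing with
    /// `ServerOverloaded`.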
pub async fn get_connection(&self) -> NetResult<PooledConnection> {
let start = Instant::now();
if let Some(ref cb) = self.inner.circuit_breaker {
cb.is_request_allowed()?;
}
let conn = self.try_get_healthy_idle_connection();
if let Some(meta) = conn {
self.record_checkout_duration(start);
return Ok(PooledConnection {
meta: Some(meta),
pool: Arc::clone(&self.inner),
});
}
let active = *self
.inner
.active_count
.lock()
.expect("active count lock poisoned");
let idle = self.inner.idle_connections.read().len();
let effective_max = *self
.inner
.effective_max_size
.lock()
.expect("effective max size lock poisoned");
if active + idle >= effective_max {
self.inner.stats.write().pool_exhausted_count += 1;
let timeout = Duration::from_secs(30);
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
let conn = self.try_get_healthy_idle_connection();
if let Some(meta) = conn {
let wait_time = start.elapsed().as_millis() as u64;
let mut stats = self.inner.stats.write();
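                    // Rough running estimate: average the previous value with this wait.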
stats.avg_wait_time_ms = (stats.avg_wait_time_ms + wait_time) / 2;
self.record_checkout_duration(start);
return Ok(PooledConnection {
meta: Some(meta),
pool: Arc::clone(&self.inner),
});
}
time::sleep(Duration::from_millis(100)).await;
}
self.inner.total_timeouts.fetch_add(1, Ordering::Relaxed);
return Err(NetError::ServerOverloaded(
"Connection pool exhausted".to_string(),
));
}
let meta = self.create_connection().await?;
*self
.inner
.active_count
.lock()
.expect("active count lock poisoned") += 1;
self.inner.total_checkouts.fetch_add(1, Ordering::Relaxed);
self.record_checkout_duration(start);
Ok(PooledConnection {
meta: Some(meta),
pool: Arc::clone(&self.inner),
})
}
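    /// Pops idle connections until a healthy one is found; unhealthy entries
    /// encountered along the way are discarded and counted as closed.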
fn try_get_healthy_idle_connection(&self) -> Option<ConnectionMeta> {
let mut idle = self.inner.idle_connections.write();
let mut attempts = idle.len();
while attempts > 0 {
if let Some(mut meta) = idle.pop_front() {
if meta.health == ConnectionHealth::Unhealthy {
self.inner.stats.write().total_closed += 1;
attempts -= 1;
continue;
}
meta.touch();
*self
.inner
.active_count
.lock()
.expect("active count lock poisoned") += 1;
self.inner.total_checkouts.fetch_add(1, Ordering::Relaxed);
return Some(meta);
}
break;
}
None
}
fn record_checkout_duration(&self, start: Instant) {
let duration_us = start.elapsed().as_micros() as u64;
self.inner
.checkout_duration_sum_us
.fetch_add(duration_us, Ordering::Relaxed);
self.inner
.checkout_count_for_avg
.fetch_add(1, Ordering::Relaxed);
}
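    /// Dials a new channel to an endpoint chosen by the load balancer, applying
    /// TLS when configured and reporting the outcome to the circuit breaker.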
async fn create_connection(&self) -> NetResult<ConnectionMeta> {
let endpoint = self.inner.load_balancer.select_endpoint()?;
let scheme = if self.inner.tls_config.is_some() {
"https"
} else {
"http"
};
let mut ep = Endpoint::from_shared(format!("{}://{}", scheme, endpoint.address))
.map_err(|e| NetError::InvalidRequest(format!("Invalid endpoint: {}", e)))?
.connect_timeout(self.inner.config.connect_timeout)
.timeout(Duration::from_secs(30));
if let Some(ref tls_cfg) = self.inner.tls_config {
ep = ep
.tls_config(tls_cfg.clone())
.map_err(|e| NetError::TlsError(format!("Failed to apply TLS config: {}", e)))?;
}
let channel = ep.connect().await.map_err(|e| {
self.inner.stats.write().failed_connections += 1;
if let Some(ref cb) = self.inner.circuit_breaker {
cb.record_failure();
}
NetError::ConnectionRefused(format!("Failed to connect: {}", e))
})?;
if let Some(ref cb) = self.inner.circuit_breaker {
cb.record_success();
}
self.inner.stats.write().total_created += 1;
Ok(ConnectionMeta::new(channel, endpoint.id.clone()))
}
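    /// Background task: on every tick, prunes expired or unhealthy idle
    /// connections and re-evaluates adaptive sizing, until shutdown is signalled.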
async fn health_check_loop(
inner: Arc<ConnectionPoolInner>,
mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
) {
let interval_duration = inner.health_check_config.interval;
let mut interval = time::interval(interval_duration);
loop {
tokio::select! {
_ = interval.tick() => {
Self::run_health_checks(&inner).await;
Self::evaluate_scaling(&inner);
}
_ = shutdown_rx.changed() => {
if *shutdown_rx.borrow() {
break;
}
}
}
}
}
    fn check_connection_health(
        meta: &mut ConnectionMeta,
        config: &PoolConfig,
        health_config: &HealthCheckConfig,
    ) -> ConnectionHealth {
        // Expired connections are reported unhealthy so the caller can discard them.
        if meta.is_idle_expired(config.idle_timeout)
            || meta.is_lifetime_expired(config.max_lifetime)
        {
            meta.health = ConnectionHealth::Unhealthy;
            return ConnectionHealth::Unhealthy;
        }
        // Skip connections that were already checked within the configured interval.
        if let Some(last_check) = meta.last_health_check {
            if last_check.elapsed() < health_config.interval {
                return meta.health;
            }
        }
        // No network probe is issued here: any non-expired connection is recorded
        // as healthy.
        meta.record_health_success();
        ConnectionHealth::Healthy
    }
async fn run_health_checks(inner: &Arc<ConnectionPoolInner>) {
let mut removed_count: u64 = 0;
{
let mut idle = inner.idle_connections.write();
let config = &inner.config;
let health_config = &inner.health_check_config;
idle.retain_mut(|conn| {
let health = Self::check_connection_health(conn, config, health_config);
match health {
ConnectionHealth::Unhealthy => {
removed_count += 1;
false
}
_ => {
if conn.is_idle_expired(config.idle_timeout)
|| conn.is_lifetime_expired(config.max_lifetime)
{
removed_count += 1;
false
} else {
true
}
}
}
});
}
if removed_count > 0 {
inner.stats.write().total_closed += removed_count;
inner
.total_health_check_failures
.fetch_add(removed_count, Ordering::Relaxed);
}
}
fn mark_connection_unhealthy(meta: &mut ConnectionMeta, threshold: u32) {
meta.record_health_failure(threshold);
}
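    /// Adjusts the effective connection cap from current utilization, honoring
    /// the adaptive cooldown; a no-op when adaptive sizing is not configured.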
fn evaluate_scaling(inner: &Arc<ConnectionPoolInner>) {
let adaptive = match &inner.adaptive_config {
Some(cfg) => cfg.clone(),
None => return,
};
{
let last_scale = inner.last_scale_time.read();
if let Some(t) = *last_scale {
if t.elapsed() < adaptive.cooldown {
return;
}
}
}
let utilization = inner.utilization();
let current_max = *inner
.effective_max_size
.lock()
.expect("effective max size lock poisoned");
if utilization >= adaptive.scale_up_threshold {
let new_max = (current_max + adaptive.scale_step).min(adaptive.max_size);
if new_max != current_max {
*inner
.effective_max_size
.lock()
.expect("effective max size lock poisoned") = new_max;
*inner.last_scale_time.write() = Some(Instant::now());
tracing::info!(
old_max = current_max,
new_max = new_max,
utilization = utilization,
"Pool scaled up"
);
}
} else if utilization <= adaptive.scale_down_threshold {
let new_max = current_max
.saturating_sub(adaptive.scale_step)
.max(adaptive.min_size);
if new_max != current_max {
*inner
.effective_max_size
.lock()
.expect("effective max size lock poisoned") = new_max;
*inner.last_scale_time.write() = Some(Instant::now());
tracing::info!(
old_max = current_max,
new_max = new_max,
utilization = utilization,
"Pool scaled down"
);
let mut idle = inner.idle_connections.write();
let active = *inner
.active_count
.lock()
.expect("active count lock poisoned");
                // Drop surplus idle connections so the pool shrinks toward the new cap.
                while idle.len() + active > new_max {
                    if idle.pop_back().is_none() {
                        break;
                    }
                }
}
}
}
pub fn utilization(&self) -> f64 {
self.inner.utilization()
}
pub fn scale_up(&self, count: usize) {
let adaptive = self.inner.adaptive_config.as_ref();
let ceiling = adaptive.map_or(self.inner.config.max_size, |a| a.max_size);
let mut max_size = self
.inner
.effective_max_size
.lock()
.expect("effective max size lock poisoned");
let new_max = (*max_size + count).min(ceiling);
*max_size = new_max;
*self.inner.last_scale_time.write() = Some(Instant::now());
}
pub fn scale_down(&self, count: usize) {
let adaptive = self.inner.adaptive_config.as_ref();
let floor = adaptive.map_or(self.inner.config.min_size, |a| a.min_size);
let mut max_size = self
.inner
.effective_max_size
.lock()
.expect("effective max size lock poisoned");
let new_max = max_size.saturating_sub(count).max(floor);
*max_size = new_max;
*self.inner.last_scale_time.write() = Some(Instant::now());
let mut idle = self.inner.idle_connections.write();
let active = *self
.inner
.active_count
.lock()
.expect("active count lock poisoned");
while idle.len() + active > new_max {
if idle.pop_back().is_none() {
break;
}
}
}
pub fn stats(&self) -> PoolStats {
self.inner.get_stats()
}
pub fn metrics(&self) -> PoolMetrics {
self.inner.get_metrics()
}
pub fn effective_max_size(&self) -> usize {
*self
.inner
.effective_max_size
.lock()
.expect("effective max size lock poisoned")
}
pub fn circuit_breaker_stats(&self) -> Option<crate::circuit_breaker::CircuitBreakerStats> {
self.inner.circuit_breaker.as_ref().map(|cb| cb.stats())
}
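    /// Stops the background health-check task and closes all idle connections.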
pub async fn shutdown(self) -> NetResult<()> {
self.shutdown_tx
.send(true)
.map_err(|_| NetError::ServerInternal("Failed to signal shutdown".to_string()))?;
time::sleep(Duration::from_millis(500)).await;
let mut idle = self.inner.idle_connections.write();
let count = idle.len();
idle.clear();
self.inner.stats.write().total_closed += count as u64;
Ok(())
}
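    /// Waits up to 30 seconds for every active connection to be returned,
    /// erroring with a timeout if any remain checked out.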
pub async fn drain(&self) -> NetResult<()> {
let timeout = Duration::from_secs(30);
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
let active = *self
.inner
.active_count
.lock()
.expect("active count lock poisoned");
if active == 0 {
break;
}
time::sleep(Duration::from_millis(100)).await;
}
let active = *self
.inner
.active_count
.lock()
.expect("active count lock poisoned");
if active > 0 {
return Err(NetError::Timeout(format!(
"Drain timeout: {} active connections remaining",
active
)));
}
Ok(())
}
}
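/// Fluent builder for [`ConnectionPool`]. Endpoints added here are registered
/// with the pool's load balancer when `build()` is called.
///
/// Illustrative usage (not compiled as a doc-test; addresses are placeholders
/// and imports are elided):
///
/// ```ignore
/// let pool = ConnectionPoolBuilder::new()
///     .max_size(20)
///     .balancing_strategy(BalancingStrategy::RoundRobin)
///     .adaptive_default()
///     .add_endpoint("ep1".to_string(), "localhost:50051".to_string())
///     .build();
/// ```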
pub struct ConnectionPoolBuilder {
config: PoolConfig,
health_check_config: HealthCheckConfig,
adaptive_config: Option<AdaptiveConfig>,
tls_config: Option<ClientTlsConfig>,
endpoints: Vec<(EndpointId, String, u32)>,
}
impl ConnectionPoolBuilder {
pub fn new() -> Self {
Self {
config: PoolConfig::default(),
health_check_config: HealthCheckConfig::default(),
adaptive_config: None,
tls_config: None,
endpoints: Vec::new(),
}
}
pub fn tls_config(mut self, tls_config: ClientTlsConfig) -> Self {
self.tls_config = Some(tls_config);
self
}
pub fn min_size(mut self, size: usize) -> Self {
self.config.min_size = size;
self
}
pub fn max_size(mut self, size: usize) -> Self {
self.config.max_size = size;
self
}
pub fn idle_timeout(mut self, timeout: Duration) -> Self {
self.config.idle_timeout = timeout;
self
}
pub fn max_lifetime(mut self, lifetime: Duration) -> Self {
self.config.max_lifetime = lifetime;
self
}
pub fn connect_timeout(mut self, timeout: Duration) -> Self {
self.config.connect_timeout = timeout;
self
}
    pub fn health_check_interval(mut self, interval: Duration) -> Self {
        // Keep the pool-level setting and the health-check loop in sync; the
        // background loop reads HealthCheckConfig::interval.
        self.config.health_check_interval = interval;
        self.health_check_config.interval = interval;
        self
    }
pub fn balancing_strategy(mut self, strategy: BalancingStrategy) -> Self {
self.config.balancing_strategy = strategy;
self
}
pub fn circuit_breaker(mut self, enabled: bool) -> Self {
self.config.enable_circuit_breaker = enabled;
self
}
pub fn add_endpoint(mut self, id: EndpointId, address: String) -> Self {
self.endpoints.push((id, address, 1));
self
}
pub fn add_endpoint_with_weight(
mut self,
id: EndpointId,
address: String,
weight: u32,
) -> Self {
self.endpoints.push((id, address, weight));
self
}
pub fn health_check_config(mut self, config: HealthCheckConfig) -> Self {
self.health_check_config = config;
self
}
pub fn health_check_timeout(mut self, timeout: Duration) -> Self {
self.health_check_config.timeout = timeout;
self
}
pub fn unhealthy_threshold(mut self, threshold: u32) -> Self {
self.health_check_config.unhealthy_threshold = threshold;
self
}
pub fn adaptive(mut self, config: AdaptiveConfig) -> Self {
self.adaptive_config = Some(config);
self
}
pub fn adaptive_default(mut self) -> Self {
self.adaptive_config = Some(AdaptiveConfig::default());
self
}
pub fn build(self) -> ConnectionPool {
let pool = ConnectionPool::with_full_config(
self.config,
self.health_check_config,
self.adaptive_config,
self.tls_config,
);
for (id, address, weight) in self.endpoints {
pool.add_endpoint_with_weight(id, address, weight);
}
pool
}
}
impl Default for ConnectionPoolBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pool_config_default() {
let config = PoolConfig::default();
assert_eq!(config.min_size, 2);
assert_eq!(config.max_size, 10);
assert!(config.enable_circuit_breaker);
}
#[test]
fn test_health_check_config_defaults() {
let config = HealthCheckConfig::default();
assert_eq!(config.interval, Duration::from_secs(30));
assert_eq!(config.timeout, Duration::from_secs(5));
assert_eq!(config.unhealthy_threshold, 3);
}
#[test]
fn test_adaptive_config_defaults() {
let config = AdaptiveConfig::default();
assert_eq!(config.min_size, 2);
assert_eq!(config.max_size, 20);
assert!((config.scale_up_threshold - 0.8).abs() < f64::EPSILON);
assert!((config.scale_down_threshold - 0.2).abs() < f64::EPSILON);
assert_eq!(config.scale_step, 2);
assert_eq!(config.cooldown, Duration::from_secs(60));
}
#[test]
fn test_connection_health_status() {
assert_eq!(ConnectionHealth::Healthy, ConnectionHealth::Healthy);
assert_ne!(ConnectionHealth::Healthy, ConnectionHealth::Degraded);
assert_ne!(ConnectionHealth::Degraded, ConnectionHealth::Unhealthy);
}
#[tokio::test]
async fn test_connection_meta_expiry() {
let endpoint = Endpoint::from_static("http://localhost:50051");
if let Ok(channel) = endpoint.connect().await {
let meta = ConnectionMeta::new(channel, "ep1".to_string());
assert!(!meta.is_idle_expired(Duration::from_secs(10)));
assert!(!meta.is_lifetime_expired(Duration::from_secs(10)));
assert_eq!(meta.health, ConnectionHealth::Healthy);
assert_eq!(meta.health_check_failures, 0);
}
}
    #[test]
    fn test_health_check_healthy_connection() {
        // The endpoint and pool config are not exercised directly here (building
        // a live Channel would require a running server); underscore them to
        // avoid unused-variable warnings.
        let _endpoint = Endpoint::from_static("http://localhost:50051");
        let _config = PoolConfig::default();
        let health_config = HealthCheckConfig::default();
        assert_eq!(health_config.unhealthy_threshold, 3);
        assert_eq!(health_config.timeout, Duration::from_secs(5));
    }
#[tokio::test]
async fn test_health_check_removes_unhealthy() {
let pool = ConnectionPoolBuilder::new()
.min_size(0)
.max_size(10)
.circuit_breaker(false)
.add_endpoint("ep1".to_string(), "localhost:50051".to_string())
.build();
let idle_count = pool.inner.idle_connections.read().len();
assert_eq!(idle_count, 0);
let metrics = pool.metrics();
assert_eq!(metrics.total_health_check_failures, 0);
}
#[tokio::test]
async fn test_adaptive_scale_up() {
let adaptive = AdaptiveConfig {
min_size: 2,
max_size: 20,
scale_up_threshold: 0.8,
scale_down_threshold: 0.2,
scale_step: 2,
            cooldown: Duration::from_millis(10),
        };
let pool = ConnectionPoolBuilder::new()
.min_size(2)
.max_size(5)
.circuit_breaker(false)
.adaptive(adaptive)
.add_endpoint("ep1".to_string(), "localhost:50051".to_string())
.build();
assert_eq!(pool.effective_max_size(), 5);
pool.scale_up(3);
assert_eq!(pool.effective_max_size(), 8);
pool.scale_up(100);
assert_eq!(pool.effective_max_size(), 20);
}
#[tokio::test]
async fn test_adaptive_scale_down() {
let adaptive = AdaptiveConfig {
min_size: 2,
max_size: 20,
scale_up_threshold: 0.8,
scale_down_threshold: 0.2,
scale_step: 2,
cooldown: Duration::from_millis(10),
};
let pool = ConnectionPoolBuilder::new()
.min_size(2)
.max_size(10)
.circuit_breaker(false)
.adaptive(adaptive)
.add_endpoint("ep1".to_string(), "localhost:50051".to_string())
.build();
assert_eq!(pool.effective_max_size(), 10);
pool.scale_down(3);
assert_eq!(pool.effective_max_size(), 7);
pool.scale_down(100);
assert_eq!(pool.effective_max_size(), 2);
}
#[tokio::test]
async fn test_pool_metrics_tracking() {
let pool = ConnectionPoolBuilder::new()
.min_size(0)
.max_size(10)
.circuit_breaker(false)
.add_endpoint("ep1".to_string(), "localhost:50051".to_string())
.build();
let metrics = pool.metrics();
assert_eq!(metrics.total_connections, 0);
assert_eq!(metrics.active_connections, 0);
assert_eq!(metrics.idle_connections, 0);
assert_eq!(metrics.total_checkouts, 0);
assert_eq!(metrics.total_checkins, 0);
assert_eq!(metrics.total_timeouts, 0);
assert_eq!(metrics.total_health_check_failures, 0);
assert_eq!(metrics.avg_checkout_duration_us, 0);
assert!((metrics.utilization - 0.0).abs() < f64::EPSILON);
}
#[tokio::test]
async fn test_pool_utilization_calculation() {
let pool = ConnectionPoolBuilder::new()
.min_size(0)
.max_size(10)
.circuit_breaker(false)
.add_endpoint("ep1".to_string(), "localhost:50051".to_string())
.build();
let util = pool.utilization();
assert!((util - 0.0).abs() < f64::EPSILON);
{
let mut active = pool
.inner
.active_count
.lock()
.expect("active count lock poisoned");
*active = 5;
}
let util = pool.utilization();
assert!((util - 0.5).abs() < f64::EPSILON);
{
let mut active = pool
.inner
.active_count
.lock()
.expect("active count lock poisoned");
*active = 0;
}
}
#[tokio::test]
async fn test_adaptive_cooldown() {
let adaptive = AdaptiveConfig {
min_size: 2,
max_size: 20,
scale_up_threshold: 0.8,
scale_down_threshold: 0.2,
scale_step: 2,
            cooldown: Duration::from_secs(60),
        };
let pool = ConnectionPoolBuilder::new()
.min_size(2)
.max_size(10)
.circuit_breaker(false)
.adaptive(adaptive)
.add_endpoint("ep1".to_string(), "localhost:50051".to_string())
.build();
pool.scale_up(2);
let first_max = pool.effective_max_size();
assert_eq!(first_max, 12);
{
let mut active = pool
.inner
.active_count
.lock()
.expect("active count lock poisoned");
            *active = 11;
        }
ConnectionPool::evaluate_scaling(&pool.inner);
assert_eq!(pool.effective_max_size(), 12);
{
let mut active = pool
.inner
.active_count
.lock()
.expect("active count lock poisoned");
*active = 0;
}
}
#[tokio::test]
async fn test_pool_builder() {
let pool = ConnectionPoolBuilder::new()
.min_size(5)
.max_size(20)
.idle_timeout(Duration::from_secs(600))
.balancing_strategy(BalancingStrategy::RoundRobin)
.health_check_timeout(Duration::from_secs(10))
.unhealthy_threshold(5)
.adaptive_default()
.add_endpoint("ep1".to_string(), "localhost:50051".to_string())
.add_endpoint("ep2".to_string(), "localhost:50052".to_string())
.build();
let stats = pool.stats();
assert_eq!(stats.active_connections, 0);
assert_eq!(stats.idle_connections, 0);
assert_eq!(pool.effective_max_size(), 20);
}
#[tokio::test]
async fn test_pool_add_remove_endpoint() {
let pool = ConnectionPool::new(PoolConfig::default());
pool.add_endpoint("ep1".to_string(), "localhost:50051".to_string());
pool.add_endpoint("ep2".to_string(), "localhost:50052".to_string());
assert!(pool.remove_endpoint("ep1"));
assert!(!pool.remove_endpoint("ep3"));
}
#[tokio::test]
async fn test_pool_stats() {
let pool = ConnectionPool::new(PoolConfig::default());
pool.add_endpoint("ep1".to_string(), "localhost:50051".to_string());
let stats = pool.stats();
assert_eq!(stats.total_connections, 0);
assert_eq!(stats.active_connections, 0);
assert_eq!(stats.idle_connections, 0);
}
#[tokio::test]
async fn test_pool_shutdown() {
let pool = ConnectionPool::new(PoolConfig::default());
pool.add_endpoint("ep1".to_string(), "localhost:50051".to_string());
let result = pool.shutdown().await;
assert!(result.is_ok());
}
#[test]
fn test_pool_metrics_default() {
let metrics = PoolMetrics::default();
assert_eq!(metrics.total_connections, 0);
assert_eq!(metrics.total_checkouts, 0);
assert_eq!(metrics.total_checkins, 0);
assert_eq!(metrics.total_timeouts, 0);
assert_eq!(metrics.total_health_check_failures, 0);
}
#[test]
fn test_connection_meta_health_recording() {
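        // Constructing a ConnectionMeta requires a live Channel, so this test
        // replays the same threshold logic used by record_health_failure.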
let threshold: u32 = 3;
let mut failure_count: u32 = 0;
let mut health = ConnectionHealth::Healthy;
for _ in 0..threshold {
failure_count += 1;
if failure_count >= threshold {
health = ConnectionHealth::Unhealthy;
} else if failure_count >= threshold.saturating_sub(1).max(1) {
health = ConnectionHealth::Degraded;
}
}
assert_eq!(health, ConnectionHealth::Unhealthy);
assert_eq!(failure_count, 3);
}
#[tokio::test]
async fn test_scale_respects_adaptive_bounds() {
let adaptive = AdaptiveConfig {
min_size: 5,
max_size: 15,
scale_up_threshold: 0.8,
scale_down_threshold: 0.2,
scale_step: 2,
cooldown: Duration::from_millis(10),
};
let pool = ConnectionPoolBuilder::new()
.min_size(5)
.max_size(10)
.circuit_breaker(false)
.adaptive(adaptive)
.add_endpoint("ep1".to_string(), "localhost:50051".to_string())
.build();
pool.scale_up(10);
assert_eq!(pool.effective_max_size(), 15);
pool.scale_down(20);
        assert_eq!(pool.effective_max_size(), 5);
    }
#[tokio::test]
async fn test_metrics_after_operations() {
let pool = ConnectionPoolBuilder::new()
.min_size(0)
.max_size(10)
.circuit_breaker(false)
.add_endpoint("ep1".to_string(), "localhost:50051".to_string())
.build();
pool.inner.total_checkouts.fetch_add(5, Ordering::Relaxed);
pool.inner.total_checkins.fetch_add(3, Ordering::Relaxed);
pool.inner.total_timeouts.fetch_add(1, Ordering::Relaxed);
pool.inner
.total_health_check_failures
.fetch_add(2, Ordering::Relaxed);
pool.inner
.checkout_duration_sum_us
.fetch_add(5000, Ordering::Relaxed);
pool.inner
.checkout_count_for_avg
.fetch_add(5, Ordering::Relaxed);
let metrics = pool.metrics();
assert_eq!(metrics.total_checkouts, 5);
assert_eq!(metrics.total_checkins, 3);
assert_eq!(metrics.total_timeouts, 1);
assert_eq!(metrics.total_health_check_failures, 2);
        assert_eq!(metrics.avg_checkout_duration_us, 1000);
    }
}