use crate::error::{FusekiError, FusekiResult};
use crate::pool::adaptive_pool::{AdaptivePool, PoolConfig, PooledConnection};
use crate::pool::backpressure::{
BackpressureConfig, BackpressureController, BackpressureDecision, BackpressureStats,
};
#[derive(Debug, Clone)]
pub struct PoolStats {
pub active: u32,
pub idle: u32,
pub total: u32,
}
pub struct ConnectionPool<C: Send + 'static> {
inner: AdaptivePool<C>,
backpressure: BackpressureController,
backend_name: String,
}
impl<C: Send + 'static> ConnectionPool<C> {
pub fn new(backend: &str, pool: AdaptivePool<C>, bp_config: BackpressureConfig) -> Self {
ConnectionPool {
inner: pool,
backpressure: BackpressureController::new(bp_config),
backend_name: backend.to_string(),
}
}
pub fn acquire(&self) -> FusekiResult<PooledConnection<C>> {
let stats = self.inner.stats();
let utilization = stats.utilization;
match self.backpressure.should_accept_request(utilization) {
BackpressureDecision::Accept | BackpressureDecision::Queue => {
let conn = self.inner.acquire();
if utilization >= self.backpressure_config_queue_threshold() {
self.backpressure.record_dequeue();
}
conn
}
BackpressureDecision::Reject { retry_after_ms } => {
Err(FusekiError::ServiceUnavailable {
message: format!(
"Backend '{}' is overloaded; retry after {}ms",
self.backend_name, retry_after_ms
),
})
}
}
}
pub fn pool_stats(&self) -> PoolStats {
let s = self.inner.stats();
PoolStats {
active: s.active_connections as u32,
idle: s.idle_connections as u32,
total: s.total_connections as u32,
}
}
pub fn backpressure_stats(&self) -> BackpressureStats {
self.backpressure.stats()
}
pub fn backend_name(&self) -> &str {
&self.backend_name
}
fn backpressure_config_queue_threshold(&self) -> f64 {
let depth = self.backpressure.current_queue_depth();
if depth > 0 {
0.70
} else {
1.1
} }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::pool::adaptive_pool::PoolConfig;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
struct DummyConn {
id: usize,
}
fn make_pool(min: usize, max: usize) -> FusekiResult<AdaptivePool<DummyConn>> {
let counter = Arc::new(AtomicUsize::new(0));
AdaptivePool::new(
PoolConfig {
min_connections: min,
max_connections: max,
acquire_timeout: Duration::from_millis(200),
idle_timeout: Duration::from_secs(300),
max_lifetime: Duration::from_secs(3600),
target_utilization: 0.7,
resize_interval: Duration::from_secs(60),
},
move || {
Ok(DummyConn {
id: counter.fetch_add(1, Ordering::Relaxed),
})
},
)
}
fn make_accepting_cp(min: usize, max: usize) -> ConnectionPool<DummyConn> {
let pool = make_pool(min, max).unwrap();
let bp = BackpressureConfig {
queue_threshold: 1.1,
reject_threshold: 1.1,
max_queue_depth: 1000,
base_retry_after_ms: 100,
};
ConnectionPool::new("test_backend", pool, bp)
}
fn make_rejecting_cp(min: usize, max: usize) -> ConnectionPool<DummyConn> {
let pool = make_pool(min, max).unwrap();
let bp = BackpressureConfig {
queue_threshold: 0.0,
reject_threshold: 0.0,
max_queue_depth: 1000,
base_retry_after_ms: 50,
};
ConnectionPool::new("overloaded_backend", pool, bp)
}
#[test]
fn test_connection_pool_acquire_ok() {
let cp = make_accepting_cp(2, 10);
let conn = cp.acquire();
assert!(conn.is_ok(), "Should acquire successfully");
}
#[test]
fn test_connection_pool_backend_name() {
let cp = make_accepting_cp(1, 5);
assert_eq!(cp.backend_name(), "test_backend");
}
#[test]
fn test_connection_pool_pool_stats_initial() {
let cp = make_accepting_cp(2, 10);
let stats = cp.pool_stats();
assert_eq!(stats.total, 2);
assert_eq!(stats.active, 0);
assert_eq!(stats.idle, 2);
}
#[test]
fn test_connection_pool_pool_stats_active_increments() {
let cp = make_accepting_cp(2, 10);
let _conn = cp.acquire().unwrap();
let stats = cp.pool_stats();
assert_eq!(stats.active, 1);
}
#[test]
fn test_connection_pool_pool_stats_active_decrements_on_drop() {
let cp = make_accepting_cp(2, 10);
{
let _conn = cp.acquire().unwrap();
assert_eq!(cp.pool_stats().active, 1);
}
assert_eq!(cp.pool_stats().active, 0);
}
#[test]
fn test_connection_pool_rejected_when_overloaded() {
let cp = make_rejecting_cp(2, 10);
let result = cp.acquire();
assert!(result.is_err(), "Should be rejected due to backpressure");
match result {
Err(FusekiError::ServiceUnavailable { .. }) => {}
Err(other) => panic!("Expected ServiceUnavailable, got error: {}", other),
Ok(_) => panic!("Expected error but got Ok"),
}
}
#[test]
fn test_connection_pool_error_message_contains_backend_name() {
let cp = make_rejecting_cp(2, 10);
match cp.acquire() {
Err(FusekiError::ServiceUnavailable { message }) => {
assert!(
message.contains("overloaded_backend"),
"Error message should mention backend name"
);
}
Err(other) => panic!("Expected ServiceUnavailable, got error: {}", other),
Ok(_) => panic!("Expected error but got Ok"),
}
}
#[test]
fn test_backpressure_accept_at_low_util() {
let ctrl = BackpressureController::new(BackpressureConfig::default());
assert_eq!(
ctrl.should_accept_request(0.0),
BackpressureDecision::Accept
);
}
#[test]
fn test_backpressure_queue_at_medium_util() {
let ctrl = BackpressureController::new(BackpressureConfig::default());
assert_eq!(
ctrl.should_accept_request(0.75),
BackpressureDecision::Queue
);
}
#[test]
fn test_backpressure_reject_at_high_util() {
let ctrl = BackpressureController::new(BackpressureConfig::default());
match ctrl.should_accept_request(0.95) {
BackpressureDecision::Reject { .. } => {}
other => panic!("Expected Reject, got {:?}", other),
}
}
#[test]
fn test_backpressure_stats_after_decisions() {
let ctrl = BackpressureController::new(BackpressureConfig::default());
ctrl.should_accept_request(0.0); ctrl.should_accept_request(0.75); ctrl.should_accept_request(0.95);
let stats = ctrl.stats();
assert_eq!(stats.total_accepted, 1);
assert_eq!(stats.total_queued, 1);
assert_eq!(stats.total_rejected, 1);
}
#[test]
fn test_connection_pool_backpressure_stats_accessible() {
let cp = make_accepting_cp(2, 10);
let _conn = cp.acquire().unwrap();
let bp_stats = cp.backpressure_stats();
assert_eq!(bp_stats.total_rejected, 0);
assert!(bp_stats.total_accepted + bp_stats.total_queued >= 1);
}
#[test]
fn test_pool_stats_idle_plus_active_equals_total() {
let cp = make_accepting_cp(3, 10);
let stats = cp.pool_stats();
assert_eq!(stats.idle + stats.active, stats.total);
}
#[test]
fn test_pool_stats_after_multiple_acquires() {
let cp = make_accepting_cp(4, 10);
let _c1 = cp.acquire().unwrap();
let _c2 = cp.acquire().unwrap();
let stats = cp.pool_stats();
assert_eq!(stats.active, 2);
}
#[test]
fn test_multiple_pools_independent() {
let cp1 = make_accepting_cp(1, 5);
let cp2 = make_accepting_cp(1, 5);
let _c1 = cp1.acquire().unwrap();
let stats2 = cp2.pool_stats();
assert_eq!(stats2.active, 0, "cp2 should have no active connections");
}
}