#![allow(dead_code)]
use parking_lot::Mutex;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::error::{NetError, NetResult};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HostKey {
pub scheme: String,
pub host: String,
pub port: u16,
}
impl HostKey {
pub fn from_url(url: &str) -> NetResult<Self> {
let (scheme, rest) = url
.split_once("://")
.ok_or_else(|| NetError::invalid_url(format!("missing scheme in URL: {url}")))?;
let authority = rest.split('/').next().unwrap_or(rest);
let (host_part, port) = if let Some((h, p)) = authority.rsplit_once(':') {
let port: u16 = p
.parse()
.map_err(|_| NetError::invalid_url(format!("invalid port in URL: {url}")))?;
(h.to_owned(), port)
} else {
let default_port = match scheme {
"https" => 443,
"http" => 80,
_ => {
return Err(NetError::invalid_url(format!(
"unknown scheme '{}' in URL: {}",
scheme, url
)));
}
};
(authority.to_owned(), default_port)
};
Ok(Self {
scheme: scheme.to_ascii_lowercase(),
host: host_part.to_ascii_lowercase(),
port,
})
}
#[must_use]
pub fn display(&self) -> String {
format!("{}://{}:{}", self.scheme, self.host, self.port)
}
}
#[derive(Debug, Clone)]
pub struct ConnDescriptor {
pub id: u64,
pub key: HostKey,
pub request_count: u32,
pub max_requests: u32,
pub created_at: Instant,
pub last_used: Instant,
pub peer_close_requested: bool,
}
impl ConnDescriptor {
#[must_use]
pub fn new(id: u64, key: HostKey, max_requests: u32) -> Self {
let now = Instant::now();
Self {
id,
key,
request_count: 0,
max_requests,
created_at: now,
last_used: now,
peer_close_requested: false,
}
}
#[must_use]
pub fn is_reusable(&self, idle_timeout: Duration) -> bool {
if self.peer_close_requested {
return false;
}
if self.request_count >= self.max_requests {
return false;
}
self.last_used.elapsed() < idle_timeout
}
pub fn record_request(&mut self) {
self.request_count = self.request_count.saturating_add(1);
self.last_used = Instant::now();
}
}
#[derive(Debug, Clone)]
pub struct KeepalivePoolConfig {
pub max_idle_per_host: usize,
pub max_total_connections: usize,
pub idle_timeout: Duration,
pub max_requests_per_conn: u32,
pub prefer_reuse: bool,
}
impl Default for KeepalivePoolConfig {
fn default() -> Self {
Self {
max_idle_per_host: 8,
max_total_connections: 64,
idle_timeout: Duration::from_secs(90),
max_requests_per_conn: 1000,
prefer_reuse: true,
}
}
}
impl KeepalivePoolConfig {
#[must_use]
pub fn conservative() -> Self {
Self {
max_idle_per_host: 2,
max_total_connections: 16,
idle_timeout: Duration::from_secs(30),
max_requests_per_conn: 200,
prefer_reuse: true,
}
}
#[must_use]
pub fn high_throughput() -> Self {
Self {
max_idle_per_host: 32,
max_total_connections: 256,
idle_timeout: Duration::from_secs(120),
max_requests_per_conn: 5000,
prefer_reuse: true,
}
}
}
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
pub connections_created: u64,
pub connections_reused: u64,
pub connections_discarded: u64,
pub idle_count: usize,
pub active_count: usize,
}
impl PoolStats {
#[must_use]
pub fn reuse_ratio(&self) -> f64 {
let total = self.connections_created + self.connections_reused;
if total == 0 {
return 0.0;
}
self.connections_reused as f64 / total as f64
}
}
struct PoolInner {
idle: HashMap<HostKey, VecDeque<ConnDescriptor>>,
active_count: usize,
stats: PoolStats,
next_id: u64,
config: KeepalivePoolConfig,
}
impl PoolInner {
fn new(config: KeepalivePoolConfig) -> Self {
Self {
idle: HashMap::new(),
active_count: 0,
stats: PoolStats::default(),
next_id: 1,
config,
}
}
fn total_idle(&self) -> usize {
self.idle.values().map(VecDeque::len).sum()
}
fn acquire_idle(&mut self, key: &HostKey) -> Option<ConnDescriptor> {
let idle_timeout = self.config.idle_timeout;
let queue = self.idle.get_mut(key)?;
while let Some(front) = queue.front() {
if front.is_reusable(idle_timeout) {
break;
}
queue.pop_front();
self.stats.connections_discarded += 1;
}
if let Some(mut desc) = queue.pop_front() {
desc.record_request();
self.active_count += 1;
self.stats.connections_reused += 1;
self.stats.idle_count = self.total_idle();
self.stats.active_count = self.active_count;
Some(desc)
} else {
None
}
}
fn create_new(&mut self, key: HostKey) -> NetResult<ConnDescriptor> {
let total = self.total_idle() + self.active_count;
if total >= self.config.max_total_connections {
return Err(NetError::buffer(format!(
"keep-alive pool exhausted: {} connections (max {})",
total, self.config.max_total_connections
)));
}
let id = self.next_id;
self.next_id = self.next_id.saturating_add(1);
let mut desc = ConnDescriptor::new(id, key, self.config.max_requests_per_conn);
desc.record_request();
self.active_count += 1;
self.stats.connections_created += 1;
self.stats.active_count = self.active_count;
Ok(desc)
}
fn release(&mut self, mut desc: ConnDescriptor) {
self.active_count = self.active_count.saturating_sub(1);
if !desc.is_reusable(self.config.idle_timeout) {
self.stats.connections_discarded += 1;
self.stats.active_count = self.active_count;
self.stats.idle_count = self.total_idle();
return;
}
desc.last_used = Instant::now();
let key = desc.key.clone();
let queue = self.idle.entry(key).or_default();
if queue.len() < self.config.max_idle_per_host {
queue.push_back(desc);
self.stats.idle_count = self.total_idle();
} else {
self.stats.connections_discarded += 1;
}
self.stats.active_count = self.active_count;
}
fn evict_host(&mut self, key: &HostKey) {
if let Some(queue) = self.idle.remove(key) {
self.stats.connections_discarded += queue.len() as u64;
}
self.stats.idle_count = self.total_idle();
}
fn purge_stale(&mut self) {
let timeout = self.config.idle_timeout;
let mut discarded: u64 = 0;
for queue in self.idle.values_mut() {
let before = queue.len();
queue.retain(|d| d.last_used.elapsed() < timeout);
discarded += (before - queue.len()) as u64;
}
self.stats.connections_discarded += discarded;
self.stats.idle_count = self.total_idle();
}
}
#[derive(Clone)]
pub struct CdnKeepalivePool {
inner: Arc<Mutex<PoolInner>>,
}
impl CdnKeepalivePool {
#[must_use]
pub fn new(config: KeepalivePoolConfig) -> Self {
Self {
inner: Arc::new(Mutex::new(PoolInner::new(config))),
}
}
#[must_use]
pub fn default_pool() -> Self {
Self::new(KeepalivePoolConfig::default())
}
pub fn acquire(&self, key: HostKey) -> NetResult<ConnDescriptor> {
let mut inner = self.inner.lock();
if inner.config.prefer_reuse {
if let Some(desc) = inner.acquire_idle(&key) {
return Ok(desc);
}
}
inner.create_new(key)
}
pub fn release(&self, desc: ConnDescriptor) {
self.inner.lock().release(desc);
}
pub fn evict_host(&self, key: &HostKey) {
self.inner.lock().evict_host(key);
}
pub fn purge_stale(&self) {
self.inner.lock().purge_stale();
}
#[must_use]
pub fn stats(&self) -> PoolStats {
self.inner.lock().stats.clone()
}
#[must_use]
pub fn idle_count(&self) -> usize {
self.inner.lock().total_idle()
}
#[must_use]
pub fn active_count(&self) -> usize {
self.inner.lock().active_count
}
}
impl std::fmt::Debug for CdnKeepalivePool {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let inner = self.inner.lock();
f.debug_struct("CdnKeepalivePool")
.field("idle", &inner.total_idle())
.field("active", &inner.active_count)
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_pool() -> CdnKeepalivePool {
CdnKeepalivePool::new(KeepalivePoolConfig {
max_idle_per_host: 4,
max_total_connections: 10,
idle_timeout: Duration::from_secs(60),
max_requests_per_conn: 100,
prefer_reuse: true,
})
}
#[test]
fn test_host_key_from_url_https() {
let key = HostKey::from_url("https://cdn.example.com/path/seg.ts")
.expect("valid HTTPS URL literal");
assert_eq!(key.scheme, "https");
assert_eq!(key.host, "cdn.example.com");
assert_eq!(key.port, 443);
}
#[test]
fn test_host_key_from_url_http_custom_port() {
let key = HostKey::from_url("http://origin.example.com:8080/live/")
.expect("valid HTTP URL with custom port");
assert_eq!(key.scheme, "http");
assert_eq!(key.port, 8080);
}
#[test]
fn test_host_key_from_url_invalid_scheme() {
let err = HostKey::from_url("ftp://files.example.com/data")
.expect_err("ftp scheme is not supported");
assert!(matches!(err, NetError::InvalidUrl(_)));
}
#[test]
fn test_host_key_from_url_missing_scheme() {
let err = HostKey::from_url("no-scheme-here")
.expect_err("URL with no scheme must fail");
assert!(matches!(err, NetError::InvalidUrl(_)));
}
#[test]
fn test_acquire_creates_new_descriptor() {
let pool = make_pool();
let key = HostKey::from_url("https://cdn1.example.com/seg.ts")
.expect("valid URL literal");
let desc = pool.acquire(key).expect("pool is not exhausted");
assert_eq!(desc.request_count, 1);
assert_eq!(pool.active_count(), 1);
assert_eq!(pool.idle_count(), 0);
}
#[test]
fn test_release_and_reuse() {
let pool = make_pool();
let key = HostKey::from_url("https://cdn1.example.com/seg.ts")
.expect("valid URL literal");
let desc = pool.acquire(key.clone()).expect("pool is not exhausted");
let id = desc.id;
pool.release(desc);
assert_eq!(pool.idle_count(), 1);
assert_eq!(pool.active_count(), 0);
let desc2 = pool.acquire(key).expect("pool is not exhausted");
assert_eq!(desc2.id, id, "should reuse the same descriptor");
assert_eq!(desc2.request_count, 2);
let stats = pool.stats();
assert_eq!(stats.connections_created, 1);
assert_eq!(stats.connections_reused, 1);
}
#[test]
fn test_pool_exhaustion_returns_error() {
let pool = CdnKeepalivePool::new(KeepalivePoolConfig {
max_idle_per_host: 1,
max_total_connections: 2,
idle_timeout: Duration::from_secs(60),
max_requests_per_conn: 100,
prefer_reuse: true,
});
let key = HostKey::from_url("https://cdn.example.com/seg.ts")
.expect("valid URL literal");
let _d1 = pool.acquire(key.clone()).expect("pool limit is 2, first acquire succeeds");
let _d2 = pool.acquire(key.clone()).expect("pool limit is 2, second acquire succeeds");
let err = pool.acquire(key).expect_err("pool is exhausted at limit 2");
assert!(matches!(err, NetError::Buffer(_)));
}
#[test]
fn test_peer_close_not_returned_to_pool() {
let pool = make_pool();
let key = HostKey::from_url("https://cdn.example.com/seg.ts")
.expect("valid URL literal");
let mut desc = pool.acquire(key).expect("pool is not exhausted");
desc.peer_close_requested = true;
pool.release(desc);
assert_eq!(pool.idle_count(), 0);
let stats = pool.stats();
assert_eq!(stats.connections_discarded, 1);
}
#[test]
fn test_evict_host_clears_idle() {
let pool = make_pool();
let key = HostKey::from_url("https://cdn.example.com/seg.ts")
.expect("valid URL literal");
let desc = pool.acquire(key.clone()).expect("pool is not exhausted");
pool.release(desc);
assert_eq!(pool.idle_count(), 1);
pool.evict_host(&key);
assert_eq!(pool.idle_count(), 0);
}
#[test]
fn test_max_idle_per_host_enforced() {
let pool = CdnKeepalivePool::new(KeepalivePoolConfig {
max_idle_per_host: 2,
max_total_connections: 20,
idle_timeout: Duration::from_secs(60),
max_requests_per_conn: 100,
prefer_reuse: false, });
let key = HostKey::from_url("https://cdn.example.com/seg.ts")
.expect("valid URL literal");
let d1 = pool.acquire(key.clone()).expect("pool limit is 20");
let d2 = pool.acquire(key.clone()).expect("pool limit is 20");
let d3 = pool.acquire(key.clone()).expect("pool limit is 20");
pool.release(d1);
pool.release(d2);
pool.release(d3);
assert_eq!(pool.idle_count(), 2);
let stats = pool.stats();
assert_eq!(stats.connections_discarded, 1);
}
#[test]
fn test_purge_stale_removes_expired() {
let pool = CdnKeepalivePool::new(KeepalivePoolConfig {
max_idle_per_host: 4,
max_total_connections: 10,
idle_timeout: Duration::ZERO,
max_requests_per_conn: 100,
prefer_reuse: true,
});
let key = HostKey::from_url("https://cdn.example.com/seg.ts")
.expect("valid URL literal");
let desc = pool.acquire(key).expect("pool is not exhausted");
pool.release(desc);
pool.purge_stale();
assert_eq!(pool.idle_count(), 0);
}
#[test]
fn test_stats_reuse_ratio() {
let pool = make_pool();
let key = HostKey::from_url("https://cdn.example.com/seg.ts")
.expect("valid URL literal");
let d = pool.acquire(key.clone()).expect("pool is not exhausted");
pool.release(d);
let _d2 = pool.acquire(key).expect("pool is not exhausted");
let stats = pool.stats();
let ratio = stats.reuse_ratio();
assert!((ratio - 0.5_f64).abs() < 1e-9);
}
#[test]
fn test_conn_descriptor_is_reusable() {
let key = HostKey::from_url("https://cdn.example.com")
.expect("valid URL literal");
let mut desc = ConnDescriptor::new(1, key, 10);
desc.record_request();
assert!(desc.is_reusable(Duration::from_secs(60)));
desc.max_requests = 1;
assert!(!desc.is_reusable(Duration::from_secs(60)));
}
#[test]
fn test_host_key_display() {
let key = HostKey::from_url("https://cdn.example.com/path")
.expect("valid URL literal");
assert_eq!(key.display(), "https://cdn.example.com:443");
}
}