pub(crate) mod connection;
pub(crate) use connection::{HttpConnection, PooledConnection};
use std::collections::{HashMap, HashSet, VecDeque};
use std::marker::PhantomData;
use std::net::IpAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use http::uri::{Authority, Scheme};
use crate::runtime::Runtime;
#[derive(Clone, Copy, Debug, Default, Hash, Eq, PartialEq)]
pub(crate) enum ProtocolHint {
#[default]
Auto,
H2c,
AdaptiveH2c,
}
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub(crate) struct PoolKey {
pub(crate) scheme: Scheme,
pub(crate) authority: Authority,
pub(crate) protocol: ProtocolHint,
}
impl PoolKey {
#[allow(dead_code)]
pub(crate) fn new(scheme: Scheme, authority: Authority) -> Self {
Self {
scheme,
authority,
protocol: ProtocolHint::Auto,
}
}
pub(crate) fn with_hint(scheme: Scheme, authority: Authority, protocol: ProtocolHint) -> Self {
Self {
scheme,
authority,
protocol,
}
}
}
struct IdleConnection<R: Runtime> {
connection: PooledConnection<R>,
idle_since: Instant,
_runtime: PhantomData<R>,
}
struct PoolInner<R: Runtime> {
idle: HashMap<PoolKey, VecDeque<IdleConnection<R>>>,
san_index: HashMap<String, HashSet<PoolKey>>,
max_idle_per_host: usize,
idle_timeout: Duration,
_runtime: PhantomData<R>,
}
pub(crate) struct ConnectionPool<R: Runtime> {
inner: Arc<Mutex<PoolInner<R>>>,
reaper_spawned: Arc<AtomicBool>,
}
impl<R: Runtime> Clone for ConnectionPool<R> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
reaper_spawned: Arc::clone(&self.reaper_spawned),
}
}
}
impl<R: Runtime> ConnectionPool<R> {
pub(crate) fn new(max_idle_per_host: usize, idle_timeout: Duration) -> Self {
Self {
inner: Arc::new(Mutex::new(PoolInner::<R> {
idle: HashMap::new(),
san_index: HashMap::new(),
max_idle_per_host,
idle_timeout,
_runtime: PhantomData,
})),
reaper_spawned: Arc::new(AtomicBool::new(false)),
}
}
#[cfg(any(test, feature = "__bench"))]
pub(crate) fn new_no_reaper(max_idle_per_host: usize, idle_timeout: Duration) -> Self {
Self {
inner: Arc::new(Mutex::new(PoolInner::<R> {
idle: HashMap::new(),
san_index: HashMap::new(),
max_idle_per_host,
idle_timeout,
_runtime: PhantomData,
})),
reaper_spawned: Arc::new(AtomicBool::new(true)),
}
}
pub(crate) fn checkout(&self, key: &PoolKey) -> Option<PooledConnection<R>> {
let mut inner = self.inner.lock().unwrap();
let idle_timeout = inner.idle_timeout;
let queue = inner.idle.get_mut(key)?;
let now = Instant::now();
while let Some(entry) = queue.pop_back() {
if now.duration_since(entry.idle_since) >= idle_timeout {
continue;
}
if entry.connection.is_ready() {
if queue.is_empty() {
inner.idle.remove(key);
}
return Some(entry.connection);
}
}
inner.idle.remove(key);
None
}
pub(crate) fn checkin(&self, key: PoolKey, connection: PooledConnection<R>) {
self.ensure_reaper();
let mut inner = self.inner.lock().unwrap();
let max = inner.max_idle_per_host;
for san in &connection.sans {
inner
.san_index
.entry(san.clone())
.or_default()
.insert(key.clone());
}
let queue = inner.idle.entry(key).or_default();
if queue.len() >= max {
queue.pop_front();
}
queue.push_back(IdleConnection::<R> {
connection,
idle_since: Instant::now(),
_runtime: PhantomData,
});
}
pub(crate) fn checkout_coalesced(
&self,
target_host: &str,
resolved_ip: Option<IpAddr>,
) -> Option<PooledConnection<R>> {
let mut inner = self.inner.lock().unwrap();
let now = Instant::now();
let idle_timeout = inner.idle_timeout;
let candidate_keys: Vec<PoolKey> = match inner.san_index.get(target_host) {
Some(keys) => keys.iter().cloned().collect(),
None => return None,
};
let mut found_key = None;
let mut found_conn = None;
for key in &candidate_keys {
let queue = match inner.idle.get_mut(key) {
Some(q) => q,
None => {
continue;
}
};
let mut i = queue.len();
while i > 0 {
i -= 1;
let entry = &queue[i];
if now.duration_since(entry.idle_since) >= idle_timeout {
continue;
}
if !entry.connection.is_h2_or_h3() {
continue;
}
if !entry.connection.sans.iter().any(|s| s == target_host) {
continue;
}
if let Some(ip) = resolved_ip
&& entry.connection.remote_addr.map(|a| a.ip()) != Some(ip)
{
continue;
}
let entry = queue.remove(i).unwrap();
if entry.connection.is_ready() {
if queue.is_empty() {
found_key = Some(key.clone());
}
found_conn = Some(entry.connection);
break;
}
}
if found_conn.is_some() {
break;
}
}
if let Some(key) = found_key {
inner.idle.remove(&key);
}
for key in &candidate_keys {
if !inner.idle.contains_key(key)
&& let Some(keys) = inner.san_index.get_mut(target_host)
{
keys.remove(key);
if keys.is_empty() {
inner.san_index.remove(target_host);
}
}
}
found_conn
}
fn ensure_reaper(&self) {
if !self.reaper_spawned.swap(true, Ordering::AcqRel) {
self.spawn_reaper();
}
}
fn spawn_reaper(&self) {
let inner = Arc::clone(&self.inner);
R::spawn(async move {
loop {
let timeout = {
let guard = inner.lock().unwrap();
guard.idle_timeout
};
R::sleep(timeout).await;
let mut guard = inner.lock().unwrap();
let now = Instant::now();
let idle_timeout = guard.idle_timeout;
guard.idle.retain(|_, queue| {
queue.retain(|entry| now.duration_since(entry.idle_since) < idle_timeout);
!queue.is_empty()
});
let live_keys: HashSet<PoolKey> = guard.idle.keys().cloned().collect();
guard.san_index.retain(|_, keys| {
keys.retain(|k| live_keys.contains(k));
!keys.is_empty()
});
}
});
}
}
#[cfg(all(test, feature = "tokio"))]
mod tests_tokio;
#[cfg(all(test, feature = "smol"))]
mod tests_smol;
#[cfg(all(test, feature = "compio"))]
mod tests_compio;