pub(crate) mod connection;
pub(crate) use connection::{HttpConnection, PooledConnection};
use std::collections::{HashMap, HashSet, VecDeque};
use std::net::IpAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use http::uri::{Authority, Scheme};
use crate::runtime::RuntimePoll;
/// Protocol preference stored in a [`PoolKey`].
///
/// Because the hint is part of the key's equality and hash, connections
/// checked in under different hints live in separate pool buckets.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum ProtocolHint {
    /// The default hint; [`PoolKey::new`] uses it.
    #[default]
    Auto,
    /// NOTE(review): presumably "HTTP/2 over cleartext" — confirm against the connector.
    H2c,
    /// NOTE(review): presumably an h2c mode that adapts/falls back — confirm against the connector.
    AdaptiveH2c,
}
/// Key under which idle connections are grouped in the pool.
///
/// Two keys are equal (and hash identically) only when scheme, authority and
/// protocol hint all match.
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub(crate) struct PoolKey {
/// URI scheme part of the key.
pub(crate) scheme: Scheme,
/// URI authority part of the key.
pub(crate) authority: Authority,
/// Protocol hint the connection was requested with.
pub(crate) protocol: ProtocolHint,
}
impl PoolKey {
    /// Builds a key for `scheme`/`authority` with the [`ProtocolHint::Auto`] hint.
    // NOTE(review): `dead_code` is allowed presumably because some build
    // configurations only use `with_hint` — confirm before removing.
    #[allow(dead_code)]
    pub(crate) fn new(scheme: Scheme, authority: Authority) -> Self {
        Self::with_hint(scheme, authority, ProtocolHint::Auto)
    }

    /// Builds a key for `scheme`/`authority` carrying an explicit protocol hint.
    pub(crate) fn with_hint(scheme: Scheme, authority: Authority, protocol: ProtocolHint) -> Self {
        Self {
            protocol,
            authority,
            scheme,
        }
    }
}
/// A connection parked in the pool together with the timestamp used to
/// compute how long it has been idle.
struct IdleConnection<B> {
/// The parked connection.
connection: PooledConnection<B>,
/// Set when the connection is checked in and refreshed whenever a
/// multiplexed clone of it is handed out.
idle_since: Instant,
}
/// Mutable pool state; always accessed through the `Mutex` in `ConnectionPool`.
struct PoolInner<B> {
/// Idle connections per key. New entries are pushed at the back; the
/// oldest entry sits at the front.
idle: HashMap<PoolKey, VecDeque<IdleConnection<B>>>,
/// Maps a SAN string to the keys under which a connection listing that
/// SAN was checked in. May contain stale keys; they are pruned lazily.
san_index: HashMap<String, HashSet<PoolKey>>,
/// Keys currently flagged via `mark_connecting_h2`.
connecting_h2: HashSet<PoolKey>,
/// Upper bound on the queue length per key; `0` disables check-in.
max_idle_per_host: usize,
/// Age at which an idle entry is considered expired.
idle_timeout: Duration,
}
/// Cheaply clonable handle to a shared pool of idle connections.
pub(crate) struct ConnectionPool<B> {
/// Shared, mutex-protected pool state.
inner: Arc<Mutex<PoolInner<B>>>,
/// `true` once a reaper has been requested (or when the pool was built
/// with the reaper disabled); shared between clones.
reaper_spawned: Arc<AtomicBool>,
}
/// Cloning yields another handle to the same pool state and reaper flag;
/// no connections are duplicated. Implemented by hand so that `B` does not
/// need to be `Clone`.
impl<B> Clone for ConnectionPool<B> {
    fn clone(&self) -> Self {
        let Self {
            inner,
            reaper_spawned,
        } = self;
        Self {
            inner: Arc::clone(inner),
            reaper_spawned: Arc::clone(reaper_spawned),
        }
    }
}
impl<B: 'static> ConnectionPool<B> {
/// Creates an empty pool.
///
/// `max_idle_per_host` caps the number of idle entries kept per key and
/// `idle_timeout` is the age at which an idle entry expires. The reaper flag
/// starts out `false`, so the first `ensure_reaper*` call spawns the reaper.
pub(crate) fn new(max_idle_per_host: usize, idle_timeout: Duration) -> Self {
    let state = PoolInner {
        idle: HashMap::new(),
        san_index: HashMap::new(),
        connecting_h2: HashSet::new(),
        max_idle_per_host,
        idle_timeout,
    };
    let reaper_spawned = AtomicBool::new(false);
    Self {
        inner: Arc::new(Mutex::new(state)),
        reaper_spawned: Arc::new(reaper_spawned),
    }
}
/// Creates an empty pool whose reaper flag is already set, so later
/// `ensure_reaper*` calls never spawn a background task.
///
/// Only compiled for tests and the `__bench` feature.
#[cfg(any(test, feature = "__bench"))]
pub(crate) fn new_no_reaper(max_idle_per_host: usize, idle_timeout: Duration) -> Self {
    let state = PoolInner {
        idle: HashMap::new(),
        san_index: HashMap::new(),
        connecting_h2: HashSet::new(),
        max_idle_per_host,
        idle_timeout,
    };
    // Pretend the reaper is already running.
    let reaper_spawned = AtomicBool::new(true);
    Self {
        inner: Arc::new(Mutex::new(state)),
        reaper_spawned: Arc::new(reaper_spawned),
    }
}
/// Returns the configured idle timeout.
///
/// Unlike the other accessors this one still answers when the mutex is
/// poisoned: the poisoned guard is recovered and the stored value is read.
pub(crate) fn idle_timeout(&self) -> Duration {
    let guard = match self.inner.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    guard.idle_timeout
}
pub(crate) fn checkout(&self, key: &PoolKey) -> Option<PooledConnection<B>> {
let mut inner = self.inner.lock().ok()?;
let idle_timeout = inner.idle_timeout;
let queue = inner.idle.get_mut(key)?;
let now = Instant::now();
while let Some(entry) = queue.pop_back() {
if now.duration_since(entry.idle_since) >= idle_timeout {
continue;
}
if entry.connection.is_ready() {
if entry.connection.is_h2_or_h3()
&& let Some(cloned) = entry.connection.clone_for_multiplex()
{
let mut entry = entry;
entry.idle_since = now;
queue.push_back(entry);
return Some(cloned);
}
if queue.is_empty() {
inner.idle.remove(key);
}
return Some(entry.connection);
}
}
inner.idle.remove(key);
None
}
/// Parks `connection` in the pool under `key`.
///
/// Does nothing (dropping the connection) when the mutex is poisoned or when
/// `max_idle_per_host` is `0`. Every SAN listed on the connection is recorded
/// in the SAN index for `key`. If the queue for `key` is already at capacity,
/// its oldest entry is dropped to make room.
pub(crate) fn checkin(&self, key: PoolKey, connection: PooledConnection<B>) {
    let mut guard = match self.inner.lock() {
        Ok(guard) => guard,
        Err(_) => return,
    };
    let state = &mut *guard;

    let capacity = state.max_idle_per_host;
    if capacity == 0 {
        return;
    }

    for name in connection.sans.iter() {
        let keys_for_name = state.san_index.entry(name.clone()).or_default();
        keys_for_name.insert(key.clone());
    }

    let bucket = state.idle.entry(key).or_default();
    if bucket.len() >= capacity {
        // Evict the oldest entry (front of the queue).
        bucket.pop_front();
    }
    let parked = IdleConnection {
        connection,
        idle_since: Instant::now(),
    };
    bucket.push_back(parked);
}
/// Drops every idle connection stored under `key`.
///
/// A poisoned mutex makes this a no-op. The SAN index is not touched here;
/// stale keys in it are pruned by `checkout_coalesced` and the reaper.
pub(crate) fn evict(&self, key: &PoolKey) {
    if let Ok(mut guard) = self.inner.lock() {
        guard.idle.remove(key);
    }
}
/// Flags `key` as having an HTTP/2 connection attempt in flight.
///
/// Returns `true` when the key was already flagged. Returns `false` when this
/// call flagged it, and also when the mutex is poisoned (nothing is recorded
/// in that case).
pub(crate) fn mark_connecting_h2(&self, key: &PoolKey) -> bool {
    match self.inner.lock() {
        Err(_) => false,
        Ok(mut guard) => {
            let already_marked = guard.connecting_h2.contains(key);
            if !already_marked {
                // Clone the key only when it actually has to be stored.
                guard.connecting_h2.insert(key.clone());
            }
            already_marked
        }
    }
}
/// Clears the "HTTP/2 connection attempt in flight" flag for `key`.
///
/// A poisoned mutex makes this a no-op.
pub(crate) fn unmark_connecting_h2(&self, key: &PoolKey) {
    let Ok(mut guard) = self.inner.lock() else {
        return;
    };
    guard.connecting_h2.remove(key);
}
pub(crate) fn checkout_coalesced(
&self,
target_host: &str,
resolved_ip: Option<IpAddr>,
) -> Option<PooledConnection<B>> {
let mut inner = self.inner.lock().ok()?;
let now = Instant::now();
let idle_timeout = inner.idle_timeout;
let candidate_keys: Vec<PoolKey> = match inner.san_index.get(target_host) {
Some(keys) => keys.iter().cloned().collect(),
None => return None,
};
let mut found_key = None;
let mut found_conn = None;
for key in &candidate_keys {
let queue = match inner.idle.get_mut(key) {
Some(q) => q,
None => {
continue;
}
};
let mut i = queue.len();
while i > 0 {
i -= 1;
if now.duration_since(queue[i].idle_since) >= idle_timeout {
continue;
}
if !queue[i].connection.is_h2_or_h3() {
continue;
}
if !queue[i].connection.sans.iter().any(|s| s == target_host) {
continue;
}
if let Some(ip) = resolved_ip
&& queue[i].connection.remote_addr.map(|a| a.ip()) != Some(ip)
{
continue;
}
if queue[i].connection.is_ready()
&& let Some(cloned) = queue[i].connection.clone_for_multiplex()
{
queue[i].idle_since = now;
found_conn = Some(cloned);
break;
}
if !queue[i].connection.is_ready() {
continue;
}
if let Some(entry) = queue.remove(i) {
if queue.is_empty() {
found_key = Some(key.clone());
}
found_conn = Some(entry.connection);
break;
}
}
if found_conn.is_some() {
break;
}
}
if let Some(key) = found_key {
inner.idle.remove(&key);
}
for key in &candidate_keys {
if !inner.idle.contains_key(key)
&& let Some(keys) = inner.san_index.get_mut(target_host)
{
keys.remove(key);
if keys.is_empty() {
inner.san_index.remove(target_host);
}
}
}
found_conn
}
/// Starts the background reaper via `spawn_reaper` unless the shared reaper
/// flag was already set.
///
/// The flag is flipped with an atomic swap, so among all handles sharing it
/// only the first caller spawns the task.
pub(crate) fn ensure_reaper<R: RuntimePoll>(&self)
where
    B: Send,
{
    let already_requested = self.reaper_spawned.swap(true, Ordering::AcqRel);
    if already_requested {
        return;
    }
    self.spawn_reaper::<R>();
}
/// Spawns the background task that periodically drops expired idle
/// connections and prunes SAN index entries pointing at keys that no longer
/// have an idle queue.
///
/// The task is handed to `R::spawn_send`. Each cycle it sleeps for the
/// currently configured idle timeout and then sweeps the pool.
///
/// The task only holds a `Weak` reference to the pool state. It stops when
/// every `ConnectionPool` handle has been dropped (the upgrade fails) or when
/// the mutex is poisoned. Holding a strong `Arc` here would keep the pool
/// state — and this endless loop — alive for the rest of the program.
fn spawn_reaper<R: RuntimePoll>(&self)
where
    B: Send,
{
    let weak_inner = Arc::downgrade(&self.inner);
    R::spawn_send(async move {
        loop {
            // Read the interval in its own scope so that neither the strong
            // reference nor the guard is held across the await below.
            let timeout = {
                let Some(inner) = weak_inner.upgrade() else {
                    return;
                };
                let Ok(guard) = inner.lock() else {
                    return;
                };
                guard.idle_timeout
            };
            R::sleep(timeout).await;

            // The pool may have been dropped while sleeping.
            let Some(inner) = weak_inner.upgrade() else {
                return;
            };
            let Ok(mut guard) = inner.lock() else {
                return;
            };
            let now = Instant::now();
            let idle_timeout = guard.idle_timeout;
            // Drop expired entries, then queues that became empty.
            guard.idle.retain(|_, queue| {
                queue.retain(|entry| now.duration_since(entry.idle_since) < idle_timeout);
                !queue.is_empty()
            });
            // Keep only SAN index entries that still point at a live queue.
            let live_keys: HashSet<PoolKey> = guard.idle.keys().cloned().collect();
            guard.san_index.retain(|_, keys| {
                keys.retain(|k| live_keys.contains(k));
                !keys.is_empty()
            });
        }
    });
}
/// Starts the background reaper via `spawn_reaper_local` unless the shared
/// reaper flag was already set.
///
/// The flag is flipped with an atomic swap, so among all handles sharing it
/// only the first caller spawns the task.
pub(crate) fn ensure_reaper_local<R: crate::runtime::RuntimeLocal>(&self) {
    let already_requested = self.reaper_spawned.swap(true, Ordering::AcqRel);
    if already_requested {
        return;
    }
    self.spawn_reaper_local::<R>();
}
/// Spawns the background task that periodically drops expired idle
/// connections and prunes SAN index entries pointing at keys that no longer
/// have an idle queue.
///
/// The task is handed to `R::spawn_local`. Each cycle it sleeps for the
/// currently configured idle timeout and then sweeps the pool.
///
/// The task only holds a `Weak` reference to the pool state. It stops when
/// every `ConnectionPool` handle has been dropped (the upgrade fails) or when
/// the mutex is poisoned. Holding a strong `Arc` here would keep the pool
/// state — and this endless loop — alive for the rest of the program.
fn spawn_reaper_local<R: crate::runtime::RuntimeLocal>(&self) {
    let weak_inner = Arc::downgrade(&self.inner);
    R::spawn_local(async move {
        loop {
            // Read the interval in its own scope so that neither the strong
            // reference nor the guard is held across the await below.
            let timeout = {
                let Some(inner) = weak_inner.upgrade() else {
                    return;
                };
                let Ok(guard) = inner.lock() else {
                    return;
                };
                guard.idle_timeout
            };
            R::sleep(timeout).await;

            // The pool may have been dropped while sleeping.
            let Some(inner) = weak_inner.upgrade() else {
                return;
            };
            let Ok(mut guard) = inner.lock() else {
                return;
            };
            let now = Instant::now();
            let idle_timeout = guard.idle_timeout;
            // Drop expired entries, then queues that became empty.
            guard.idle.retain(|_, queue| {
                queue.retain(|entry| now.duration_since(entry.idle_since) < idle_timeout);
                !queue.is_empty()
            });
            // Keep only SAN index entries that still point at a live queue.
            let live_keys: HashSet<PoolKey> = guard.idle.keys().cloned().collect();
            guard.san_index.retain(|_, keys| {
                keys.retain(|k| live_keys.contains(k));
                !keys.is_empty()
            });
        }
    });
}
}
// Runtime-specific test suites. Each module is compiled only in test builds
// that also enable the matching runtime feature.
#[cfg(all(test, feature = "tokio"))]
mod tests_tokio;
#[cfg(all(test, feature = "smol"))]
mod tests_smol;
#[cfg(all(test, feature = "compio"))]
mod tests_compio;
/// Runtime-independent tests: how the pool behaves once its mutex is poisoned.
#[cfg(test)]
mod tests_sync {
    use super::*;
    use crate::body::RequestBodySend;

    /// Builds a plain-HTTP pool key with the default protocol hint.
    fn key(host: &str) -> PoolKey {
        PoolKey::new(
            Scheme::HTTP,
            host.parse::<Authority>().expect("valid authority"),
        )
    }

    /// Returns a pool (8 idle per key, 30 s timeout, no reaper) whose mutex
    /// was poisoned by a panic raised while the lock was held.
    #[cfg(not(target_arch = "wasm32"))]
    fn poisoned_pool() -> ConnectionPool<RequestBodySend> {
        let pool = ConnectionPool::<RequestBodySend>::new_no_reaper(8, Duration::from_secs(30));
        let inner = Arc::clone(&pool.inner);
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _guard = inner.lock().unwrap();
            panic!("intentional panic to poison the mutex");
        }));
        assert!(outcome.is_err(), "panic should have occurred");
        assert!(pool.inner.lock().is_err(), "mutex should be poisoned");
        pool
    }

    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn checkout_returns_none_on_poisoned_mutex() {
        let pool = poisoned_pool();
        let result = pool.checkout(&key("example.com:80"));
        assert!(
            result.is_none(),
            "checkout on poisoned mutex should return None"
        );
    }

    // NOTE(review): despite its name this test never calls `checkin`, because
    // building a `PooledConnection` is not possible from what this module
    // shows. It only pins that the mutex is poisoned; extend it once a test
    // connection constructor is available.
    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn checkin_returns_on_poisoned_mutex() {
        let pool = poisoned_pool();
        assert!(pool.inner.lock().is_err());
    }

    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn mark_connecting_h2_returns_false_on_poisoned_mutex() {
        let pool = poisoned_pool();
        assert!(!pool.mark_connecting_h2(&key("example.com:80")));
    }

    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn unmark_connecting_h2_no_panic_on_poisoned_mutex() {
        let pool = poisoned_pool();
        pool.unmark_connecting_h2(&key("example.com:80"));
    }

    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn checkout_coalesced_returns_none_on_poisoned_mutex() {
        let pool = poisoned_pool();
        let ip: std::net::IpAddr = [10, 0, 0, 1].into();
        let result = pool.checkout_coalesced("example.com", Some(ip));
        assert!(
            result.is_none(),
            "checkout_coalesced on poisoned mutex should return None"
        );
    }

    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn evict_no_panic_on_poisoned_mutex() {
        let pool = poisoned_pool();
        pool.evict(&key("example.com:80"));
        assert!(pool.inner.lock().is_err(), "mutex should stay poisoned");
    }

    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn idle_timeout_is_readable_on_poisoned_mutex() {
        let pool = poisoned_pool();
        assert_eq!(
            pool.idle_timeout(),
            Duration::from_secs(30),
            "idle_timeout should recover the value from a poisoned mutex"
        );
    }
}