use bytes::Bytes;
use dashmap::DashMap;
use http::Request;
use http_body_util::Empty;
use hyper::client::conn::http2::SendRequest;
use parking_lot::Mutex;
use std::sync::Arc;
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct ConnKey {
pub scheme: &'static str,
pub host: String,
pub port: u16,
pub proxy_key: String,
}
impl ConnKey {
pub fn new(
scheme: &'static str,
host: impl Into<String>,
port: u16,
proxy: Option<&url::Url>,
) -> Self {
Self {
scheme,
host: host.into(),
port,
proxy_key: proxy.map(|u| u.as_str().to_string()).unwrap_or_default(),
}
}
}
#[derive(Clone)]
pub struct PooledH2 {
pub sender: SendRequest<Empty<Bytes>>,
}
pub struct PooledH1 {
pub sender: Arc<tokio::sync::Mutex<hyper::client::conn::http1::SendRequest<Empty<Bytes>>>>,
}
impl Clone for PooledH1 {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
}
}
}
#[derive(Clone, Default)]
pub struct ConnPool {
slots: Arc<DashMap<ConnKey, Arc<Mutex<Option<PooledH2>>>>>,
h1_slots: Arc<DashMap<ConnKey, Arc<Mutex<Option<PooledH1>>>>>,
}
impl ConnPool {
pub fn new() -> Self {
Self::default()
}
pub fn slot(&self, key: ConnKey) -> Arc<Mutex<Option<PooledH2>>> {
self.slots
.entry(key)
.or_insert_with(|| Arc::new(Mutex::new(None)))
.clone()
}
pub fn get_live(&self, key: &ConnKey) -> Option<PooledH2> {
let slot = self.slots.get(key)?.clone();
let guard = slot.lock();
let p = guard.as_ref()?;
if p.sender.is_closed() {
return None;
}
Some(p.clone())
}
pub fn store(&self, key: ConnKey, p: PooledH2) {
let slot = self.slot(key);
*slot.lock() = Some(p);
}
pub fn invalidate(&self, key: &ConnKey) {
if let Some(slot) = self.slots.get(key) {
*slot.lock() = None;
}
}
pub fn h1_get_live(&self, key: &ConnKey) -> Option<PooledH1> {
let slot = self.h1_slots.get(key)?.clone();
let guard = slot.lock();
let p = guard.as_ref()?;
if p.sender.try_lock().is_ok_and(|s| s.is_closed()) {
return None;
}
Some(p.clone())
}
pub fn h1_store(&self, key: ConnKey, p: PooledH1) {
let slot = self
.h1_slots
.entry(key)
.or_insert_with(|| Arc::new(Mutex::new(None)))
.clone();
*slot.lock() = Some(p);
}
pub fn h1_invalidate(&self, key: &ConnKey) {
if let Some(slot) = self.h1_slots.get(key) {
*slot.lock() = None;
}
}
}
pub fn build_base_request(scheme: &str, authority: &str, path: &str) -> http::request::Builder {
Request::builder()
.method(http::Method::GET)
.uri(format!("{scheme}://{authority}{path}"))
}