use super::HttpSession;
use crate::connectors::{ConnectorOptions, TransportConnector};
use crate::protocols::http::custom::client::Session;
use crate::protocols::http::v1::client::HttpSession as Http1Session;
use crate::protocols::http::v2::client::{drive_connection, Http2Session};
use crate::protocols::{Digest, Stream, UniqueIDType};
use crate::upstreams::peer::{Peer, ALPN};
use bytes::Bytes;
use h2::client::SendRequest;
use log::debug;
use parking_lot::{Mutex, RwLock};
use pingora_error::{Error, ErrorType::*, OrErr, Result};
use pingora_pool::{ConnectionMeta, ConnectionPool, PoolNode};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
/// Thin wrapper around the h2 [`SendRequest`] handle of one connection,
/// used to open additional streams on it.
struct Stub(SendRequest<Bytes>);

impl Stub {
    /// Clone the request handle and wait until the connection is ready to
    /// accept another stream, returning the ready-to-use handle.
    async fn new_stream(&self) -> Result<SendRequest<Bytes>> {
        let handle = self.0.clone();
        let readiness = handle.ready().await;
        readiness.or_err(H2Error, "while creating new stream")
    }
}
/// Shared state behind a [`ConnectionRef`]: one established H2 connection
/// plus the bookkeeping needed to multiplex streams on it and pool it.
pub(crate) struct ConnectionRefInner {
    // Handle used to open new streams on this connection.
    connection_stub: Stub,
    // Set to true by the connection driver task when the connection closes.
    closed: watch::Receiver<bool>,
    // Set when an h2 ping times out; surfaced via `ping_timedout()`.
    ping_timeout_occurred: Arc<AtomicBool>,
    // Unique id of the underlying transport (raw fd on unix; see matches_fd use).
    id: UniqueIDType,
    // Locally configured cap on concurrent streams for this connection.
    max_streams: usize,
    // Number of streams currently checked out of this connection.
    current_streams: AtomicUsize,
    // Set once a graceful remote GOAWAY is observed; blocks new streams.
    shutting_down: AtomicBool,
    // Connection metadata (TLS / timing / proxy / socket digests).
    pub(crate) digest: Digest,
    // Serializes pool bookkeeping when streams of this connection are released.
    pub(crate) release_lock: Arc<Mutex<()>>,
}
/// Cheaply clonable (refcounted) reference to a pooled H2 connection.
#[derive(Clone)]
pub struct ConnectionRef(Arc<ConnectionRefInner>);
impl ConnectionRef {
    /// Wrap a newly established H2 connection.
    ///
    /// `closed` is signaled by the connection driver task when the underlying
    /// connection terminates; `ping_timeout_occurred` is set if an h2 ping
    /// times out; `max_streams` caps how many streams may be open at once.
    pub fn new(
        send_req: SendRequest<Bytes>,
        closed: watch::Receiver<bool>,
        ping_timeout_occurred: Arc<AtomicBool>,
        id: UniqueIDType,
        max_streams: usize,
        digest: Digest,
    ) -> Self {
        ConnectionRef(Arc::new(ConnectionRefInner {
            connection_stub: Stub(send_req),
            closed,
            ping_timeout_occurred,
            id,
            max_streams,
            current_streams: AtomicUsize::new(0),
            shutting_down: false.into(),
            digest,
            release_lock: Arc::new(Mutex::new(())),
        }))
    }

    /// Whether another stream may be spawned: not shutting down and below
    /// both the local cap and the limit currently allowed by h2
    /// (`current_max_send_streams`, which tracks the server's SETTINGS).
    pub fn more_streams_allowed(&self) -> bool {
        let current = self.0.current_streams.load(Ordering::Relaxed);
        !self.is_shutting_down()
            && self.0.max_streams > current
            && self.0.connection_stub.0.current_max_send_streams() > current
    }

    /// True when no streams are currently checked out.
    pub fn is_idle(&self) -> bool {
        self.0.current_streams.load(Ordering::Relaxed) == 0
    }

    /// Give back one stream slot (the counterpart of `spawn_stream`).
    pub fn release_stream(&self) {
        self.0.current_streams.fetch_sub(1, Ordering::SeqCst);
    }

    /// Unique id of the underlying transport connection.
    pub fn id(&self) -> UniqueIDType {
        self.0.id
    }

    /// Connection metadata captured at connect time.
    pub fn digest(&self) -> &Digest {
        &self.0.digest
    }

    /// Mutable digest access; `None` unless this is the sole reference.
    pub fn digest_mut(&mut self) -> Option<&mut Digest> {
        Arc::get_mut(&mut self.0).map(|inner| &mut inner.digest)
    }

    /// Whether an h2 keepalive ping has timed out on this connection.
    pub fn ping_timedout(&self) -> bool {
        self.0.ping_timeout_occurred.load(Ordering::Relaxed)
    }

    /// Whether the driver task has reported the connection closed.
    pub fn is_closed(&self) -> bool {
        *self.0.closed.borrow()
    }

    /// Whether the peer sent a graceful GOAWAY; no new streams should be opened.
    pub fn is_shutting_down(&self) -> bool {
        self.0.shutting_down.load(Ordering::Relaxed)
    }

    /// Try to open a new stream on this connection.
    ///
    /// Returns `Ok(None)` when the stream cap is reached or the peer is
    /// gracefully shutting down (remote GOAWAY with NO_ERROR); otherwise an
    /// [`Http2Session`] bound to the new stream, or the underlying error.
    pub async fn spawn_stream(&self) -> Result<Option<Http2Session>> {
        // Optimistically reserve a stream slot; rolled back below if we were
        // over the cap or the stream could not actually be created.
        let current_streams = self.0.current_streams.fetch_add(1, Ordering::SeqCst);
        if current_streams >= self.0.max_streams {
            self.0.current_streams.fetch_sub(1, Ordering::SeqCst);
            return Ok(None);
        }
        match self.0.connection_stub.new_stream().await {
            Ok(send_req) => Ok(Some(Http2Session::new(send_req, self.clone()))),
            Err(e) => {
                // Roll back the optimistic reservation.
                self.0.current_streams.fetch_sub(1, Ordering::SeqCst);
                // A remote GOAWAY with NO_ERROR is a graceful shutdown, not a
                // failure: mark the connection so it is not reused, and let
                // the caller fall back to a fresh connection.
                if e.root_cause()
                    .downcast_ref::<h2::Error>()
                    .map(|e| {
                        e.is_go_away() && e.is_remote() && e.reason() == Some(h2::Reason::NO_ERROR)
                    })
                    .unwrap_or(false)
                {
                    self.0.shutting_down.store(true, Ordering::Relaxed);
                    Ok(None)
                } else {
                    Err(e)
                }
            }
        }
    }
}
/// Pool of H2 connections that are currently serving streams but still have
/// spare stream capacity, keyed by the peer's reuse hash.
pub struct InUsePool {
    // reuse_hash -> node of connections for that peer.
    pools: RwLock<HashMap<u64, PoolNode<ConnectionRef>>>,
}
impl InUsePool {
    /// Create an empty pool.
    fn new() -> Self {
        InUsePool {
            pools: RwLock::new(HashMap::new()),
        }
    }

    /// Insert `conn` under `reuse_hash` so other requests to the same peer
    /// can multiplex streams onto it.
    pub fn insert(&self, reuse_hash: u64, conn: ConnectionRef) {
        {
            // Fast path: the node for this peer usually already exists, so a
            // read lock suffices (PoolNode::insert takes &self).
            let pools = self.pools.read();
            if let Some(pool) = pools.get(&reuse_hash) {
                pool.insert(conn.id(), conn);
                return;
            }
        }
        // Slow path: create the node under the write lock. Re-check via the
        // entry API: another thread may have created the node between us
        // dropping the read lock above and acquiring the write lock, and a
        // blind `HashMap::insert` of a fresh PoolNode would overwrite that
        // node and silently drop every connection it held.
        let mut pools = self.pools.write();
        pools
            .entry(reuse_hash)
            .or_insert_with(PoolNode::new)
            .insert(conn.id(), conn);
    }

    /// Take any connection for `reuse_hash`, removing it from the pool.
    /// Returns `None` when no connection is available for this peer.
    pub fn get(&self, reuse_hash: u64) -> Option<ConnectionRef> {
        let pools = self.pools.read();
        pools.get(&reuse_hash)?.get_any().map(|v| v.1)
    }

    /// Remove and return the specific connection identified by `id`,
    /// if it is currently pooled under `reuse_hash`.
    pub fn release(&self, reuse_hash: u64, id: UniqueIDType) -> Option<ConnectionRef> {
        let pools = self.pools.read();
        if let Some(pool) = pools.get(&reuse_hash) {
            pool.remove(id)
        } else {
            None
        }
    }
}
const DEFAULT_POOL_SIZE: usize = 128;
/// H2 connector: establishes new HTTP sessions and pools reusable H2
/// connections for stream multiplexing.
pub struct Connector {
    // L4 + TLS connector used to establish new transport streams.
    transport: TransportConnector,
    // Connections with zero in-flight streams, parked until reused or evicted.
    idle_pool: Arc<ConnectionPool<ConnectionRef>>,
    // Connections still serving streams but with spare stream capacity.
    in_use_pool: InUsePool,
}
impl Connector {
    /// Create a connector whose idle pool capacity comes from
    /// `keepalive_pool_size` in `options` (default: `DEFAULT_POOL_SIZE`).
    pub fn new(options: Option<ConnectorOptions>) -> Self {
        let pool_size = options
            .as_ref()
            .map_or(DEFAULT_POOL_SIZE, |o| o.keepalive_pool_size);
        Connector {
            transport: TransportConnector::new(options),
            idle_pool: Arc::new(ConnectionPool::new(pool_size)),
            in_use_pool: InUsePool::new(),
        }
    }

    /// The underlying L4/TLS transport connector.
    pub fn transport(&self) -> &TransportConnector {
        &self.transport
    }

    /// The pool of fully idle (zero in-flight streams) H2 connections.
    pub fn idle_pool(&self) -> &Arc<ConnectionPool<ConnectionRef>> {
        &self.idle_pool
    }

    /// The pool of in-use H2 connections that can host more streams.
    pub fn in_use_pool(&self) -> &InUsePool {
        &self.in_use_pool
    }

    /// Open a brand new HTTP session to `peer`.
    ///
    /// The HTTP version is decided by ALPN when available. Without ALPN, a
    /// TLS peer or a peer whose options only allow HTTP/1 gets an H1 session;
    /// otherwise an H2 handshake is performed (prior-knowledge h2) and the
    /// first stream of the new connection is returned. If the new connection
    /// can host more streams it is parked in the in-use pool for reuse.
    pub async fn new_http_session<P: Peer + Send + Sync + 'static, C: Session>(
        &self,
        peer: &P,
    ) -> Result<HttpSession<C>> {
        let stream = self.transport.new_stream(peer).await?;
        match stream.selected_alpn_proto() {
            // ALPN negotiated h2: fall through to the H2 handshake below.
            Some(ALPN::H2) => { }
            // ALPN negotiated something else (h1): speak HTTP/1.
            Some(_) => {
                return Ok(HttpSession::H1(Http1Session::new_with_options(
                    stream, peer,
                )));
            }
            None => {
                // No ALPN: default to HTTP/1 for TLS peers and for peers whose
                // options cap the minimum HTTP version at 1; everything else
                // proceeds to the H2 handshake.
                if peer.tls()
                    || peer
                        .get_peer_options()
                        .is_none_or(|o| o.alpn.get_min_http_version() == 1)
                {
                    return Ok(HttpSession::H1(Http1Session::new_with_options(
                        stream, peer,
                    )));
                }
            }
        }
        let max_h2_stream = peer.get_peer_options().map_or(1, |o| o.max_h2_streams);
        let conn = handshake(stream, max_h2_stream, peer.h2_ping_interval()).await?;
        let h2_stream = conn
            .spawn_stream()
            .await?
            .expect("newly created connections should have at least one free stream");
        // Park the connection for other requests if it can multiplex more.
        if conn.more_streams_allowed() {
            self.in_use_pool.insert(peer.reuse_hash(), conn);
        }
        Ok(HttpSession::H2(h2_stream))
    }

    /// Try to multiplex a new stream onto an existing connection to `peer`,
    /// preferring the in-use pool over the idle pool.
    ///
    /// Returns `Ok(None)` when there is nothing to reuse, when the cached
    /// connection fails the peer's fd/socket identity check, or when the
    /// connection has no free stream slot.
    pub async fn reused_http_session<P: Peer + Send + Sync + 'static>(
        &self,
        peer: &P,
    ) -> Result<Option<Http2Session>> {
        let reuse_hash = peer.reuse_hash();
        let maybe_conn = self
            .in_use_pool
            .get(reuse_hash)
            .filter(|c| !c.is_closed())
            .or_else(|| self.idle_pool.get(&reuse_hash));
        if let Some(conn) = maybe_conn {
            // Let the peer veto reuse based on the raw fd/socket identity.
            // NOTE: a rejected connection is dropped here, not re-pooled.
            #[cfg(unix)]
            if !peer.matches_fd(conn.id()) {
                return Ok(None);
            }
            #[cfg(windows)]
            {
                use std::os::windows::io::{AsRawSocket, RawSocket};
                // Adapter so the stored id can be checked via AsRawSocket.
                struct WrappedRawSocket(RawSocket);
                impl AsRawSocket for WrappedRawSocket {
                    fn as_raw_socket(&self) -> RawSocket {
                        self.0
                    }
                }
                if !peer.matches_sock(WrappedRawSocket(conn.id() as RawSocket)) {
                    return Ok(None);
                }
            }
            let h2_stream = conn.spawn_stream().await?;
            // Re-park the connection if it still has spare stream capacity.
            if conn.more_streams_allowed() {
                self.in_use_pool.insert(reuse_hash, conn);
            }
            Ok(h2_stream)
        } else {
            Ok(None)
        }
    }

    /// Return `session`'s connection to the right pool once its stream is done.
    ///
    /// Closed or shutting-down connections are dropped; fully idle ones go to
    /// the idle pool (with an optional `idle_timeout` eviction task); ones
    /// that still have in-flight streams go back to the in-use pool.
    pub fn release_http_session<P: Peer + Send + Sync + 'static>(
        &self,
        session: Http2Session,
        peer: &P,
        idle_timeout: Option<Duration>,
    ) {
        let id = session.conn.id();
        let reuse_hash = peer.reuse_hash();
        let conn = session.conn();
        // Serialize pool bookkeeping for this connection against concurrent
        // releases of its other streams.
        let locked = conn.0.release_lock.lock_arc();
        // Drop the session first so its stream slot is freed before the
        // is_idle() check below. NOTE(review): this relies on Http2Session's
        // drop path releasing the stream count — confirm in its Drop impl.
        drop(session);
        // Reclaim the connection from the in-use pool if it is parked there,
        // so it is re-homed according to its current state.
        let conn = self.in_use_pool.release(reuse_hash, id).unwrap_or(conn);
        if conn.is_closed() || conn.is_shutting_down() {
            // Dead or draining connection: just drop it.
            return;
        }
        if conn.is_idle() {
            drop(locked);
            let meta = ConnectionMeta {
                key: reuse_hash,
                id,
            };
            let closed = conn.0.closed.clone();
            let (notify_evicted, watch_use) = self.idle_pool.put(&meta, conn);
            // Spawn a watcher that evicts the idle connection on timeout,
            // closure, or pickup.
            let pool = self.idle_pool.clone(); let rt = pingora_runtime::current_handle();
            rt.spawn(async move {
                pool.idle_timeout(&meta, idle_timeout, notify_evicted, closed, watch_use)
                    .await;
            });
        } else {
            self.in_use_pool.insert(reuse_hash, conn);
            drop(locked);
        }
    }

    /// Record that HTTP/1 should be preferred for this peer from now on.
    pub fn prefer_h1(&self, peer: &impl Peer) {
        self.transport.prefer_h1(peer);
    }

    /// Whether HTTP/1 has been recorded as the preferred version for `peer`.
    pub(crate) fn h1_is_preferred(&self, peer: &impl Peer) -> bool {
        self.transport
            .preferred_http_version
            .get(peer)
            .is_some_and(|v| matches!(v, ALPN::H1))
    }
}
const H2_WINDOW_SIZE: u32 = 1 << 23;
/// Perform the client-side H2 handshake over an established `stream`.
///
/// Spawns a background task to drive the connection (I/O, optional keepalive
/// pings, close notification) and returns a [`ConnectionRef`] whose stream
/// budget is the smaller of `max_streams` and the server-advertised limit.
///
/// # Errors
/// Fails when either stream budget is zero or the h2 handshake itself fails.
pub async fn handshake(
    stream: Stream,
    max_streams: usize,
    h2_ping_interval: Option<Duration>,
) -> Result<ConnectionRef> {
    use h2::client::Builder;
    use pingora_runtime::current_handle;
    // A connection that can never host a stream is useless; bail early.
    if max_streams == 0 {
        return Error::e_explain(H2Error, "zero max_stream configured");
    }
    let id = stream.id();
    // Capture connection metadata before the stream is consumed by h2.
    let digest = Digest {
        ssl_digest: stream.get_ssl_digest(),
        timing_digest: stream.get_timing_digest(),
        proxy_digest: stream.get_proxy_digest(),
        socket_digest: stream.get_socket_digest(),
    };
    let (send_req, connection) = Builder::new()
        .enable_push(false)
        // Local cap on concurrent requests before the server's SETTINGS arrive.
        .initial_max_send_streams(max_streams)
        // Advertised SETTINGS_MAX_CONCURRENT_STREAMS, limiting peer-initiated
        // streams; push is disabled above, so this effectively forbids the
        // server from opening streams.
        .max_concurrent_streams(1)
        .max_frame_size(64 * 1024) .initial_window_size(H2_WINDOW_SIZE)
        .initial_connection_window_size(H2_WINDOW_SIZE)
        .handshake(stream)
        .await
        .or_err(HandshakeError, "during H2 handshake")?;
    debug!("H2 handshake to server done.");
    let ping_timeout_occurred = Arc::new(AtomicBool::new(false));
    let ping_timeout_clone = ping_timeout_occurred.clone();
    // Respect whichever concurrency cap is smaller: ours or the server's.
    let max_allowed_streams = std::cmp::min(max_streams, connection.max_concurrent_send_streams());
    if max_allowed_streams == 0 {
        return Error::e_explain(H2Error, "zero max_concurrent_send_streams received");
    }
    let (closed_tx, closed_rx) = watch::channel(false);
    // Drive the connection on a background task; it signals `closed_tx` on
    // termination and sets `ping_timeout_clone` on keepalive ping timeout.
    current_handle().spawn(async move {
        drive_connection(
            connection,
            id,
            closed_tx,
            h2_ping_interval,
            ping_timeout_clone,
        )
        .await;
    });
    Ok(ConnectionRef::new(
        send_req,
        closed_rx,
        ping_timeout_occurred,
        id,
        max_allowed_streams,
        digest,
    ))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::upstreams::peer::HttpPeer;

    // NOTE(review): these tests perform real network I/O against 1.1.1.1 and
    // therefore require outbound connectivity to pass.

    // An H2-capable TLS peer negotiates an H2 session, with no ping timeout.
    #[tokio::test]
    #[cfg(feature = "any_tls")]
    async fn test_connect_h2() {
        let connector = Connector::new(None);
        let mut peer = HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".into());
        peer.options.set_http_version(2, 2);
        let h2 = connector
            .new_http_session::<HttpPeer, ()>(&peer)
            .await
            .unwrap();
        match h2 {
            HttpSession::H1(_) => panic!("expect h2"),
            HttpSession::H2(h2_stream) => assert!(!h2_stream.ping_timedout()),
            HttpSession::Custom(_) => panic!("expect h2"),
        }
    }

    // Restricting the peer to HTTP/1 yields an H1 session.
    #[tokio::test]
    #[cfg(feature = "any_tls")]
    async fn test_connect_h1() {
        let connector = Connector::new(None);
        let mut peer = HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".into());
        peer.options.set_http_version(1, 1);
        let h2 = connector
            .new_http_session::<HttpPeer, ()>(&peer)
            .await
            .unwrap();
        match h2 {
            HttpSession::H1(_) => {}
            HttpSession::H2(_) => panic!("expect h1"),
            HttpSession::Custom(_) => panic!("expect h1"),
        }
    }

    // A plaintext peer with min HTTP version 1 and no ALPN falls back to H1.
    #[tokio::test]
    async fn test_connect_h1_plaintext() {
        let connector = Connector::new(None);
        let mut peer = HttpPeer::new(("1.1.1.1", 80), false, "".into());
        peer.options.set_http_version(2, 1);
        let h2 = connector
            .new_http_session::<HttpPeer, ()>(&peer)
            .await
            .unwrap();
        match h2 {
            HttpSession::H1(_) => {}
            HttpSession::H2(_) => panic!("expect h1"),
            HttpSession::Custom(_) => panic!("expect h1"),
        }
    }

    // With max_h2_streams = 1 a connection hosts one stream at a time: reuse
    // only succeeds after the in-flight stream is released.
    #[tokio::test]
    #[cfg(feature = "any_tls")]
    async fn test_h2_single_stream() {
        let connector = Connector::new(None);
        let mut peer = HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".into());
        peer.options.set_http_version(2, 2);
        peer.options.max_h2_streams = 1;
        let h2 = connector
            .new_http_session::<HttpPeer, ()>(&peer)
            .await
            .unwrap();
        let h2_1 = match h2 {
            HttpSession::H1(_) => panic!("expect h2"),
            HttpSession::H2(h2_stream) => h2_stream,
            HttpSession::Custom(_) => panic!("expect h2"),
        };
        let id = h2_1.conn.id();
        // No free slot while the first stream is alive.
        assert!(connector
            .reused_http_session(&peer)
            .await
            .unwrap()
            .is_none());
        connector.release_http_session(h2_1, &peer, None);
        // After release, the same connection (same id) is handed back out.
        let h2_2 = connector.reused_http_session(&peer).await.unwrap().unwrap();
        assert_eq!(id, h2_2.conn.id());
        connector.release_http_session(h2_2, &peer, None);
        let h2_3 = connector.reused_http_session(&peer).await.unwrap().unwrap();
        assert_eq!(id, h2_3.conn.id());
    }

    // With max_h2_streams = 3 up to three streams multiplex on the same
    // connection; a fourth attempt comes up empty until one is released.
    #[tokio::test]
    #[cfg(feature = "any_tls")]
    async fn test_h2_multiple_stream() {
        let connector = Connector::new(None);
        let mut peer = HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".into());
        peer.options.set_http_version(2, 2);
        peer.options.max_h2_streams = 3;
        let h2 = connector
            .new_http_session::<HttpPeer, ()>(&peer)
            .await
            .unwrap();
        let h2_1 = match h2 {
            HttpSession::H1(_) => panic!("expect h2"),
            HttpSession::H2(h2_stream) => h2_stream,
            HttpSession::Custom(_) => panic!("expect h2"),
        };
        let id = h2_1.conn.id();
        // Streams 2 and 3 multiplex onto the same connection.
        let h2_2 = connector.reused_http_session(&peer).await.unwrap().unwrap();
        assert_eq!(id, h2_2.conn.id());
        let h2_3 = connector.reused_http_session(&peer).await.unwrap().unwrap();
        assert_eq!(id, h2_3.conn.id());
        // Cap reached: no fourth stream.
        assert!(connector
            .reused_http_session(&peer)
            .await
            .unwrap()
            .is_none());
        connector.release_http_session(h2_1, &peer, None);
        // One slot freed: a new stream on the same connection succeeds.
        let h2_4 = connector.reused_http_session(&peer).await.unwrap().unwrap();
        assert_eq!(id, h2_4.conn.id());
        connector.release_http_session(h2_2, &peer, None);
        connector.release_http_session(h2_3, &peer, None);
        connector.release_http_session(h2_4, &peer, None);
        // Fully released (idle) connection is still reusable from the pool.
        let h2_5 = connector.reused_http_session(&peer).await.unwrap().unwrap();
        assert_eq!(id, h2_5.conn.id());
    }

    // A pooled connection must not be handed out when the peer's fd identity
    // check rejects it.
    #[cfg(all(feature = "any_tls", unix))]
    #[tokio::test]
    async fn test_h2_reuse_rejects_fd_mismatch() {
        use crate::protocols::l4::socket::SocketAddr;
        use crate::upstreams::peer::Peer;
        use std::fmt::{Display, Formatter, Result as FmtResult};
        use std::os::unix::prelude::AsRawFd;

        // Minimal Peer impl whose matches_fd always rejects, but which shares
        // the real peer's reuse hash so the pooled connection is found.
        #[derive(Clone)]
        struct MismatchPeer {
            reuse_hash: u64,
            address: SocketAddr,
        }
        impl Display for MismatchPeer {
            fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
                write!(f, "{:?}", self.address)
            }
        }
        impl Peer for MismatchPeer {
            fn address(&self) -> &SocketAddr {
                &self.address
            }
            fn tls(&self) -> bool {
                true
            }
            fn sni(&self) -> &str {
                ""
            }
            fn reuse_hash(&self) -> u64 {
                self.reuse_hash
            }
            fn matches_fd<V: AsRawFd>(&self, _fd: V) -> bool {
                false
            }
        }

        let connector = Connector::new(None);
        let mut peer = HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".into());
        peer.options.set_http_version(2, 2);
        peer.options.max_h2_streams = 1;
        let h2 = connector
            .new_http_session::<HttpPeer, ()>(&peer)
            .await
            .unwrap();
        let h2_stream = match h2 {
            HttpSession::H1(_) => panic!("expect h2"),
            HttpSession::H2(h2_stream) => h2_stream,
            HttpSession::Custom(_) => panic!("expect h2"),
        };
        connector.release_http_session(h2_stream, &peer, None);
        let mismatch_peer = MismatchPeer {
            reuse_hash: peer.reuse_hash(),
            address: peer.address().clone(),
        };
        // Same reuse hash, but the fd check fails -> no reuse.
        assert!(connector
            .reused_http_session(&mismatch_peer)
            .await
            .unwrap()
            .is_none());
    }
}