use crate::connectors::{ConnectorOptions, TransportConnector};
use crate::protocols::http::v1::client::HttpSession;
use crate::upstreams::peer::Peer;
use pingora_error::Result;
use std::time::Duration;
pub struct Connector {
transport: TransportConnector,
}
impl Connector {
pub fn new(options: Option<ConnectorOptions>) -> Self {
Connector {
transport: TransportConnector::new(options),
}
}
pub async fn get_http_session<P: Peer + Send + Sync + 'static>(
&self,
peer: &P,
) -> Result<(HttpSession, bool)> {
let (stream, reused) = self.transport.get_stream(peer).await?;
let http = HttpSession::new_with_options(stream, peer);
Ok((http, reused))
}
pub async fn reused_http_session<P: Peer + Send + Sync + 'static>(
&self,
peer: &P,
) -> Option<HttpSession> {
let stream = self.transport.reused_stream(peer).await?;
let http = HttpSession::new_with_options(stream, peer);
Some(http)
}
pub async fn release_http_session<P: Peer + Send + Sync + 'static>(
&self,
mut session: HttpSession,
peer: &P,
idle_timeout: Option<Duration>,
) {
session.respect_keepalive();
if let Some(stream) = session.reuse().await {
self.transport
.release_stream(stream, peer.reuse_hash(), idle_timeout);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::protocols::l4::socket::SocketAddr;
use crate::upstreams::peer::HttpPeer;
use crate::upstreams::peer::Peer;
use pingora_http::RequestHeader;
use std::fmt::{Display, Formatter, Result as FmtResult};
async fn get_http(http: &mut HttpSession, expected_status: u16) {
let mut req = Box::new(RequestHeader::build("GET", b"/", None).unwrap());
req.append_header("Host", "one.one.one.one").unwrap();
http.write_request_header(req).await.unwrap();
http.read_response().await.unwrap();
http.respect_keepalive();
assert_eq!(http.get_status().unwrap(), expected_status);
while http.read_body_bytes().await.unwrap().is_some() {}
}
#[tokio::test]
async fn test_connect() {
let connector = Connector::new(None);
let peer = HttpPeer::new(("1.1.1.1", 80), false, "".into());
let (http, reused) = connector.get_http_session(&peer).await.unwrap();
let server_addr = http.server_addr().unwrap();
assert_eq!(*server_addr, "1.1.1.1:80".parse::<SocketAddr>().unwrap());
assert!(!reused);
connector.release_http_session(http, &peer, None).await;
let (mut http, reused) = connector.get_http_session(&peer).await.unwrap();
assert!(!reused);
get_http(&mut http, 301).await;
connector.release_http_session(http, &peer, None).await;
let (_, reused) = connector.get_http_session(&peer).await.unwrap();
assert!(reused);
}
#[cfg(unix)]
#[tokio::test]
async fn test_reuse_rejects_fd_mismatch() {
use std::os::unix::prelude::AsRawFd;
#[derive(Clone)]
struct MismatchPeer {
reuse_hash: u64,
address: SocketAddr,
}
impl Display for MismatchPeer {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "{:?}", self.address)
}
}
impl Peer for MismatchPeer {
fn address(&self) -> &SocketAddr {
&self.address
}
fn tls(&self) -> bool {
false
}
fn sni(&self) -> &str {
""
}
fn reuse_hash(&self) -> u64 {
self.reuse_hash
}
fn matches_fd<V: AsRawFd>(&self, _fd: V) -> bool {
false
}
}
let connector = Connector::new(None);
let peer = HttpPeer::new(("1.1.1.1", 80), false, "".into());
let (mut http, reused) = connector.get_http_session(&peer).await.unwrap();
assert!(!reused);
get_http(&mut http, 301).await;
connector.release_http_session(http, &peer, None).await;
let mismatch_peer = MismatchPeer {
reuse_hash: peer.reuse_hash(),
address: peer.address().clone(),
};
assert!(connector
.reused_http_session(&mismatch_peer)
.await
.is_none());
}
#[tokio::test]
#[cfg(feature = "any_tls")]
async fn test_connect_tls() {
let connector = Connector::new(None);
let peer = HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".into());
let (http, reused) = connector.get_http_session(&peer).await.unwrap();
let server_addr = http.server_addr().unwrap();
assert_eq!(*server_addr, "1.1.1.1:443".parse::<SocketAddr>().unwrap());
assert!(!reused);
connector.release_http_session(http, &peer, None).await;
let (mut http, reused) = connector.get_http_session(&peer).await.unwrap();
assert!(!reused);
get_http(&mut http, 200).await;
connector.release_http_session(http, &peer, None).await;
let (_, reused) = connector.get_http_session(&peer).await.unwrap();
assert!(reused);
}
}