#![cfg_attr(doc_async_trait, feature(async_fn_in_trait))]
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::FutureExt;
use http::{header, version::Version};
use log::{debug, error, trace, warn};
use once_cell::sync::Lazy;
use pingora_http::{RequestHeader, ResponseHeader};
use std::fmt::Debug;
use std::str;
use std::sync::Arc;
use tokio::sync::{mpsc, Notify};
use tokio::time;
use pingora_cache::NoCacheReason;
use pingora_core::apps::HttpServerApp;
use pingora_core::connectors::{http::Connector, ConnectorOptions};
use pingora_core::protocols::http::client::HttpSession as ClientSession;
use pingora_core::protocols::http::v1::client::HttpSession as HttpSessionV1;
use pingora_core::protocols::http::HttpTask;
use pingora_core::protocols::http::ServerSession as HttpSession;
use pingora_core::protocols::http::SERVER_NAME;
use pingora_core::protocols::Stream;
use pingora_core::protocols::{Digest, UniqueID};
use pingora_core::server::configuration::ServerConf;
use pingora_core::server::ShutdownWatch;
use pingora_core::upstreams::peer::{HttpPeer, Peer};
use pingora_error::{Error, ErrorSource, ErrorType::*, OrErr, Result};
/// Upper bound on the number of upstream attempts (the first try included) that
/// `process_request` makes for a single request.
const MAX_RETRIES: usize = 16;
/// Not referenced in this part of the file; presumably the capacity of the task
/// channels used by the sibling proxy modules — TODO confirm.
const TASK_BUFFER_SIZE: usize = 4;
mod proxy_cache;
mod proxy_common;
mod proxy_h1;
mod proxy_h2;
mod proxy_purge;
mod proxy_trait;
mod subrequest;
use subrequest::Ctx as SubReqCtx;
pub use proxy_purge::PurgeStatus;
pub use proxy_trait::ProxyHttp;
pub mod prelude {
pub use crate::{http_proxy_service, ProxyHttp, Session};
}
/// The HTTP proxy application.
///
/// Bundles the user-supplied proxy logic (`SV`) with the connector used to
/// reach upstream peers and a notifier used to signal shutdown.
pub struct HttpProxy<SV> {
    /// User-provided implementation whose hooks are invoked while proxying.
    inner: SV,
    /// Connector from which upstream HTTP sessions are obtained and to which
    /// they are released.
    client_upstream: Connector,
    /// Notified by `http_cleanup`; `handle_new_request` stops waiting for a
    /// request when it fires.
    shutdown: Notify,
}
impl<SV> HttpProxy<SV> {
fn new(inner: SV, conf: Arc<ServerConf>) -> Arc<Self> {
Arc::new(HttpProxy {
inner,
client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
shutdown: Notify::new(),
})
}
/// Waits for the next request on `downstream_session`.
///
/// Returns the session once a request has been read. Returns `None` when:
/// * the shutdown notifier fires before a request arrives,
/// * `read_request()` reports that no request was read, or
/// * reading fails. In that case a 400 is sent if the error type is
///   `InvalidHTTPHeader`, and the session is shut down either way.
async fn handle_new_request(
    &self,
    mut downstream_session: Box<HttpSession>,
) -> Option<Box<HttpSession>>
where
    SV: ProxyHttp + Send + Sync,
    SV::CTX: Send + Sync,
{
    // `biased` polls the branches in order, so the read is always tried first.
    let read_outcome = tokio::select! {
        biased;
        outcome = downstream_session.read_request() => { outcome }
        _ = self.shutdown.notified() => {
            return None;
        }
    };

    let request_arrived = match read_outcome {
        Ok(arrived) => arrived,
        Err(mut e) => {
            e.as_down();
            error!("Fail to proxy: {}", e);
            if matches!(e.etype, InvalidHTTPHeader) {
                downstream_session.respond_error(400).await;
            }
            downstream_session.shutdown().await;
            return None;
        }
    };
    if !request_arrived {
        return None;
    }

    debug!("Successfully get a new request");
    trace!(
        "Request header: {:?}",
        downstream_session.req_header().as_ref()
    );
    Some(downstream_session)
}
/// Makes a single attempt at proxying the request to an upstream peer.
///
/// Returns `(reuse, error)`: `reuse` is the flag the caller later hands to
/// `finish()` to decide whether the downstream session is finished for
/// reuse, and `error` is the failure of this attempt, if any.
///
/// Steps:
/// 1. Ask `upstream_peer` for the peer; its error is returned unchanged.
/// 2. Obtain an upstream session from the connector; a failure is passed
///    through `fail_to_connect` and marked with `into_up()`.
/// 3. Proxy over HTTP/1 or HTTP/2 depending on the session obtained.
/// 4. Pass any proxying error through `error_while_proxy`.
async fn proxy_to_upstream(
    &self,
    session: &mut Session,
    ctx: &mut SV::CTX,
) -> (bool, Option<Box<Error>>)
where
    SV: ProxyHttp + Send + Sync,
    SV::CTX: Send + Sync,
{
    let peer = match self.inner.upstream_peer(session, ctx).await {
        Ok(selected) => selected,
        Err(e) => return (false, Some(e)),
    };

    let (upstream, client_reused) = match self.client_upstream.get_http_session(&*peer).await {
        Ok(connected) => connected,
        Err(e) => {
            let connect_err = self.inner.fail_to_connect(session, &peer, ctx, e);
            return (false, Some(connect_err.into_up()));
        }
    };

    let (server_reused, proxy_err) = match upstream {
        ClientSession::H1(mut h1) => {
            let (server_reused, client_reuse, h1_err) = self
                .proxy_to_h1_upstream(session, &mut h1, client_reused, &peer, ctx)
                .await;
            // The h1 session goes back to the connector only when flagged reusable.
            if client_reuse {
                self.client_upstream
                    .release_http_session(ClientSession::H1(h1), &*peer, peer.idle_timeout())
                    .await;
            }
            (server_reused, h1_err)
        }
        ClientSession::H2(mut h2) => {
            let (server_reused, mut h2_err) = self
                .proxy_to_h2_upstream(session, &mut h2, client_reused, &peer, ctx)
                .await;
            // The h2 session is always released back to the connector.
            self.client_upstream
                .release_http_session(ClientSession::H2(h2), &*peer, peer.idle_timeout())
                .await;

            if let Some(e) = h2_err.as_mut() {
                if matches!(e.etype, H2Downgrade | InvalidH2) {
                    // A peer without ALPN settings, or whose minimum HTTP
                    // version is 1, may be switched to h1.
                    let h1_allowed = peer
                        .get_alpn()
                        .map_or(true, |alpn| alpn.get_min_http_version() == 1);
                    if h1_allowed {
                        self.client_upstream.prefer_h1(&*peer);
                    } else {
                        // No h1 fallback for this peer, so do not retry.
                        e.retry = false.into();
                    }
                }
            }
            (server_reused, h2_err)
        }
    };

    let proxy_err = proxy_err.map(|e| {
        self.inner
            .error_while_proxy(&peer, session, e, ctx, client_reused)
    });
    (server_reused, proxy_err)
}
/// Dispatches one upstream response task to the matching `ProxyHttp` hook.
///
/// * `Header` goes to `upstream_response_filter`.
/// * `Body` goes to `upstream_response_body_filter`.
/// * `Trailer(Some(..))` goes to `upstream_response_trailer_filter`, whose
///   error is propagated.
/// * Any other task is left untouched.
fn upstream_filter(
    &self,
    session: &mut Session,
    task: &mut HttpTask,
    ctx: &mut SV::CTX,
) -> Result<()>
where
    SV: ProxyHttp,
{
    match task {
        HttpTask::Header(resp_header, _end_of_stream) => {
            self.inner
                .upstream_response_filter(session, resp_header, ctx);
        }
        HttpTask::Body(body, end_of_stream) => {
            self.inner
                .upstream_response_body_filter(session, body, *end_of_stream, ctx);
        }
        HttpTask::Trailer(Some(trailer_map)) => {
            self.inner
                .upstream_response_trailer_filter(session, trailer_map, ctx)?;
        }
        _ => {}
    }
    Ok(())
}
/// Final step of request processing.
///
/// Always invokes the `logging` hook with `error`. When `reuse` is true the
/// downstream session is finished and whatever stream it yields is returned
/// (a failure to finish is discarded); otherwise `None` is returned.
async fn finish(
    &self,
    mut session: Session,
    ctx: &mut SV::CTX,
    reuse: bool,
    error: Option<&Error>,
) -> Option<Stream>
where
    SV: ProxyHttp + Send + Sync,
    SV::CTX: Send + Sync,
{
    self.inner.logging(&mut session, error, ctx).await;

    if !reuse {
        return None;
    }
    session.downstream_session.finish().await.ok().flatten()
}
}
use pingora_cache::HttpCache;
use pingora_core::protocols::http::compression::ResponseCompressionCtx;
/// Per-request state handed to the proxy hooks.
///
/// `Session` dereferences to the downstream `HttpSession`, so that session's
/// methods can be called on a `Session` directly.
pub struct Session {
/// The HTTP session with the downstream side of the proxy.
pub downstream_session: Box<HttpSession>,
/// Cache state for this request.
pub cache: HttpCache,
/// Compression context for the upstream side. `Session::new` builds it with
/// `ResponseCompressionCtx::new(0, false)` — presumably disabled; TODO confirm.
pub upstream_compression: ResponseCompressionCtx,
/// Compression context for the downstream side: fed the request header in
/// `process_request` and applied to every task in `write_response_tasks`.
pub downstream_compression: ResponseCompressionCtx,
/// Starts as `false`; not read in this part of the file. Presumably makes the
/// proxy ignore a downstream range request — TODO confirm.
pub ignore_downstream_range: bool,
// Set only by `process_subrequest`; `None` for sessions built by `Session::new`.
subrequest_ctx: Option<Box<SubReqCtx>>,
}
impl Session {
    /// Wraps a downstream session in a fresh `Session`.
    ///
    /// The cache state is new, both compression contexts are created with
    /// `ResponseCompressionCtx::new(0, false)`, the range flag is off and no
    /// subrequest context is attached.
    fn new(downstream_session: impl Into<Box<HttpSession>>) -> Self {
        let downstream_session: Box<HttpSession> = downstream_session.into();
        Self {
            downstream_session,
            cache: HttpCache::new(),
            upstream_compression: ResponseCompressionCtx::new(0, false),
            downstream_compression: ResponseCompressionCtx::new(0, false),
            ignore_downstream_range: false,
            subrequest_ctx: None,
        }
    }

    /// Creates a `Session` over an HTTP/1 session built from `stream`.
    pub fn new_h1(stream: Stream) -> Self {
        let h1_session = HttpSession::new_http1(stream);
        Self::new(Box::new(h1_session))
    }

    /// Mutable access to the downstream HTTP session.
    pub fn as_downstream_mut(&mut self) -> &mut HttpSession {
        &mut *self.downstream_session
    }

    /// Shared access to the downstream HTTP session.
    pub fn as_downstream(&self) -> &HttpSession {
        &*self.downstream_session
    }
}
impl Session {
    /// Applies the downstream compression filter to every task in order, then
    /// writes the whole batch to the downstream session.
    ///
    /// Returns whatever `response_duplex_vec` returns.
    async fn write_response_tasks(&mut self, mut tasks: Vec<HttpTask>) -> Result<bool> {
        for task in tasks.iter_mut() {
            self.downstream_compression.response_filter(task);
        }
        self.downstream_session.response_duplex_vec(tasks).await
    }
}
/// Borrows the downstream HTTP session held by this `Session`.
impl AsRef<HttpSession> for Session {
    fn as_ref(&self) -> &HttpSession {
        &*self.downstream_session
    }
}
/// Mutably borrows the downstream HTTP session held by this `Session`.
impl AsMut<HttpSession> for Session {
    fn as_mut(&mut self) -> &mut HttpSession {
        &mut *self.downstream_session
    }
}
use std::ops::{Deref, DerefMut};
/// Lets a `Session` be used wherever a shared `HttpSession` reference is
/// expected by dereferencing to the downstream session.
impl Deref for Session {
    type Target = HttpSession;

    fn deref(&self) -> &Self::Target {
        &*self.downstream_session
    }
}
/// Mutable counterpart of the `Deref` impl: yields the downstream session.
impl DerefMut for Session {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.downstream_session
    }
}
/// Lazily built 502 response header, written by `process_request` when the
/// request is not allowed upstream and no response has been written yet.
///
/// Carries `Server`, `Content-Length: 0` and `Cache-Control: private, no-store`.
static BAD_GATEWAY: Lazy<ResponseHeader> = Lazy::new(|| {
    // `Some(3)` presumably pre-sizes the header map for the three headers below.
    let mut bad_gateway =
        ResponseHeader::build(http::StatusCode::BAD_GATEWAY, Some(3)).unwrap();

    bad_gateway
        .insert_header(header::SERVER, &SERVER_NAME[..])
        .unwrap();
    bad_gateway
        .insert_header(header::CONTENT_LENGTH, 0)
        .unwrap();
    bad_gateway
        .insert_header(header::CACHE_CONTROL, "private, no-store")
        .unwrap();

    bad_gateway
});
impl<SV> HttpProxy<SV> {
async fn process_request(
self: &Arc<Self>,
mut session: Session,
mut ctx: <SV as ProxyHttp>::CTX,
) -> Option<Stream>
where
SV: ProxyHttp + Send + Sync + 'static,
<SV as ProxyHttp>::CTX: Send + Sync,
{
match self.inner.request_filter(&mut session, &mut ctx).await {
Ok(response_sent) => {
if response_sent {
self.inner.logging(&mut session, None, &mut ctx).await;
return session.downstream_session.finish().await.ok().flatten();
}
}
Err(e) => {
if !self.inner.suppress_error_log(&session, &ctx, &e) {
error!(
"Fail to filter request: {}, {}",
e,
self.inner.request_summary(&session, &ctx)
);
}
self.inner.fail_to_proxy(&mut session, &e, &mut ctx).await;
self.inner.logging(&mut session, Some(&e), &mut ctx).await;
return None;
}
}
session
.downstream_compression
.request_filter(session.downstream_session.req_header());
if let Some((reuse, err)) = self.proxy_cache(&mut session, &mut ctx).await {
return self.finish(session, &mut ctx, reuse, err.as_deref()).await;
}
match self
.inner
.proxy_upstream_filter(&mut session, &mut ctx)
.await
{
Ok(proxy_to_upstream) => {
if !proxy_to_upstream {
if session.response_written().is_none() {
match session.write_response_header_ref(&BAD_GATEWAY).await {
Ok(()) => {}
Err(e) => {
if !self.inner.suppress_error_log(&session, &ctx, &e) {
error!(
"Error responding with Bad Gateway: {}, {}",
e,
self.inner.request_summary(&session, &ctx)
);
}
self.inner.fail_to_proxy(&mut session, &e, &mut ctx).await;
self.inner.logging(&mut session, Some(&e), &mut ctx).await;
return None;
}
}
}
return self.finish(session, &mut ctx, false, None).await;
}
}
Err(e) => {
if !self.inner.suppress_error_log(&session, &ctx, &e) {
error!(
"Error deciding if we should proxy to upstream: {}, {}",
e,
self.inner.request_summary(&session, &ctx)
);
}
self.inner.fail_to_proxy(&mut session, &e, &mut ctx).await;
self.inner.logging(&mut session, Some(&e), &mut ctx).await;
return None;
}
}
let mut retries: usize = 0;
let mut server_reuse = false;
let mut proxy_error: Option<Box<Error>> = None;
while retries < MAX_RETRIES {
retries += 1;
let (reuse, e) = self.proxy_to_upstream(&mut session, &mut ctx).await;
server_reuse = reuse;
match e {
Some(error) => {
let retry = error.retry();
proxy_error = Some(error);
if !retry {
break;
}
warn!(
"Fail to proxy: {}, tries: {}, retry: {}, {}",
proxy_error.as_ref().unwrap(),
retries,
retry,
self.inner.request_summary(&session, &ctx)
);
}
None => {
proxy_error = None;
break;
}
};
}
let serve_stale_result = if proxy_error.is_some() && session.cache.can_serve_stale_error() {
self.handle_stale_if_error(&mut session, &mut ctx, proxy_error.as_ref().unwrap())
.await
} else {
None
};
let final_error = if let Some((reuse, stale_cache_error)) = serve_stale_result {
server_reuse = server_reuse && reuse;
stale_cache_error
} else {
proxy_error
};
if let Some(e) = final_error.as_ref() {
let status = self.inner.fail_to_proxy(&mut session, e, &mut ctx).await;
if !self.inner.suppress_error_log(&session, &ctx, e) {
error!(
"Fail to proxy: {}, status: {}, tries: {}, retry: {}, {}",
final_error.as_ref().unwrap(),
status,
retries,
false, self.inner.request_summary(&session, &ctx)
);
}
}
self.finish(session, &mut ctx, server_reuse, final_error.as_deref())
.await
}
}
/// Crate-private hook for running a subrequest through a proxy.
#[async_trait]
trait Subrequest {
/// Processes the request carried by `session` as a subrequest, attaching
/// `sub_req_ctx` to it. Nothing is returned to the caller.
async fn process_subrequest(
self: &Arc<Self>,
session: Box<HttpSession>,
sub_req_ctx: Box<SubReqCtx>,
);
}
#[async_trait]
impl<SV> Subrequest for HttpProxy<SV>
where
    SV: ProxyHttp + Send + Sync + 'static,
    <SV as ProxyHttp>::CTX: Send + Sync,
{
    /// Reads the request from `session`, tags the resulting `Session` with
    /// `sub_req_ctx`, and runs it through `process_request` with a fresh
    /// context. Returns early if no request could be read.
    async fn process_subrequest(
        self: &Arc<Self>,
        session: Box<HttpSession>,
        sub_req_ctx: Box<SubReqCtx>,
    ) {
        debug!("starting subrequest");

        let downstream = match self.handle_new_request(session).await {
            Some(downstream) => downstream,
            None => return,
        };
        let mut session = Session::new(downstream);

        // Keepalive is disabled for subrequests.
        session.set_keepalive(None);
        session.subrequest_ctx = Some(sub_req_ctx);

        trace!("processing subrequest");
        let ctx = self.inner.new_ctx();
        self.process_request(session, ctx).await;
        trace!("subrequest done");
    }
}
#[async_trait]
impl<SV> HttpServerApp for HttpProxy<SV>
where
    SV: ProxyHttp + Send + Sync + 'static,
    <SV as ProxyHttp>::CTX: Send + Sync,
{
    /// Entry point for a new downstream HTTP session.
    ///
    /// Reads the request, sets keepalive (none when `shutdown` is already
    /// flagged, otherwise `Some(60)`), then runs `process_request` with a
    /// fresh context. Returns `None` if no request could be read.
    async fn process_new_http(
        self: &Arc<Self>,
        session: HttpSession,
        shutdown: &ShutdownWatch,
    ) -> Option<Stream> {
        let downstream = match self.handle_new_request(Box::new(session)).await {
            Some(downstream) => downstream,
            None => return None,
        };
        let mut session = Session::new(downstream);

        // Do not let downstream reuse the connection once shutdown is flagged.
        let keepalive = if *shutdown.borrow() { None } else { Some(60) };
        session.set_keepalive(keepalive);

        let ctx = self.inner.new_ctx();
        self.process_request(session, ctx).await
    }

    /// Wakes every task currently waiting on the shutdown notifier, i.e. the
    /// `handle_new_request` calls still waiting for a request.
    async fn http_cleanup(&self) {
        self.shutdown.notify_waiters();
    }
}
use pingora_core::services::listening::Service;
pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV>> {
Service::new(
"Pingora HTTP Proxy Service".into(),
HttpProxy::new(inner, conf.clone()),
)
}
/// Creates a listening [`Service`] named `name` that runs `inner` as an HTTP
/// proxy configured from `conf`.
pub fn http_proxy_service_with_name<SV>(
    conf: &Arc<ServerConf>,
    inner: SV,
    name: &str,
) -> Service<HttpProxy<SV>> {
    let service_name = name.to_owned();
    Service::new(service_name, HttpProxy::new(inner, Arc::clone(conf)))
}