use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::future::FutureExt;
use http::{header, version::Version, Method};
use log::{debug, error, trace, warn};
use once_cell::sync::Lazy;
use pingora_http::{RequestHeader, ResponseHeader};
use std::fmt::Debug;
use std::str;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::time::Duration;
use tokio::sync::{mpsc, Notify};
use tokio::time;
use pingora_cache::NoCacheReason;
use pingora_core::apps::{
HttpPersistentSettings, HttpServerApp, HttpServerOptions, ReusedHttpStream,
};
use pingora_core::connectors::http::custom;
use pingora_core::connectors::{http::Connector, ConnectorOptions};
use pingora_core::modules::http::compression::ResponseCompressionBuilder;
use pingora_core::modules::http::{HttpModuleCtx, HttpModules};
use pingora_core::protocols::http::client::HttpSession as ClientSession;
use pingora_core::protocols::http::custom::CustomMessageWrite;
use pingora_core::protocols::http::subrequest::server::SubrequestHandle;
use pingora_core::protocols::http::v1::client::HttpSession as HttpSessionV1;
use pingora_core::protocols::http::v2::server::H2Options;
use pingora_core::protocols::http::HttpTask;
use pingora_core::protocols::http::ServerSession as HttpSession;
use pingora_core::protocols::http::SERVER_NAME;
use pingora_core::protocols::Stream;
use pingora_core::protocols::{Digest, UniqueID};
use pingora_core::server::configuration::ServerConf;
use pingora_core::server::ShutdownWatch;
use pingora_core::upstreams::peer::{HttpPeer, Peer};
use pingora_error::{Error, ErrorSource, ErrorType::*, OrErr, Result};
const TASK_BUFFER_SIZE: usize = 4;
mod proxy_cache;
mod proxy_common;
mod proxy_custom;
mod proxy_h1;
mod proxy_h2;
mod proxy_purge;
mod proxy_trait;
pub mod subrequest;
use subrequest::{BodyMode, Ctx as SubrequestCtx};
pub use proxy_cache::range_filter::{range_header_filter, MultiRangeInfo, RangeType};
pub use proxy_purge::PurgeStatus;
pub use proxy_trait::{FailToProxy, ProxyHttp};
pub mod prelude {
pub use crate::{http_proxy, http_proxy_service, ProxyHttp, Session};
}
pub type ProcessCustomSession<SV, C> = Arc<
dyn Fn(Arc<HttpProxy<SV, C>>, Stream, &ShutdownWatch) -> BoxFuture<'static, Option<Stream>>
+ Send
+ Sync
+ Unpin
+ 'static,
>;
pub struct HttpProxy<SV, C = ()>
where
C: custom::Connector, {
inner: SV, client_upstream: Connector<C>,
shutdown: Notify,
shutdown_flag: Arc<AtomicBool>,
pub server_options: Option<HttpServerOptions>,
pub h2_options: Option<H2Options>,
pub downstream_modules: HttpModules,
max_retries: usize,
process_custom_session: Option<ProcessCustomSession<SV, C>>,
}
impl<SV> HttpProxy<SV, ()> {
pub fn new(inner: SV, conf: Arc<ServerConf>) -> Self {
HttpProxy {
inner,
client_upstream: Connector::new(Some(ConnectorOptions::from_server_conf(&conf))),
shutdown: Notify::new(),
shutdown_flag: Arc::new(AtomicBool::new(false)),
server_options: None,
h2_options: None,
downstream_modules: HttpModules::new(),
max_retries: conf.max_retries,
process_custom_session: None,
}
}
}
impl<SV, C> HttpProxy<SV, C>
where
C: custom::Connector,
{
fn new_custom(
inner: SV,
conf: Arc<ServerConf>,
connector: C,
on_custom: Option<ProcessCustomSession<SV, C>>,
server_options: Option<HttpServerOptions>,
) -> Self
where
SV: ProxyHttp + Send + Sync + 'static,
SV::CTX: Send + Sync,
{
let client_upstream =
Connector::new_custom(Some(ConnectorOptions::from_server_conf(&conf)), connector);
HttpProxy {
inner,
client_upstream,
shutdown: Notify::new(),
shutdown_flag: Arc::new(AtomicBool::new(false)),
server_options,
downstream_modules: HttpModules::new(),
max_retries: conf.max_retries,
process_custom_session: on_custom,
h2_options: None,
}
}
pub fn handle_init_modules(&mut self)
where
SV: ProxyHttp,
{
self.inner
.init_downstream_modules(&mut self.downstream_modules);
}
async fn handle_new_request(
&self,
mut downstream_session: Box<HttpSession>,
) -> Option<Box<HttpSession>>
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
let res = tokio::select! {
biased; res = downstream_session.read_request() => { res }
_ = self.shutdown.notified() => {
return None;
}
};
match res {
Ok(true) => {
debug!("Successfully get a new request");
}
Ok(false) => {
return None; }
Err(mut e) => {
e.as_down();
error!("Fail to proxy: {e}");
if matches!(e.etype, InvalidHTTPHeader) {
downstream_session
.respond_error(400)
.await
.unwrap_or_else(|e| {
error!("failed to send error response to downstream: {e}");
});
} downstream_session.shutdown().await;
return None;
}
}
trace!(
"Request header: {:?}",
downstream_session.req_header().as_ref()
);
if !self
.server_options
.as_ref()
.is_some_and(|opts| opts.allow_connect_method_proxying)
&& downstream_session.req_header().method == Method::CONNECT
{
downstream_session
.respond_error(405)
.await
.unwrap_or_else(|e| {
error!("failed to send error response to downstream: {e}");
});
downstream_session.shutdown().await;
return None;
}
Some(downstream_session)
}
async fn proxy_to_upstream(
&self,
session: &mut Session,
ctx: &mut SV::CTX,
) -> (bool, Option<Box<Error>>)
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
let peer = match self.inner.upstream_peer(session, ctx).await {
Ok(p) => p,
Err(e) => return (false, Some(e)),
};
let client_session = self.client_upstream.get_http_session(&*peer).await;
match client_session {
Ok((client_session, client_reused)) => {
let (server_reused, error) = match client_session {
ClientSession::H1(mut h1) => {
let (server_reused, client_reuse, error) = self
.proxy_to_h1_upstream(session, &mut h1, client_reused, &peer, ctx)
.await;
if client_reuse {
let session = ClientSession::H1(h1);
self.client_upstream
.release_http_session(session, &*peer, peer.idle_timeout())
.await;
}
(server_reused, error)
}
ClientSession::H2(mut h2) => {
let (server_reused, mut error) = self
.proxy_to_h2_upstream(session, &mut h2, client_reused, &peer, ctx)
.await;
let session = ClientSession::H2(h2);
self.client_upstream
.release_http_session(session, &*peer, peer.idle_timeout())
.await;
if let Some(e) = error.as_mut() {
if matches!(e.etype, H2Downgrade | InvalidH2) {
if peer
.get_alpn()
.is_none_or(|alpn| alpn.get_min_http_version() == 1)
{
self.client_upstream.prefer_h1(&*peer);
} else {
e.retry = false.into();
}
}
}
(server_reused, error)
}
ClientSession::Custom(mut c) => {
let (server_reused, error) = self
.proxy_to_custom_upstream(session, &mut c, client_reused, &peer, ctx)
.await;
let session = ClientSession::Custom(c);
self.client_upstream
.release_http_session(session, &*peer, peer.idle_timeout())
.await;
(server_reused, error)
}
};
(
server_reused,
error.map(|e| {
self.inner
.error_while_proxy(&peer, session, e, ctx, client_reused)
}),
)
}
Err(mut e) => {
e.as_up();
let new_err = self.inner.fail_to_connect(session, &peer, ctx, e);
(false, Some(new_err.into_up()))
}
}
}
async fn upstream_filter(
&self,
session: &mut Session,
task: &mut HttpTask,
ctx: &mut SV::CTX,
) -> Result<Option<Duration>>
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
let duration = match task {
HttpTask::Header(header, _eos) => {
self.inner
.upstream_response_filter(session, header, ctx)
.await?;
None
}
HttpTask::Body(data, eos) | HttpTask::UpgradedBody(data, eos) => self
.inner
.upstream_response_body_filter(session, data, *eos, ctx)?,
HttpTask::Trailer(Some(trailers)) => {
self.inner
.upstream_response_trailer_filter(session, trailers, ctx)?;
None
}
_ => {
None
}
};
Ok(duration)
}
async fn finish(
&self,
mut session: Session,
ctx: &mut SV::CTX,
reuse: bool,
error: Option<Box<Error>>,
) -> Option<ReusedHttpStream>
where
SV: ProxyHttp + Send + Sync,
SV::CTX: Send + Sync,
{
self.inner
.logging(&mut session, error.as_deref(), ctx)
.await;
if let Some(e) = error {
session.downstream_session.on_proxy_failure(e);
}
if reuse {
let persistent_settings = HttpPersistentSettings::for_session(&session);
session
.downstream_session
.finish()
.await
.ok()
.flatten()
.map(|s| ReusedHttpStream::new(s, Some(persistent_settings)))
} else {
None
}
}
fn cleanup_sub_req(&self, session: &mut Session) {
if let Some(ctx) = session.subrequest_ctx.as_mut() {
ctx.release_write_lock();
}
}
}
use pingora_cache::HttpCache;
use pingora_core::protocols::http::compression::ResponseCompressionCtx;
pub struct Session {
pub downstream_session: Box<HttpSession>,
pub cache: HttpCache,
pub upstream_compression: ResponseCompressionCtx,
pub ignore_downstream_range: bool,
pub upstream_headers_mutated_for_cache: bool,
pub subrequest_ctx: Option<Box<SubrequestCtx>>,
pub subrequest_spawner: Option<SubrequestSpawner>,
pub downstream_modules_ctx: HttpModuleCtx,
upstream_body_bytes_received: usize,
upstream_write_pending_time: Duration,
shutdown_flag: Arc<AtomicBool>,
}
impl Session {
fn new(
downstream_session: impl Into<Box<HttpSession>>,
downstream_modules: &HttpModules,
shutdown_flag: Arc<AtomicBool>,
) -> Self {
Session {
downstream_session: downstream_session.into(),
cache: HttpCache::new(),
upstream_compression: ResponseCompressionCtx::new(0, false, false),
ignore_downstream_range: false,
upstream_headers_mutated_for_cache: false,
subrequest_ctx: None,
subrequest_spawner: None, downstream_modules_ctx: downstream_modules.build_ctx(),
upstream_body_bytes_received: 0,
upstream_write_pending_time: Duration::ZERO,
shutdown_flag,
}
}
pub fn new_h1(stream: Stream) -> Self {
let modules = HttpModules::new();
Self::new(
Box::new(HttpSession::new_http1(stream)),
&modules,
Arc::new(AtomicBool::new(false)),
)
}
pub fn new_h1_with_modules(stream: Stream, downstream_modules: &HttpModules) -> Self {
Self::new(
Box::new(HttpSession::new_http1(stream)),
downstream_modules,
Arc::new(AtomicBool::new(false)),
)
}
pub fn as_downstream_mut(&mut self) -> &mut HttpSession {
&mut self.downstream_session
}
pub fn as_downstream(&self) -> &HttpSession {
&self.downstream_session
}
pub async fn respond_error(&mut self, error: u16) -> Result<()> {
self.as_downstream_mut().respond_error(error).await
}
pub async fn respond_error_with_body(&mut self, error: u16, body: Bytes) -> Result<()> {
self.as_downstream_mut()
.respond_error_with_body(error, body)
.await
}
pub async fn write_response_header(
&mut self,
mut resp: Box<ResponseHeader>,
end_of_stream: bool,
) -> Result<()> {
self.downstream_modules_ctx
.response_header_filter(&mut resp, end_of_stream)
.await?;
self.downstream_session.write_response_header(resp).await
}
pub async fn write_response_header_ref(
&mut self,
resp: &ResponseHeader,
end_of_stream: bool,
) -> Result<(), Box<Error>> {
self.write_response_header(Box::new(resp.clone()), end_of_stream)
.await
}
pub async fn write_response_body(
&mut self,
mut body: Option<Bytes>,
end_of_stream: bool,
) -> Result<()> {
self.downstream_modules_ctx
.response_body_filter(&mut body, end_of_stream)?;
if body.is_none() && !end_of_stream {
return Ok(());
}
let data = body.unwrap_or_default();
self.downstream_session
.write_response_body(data, end_of_stream)
.await
}
pub async fn write_response_tasks(&mut self, mut tasks: Vec<HttpTask>) -> Result<bool> {
let mut seen_upgraded = self.was_upgraded();
for task in tasks.iter_mut() {
match task {
HttpTask::Header(resp, end) => {
self.downstream_modules_ctx
.response_header_filter(resp, *end)
.await?;
}
HttpTask::Body(data, end) => {
self.downstream_modules_ctx
.response_body_filter(data, *end)?;
}
HttpTask::UpgradedBody(data, end) => {
seen_upgraded = true;
self.downstream_modules_ctx
.response_body_filter(data, *end)?;
}
HttpTask::Trailer(trailers) => {
if let Some(buf) = self
.downstream_modules_ctx
.response_trailer_filter(trailers)?
{
*task = HttpTask::Body(Some(buf), true);
}
}
HttpTask::Done => {
if let Some(buf) = self.downstream_modules_ctx.response_done_filter()? {
if seen_upgraded {
*task = HttpTask::UpgradedBody(Some(buf), true);
} else {
*task = HttpTask::Body(Some(buf), true);
}
}
}
_ => { }
}
}
self.downstream_session.response_duplex_vec(tasks).await
}
pub fn mark_upstream_headers_mutated_for_cache(&mut self) {
self.upstream_headers_mutated_for_cache = true;
}
pub fn upstream_headers_mutated_for_cache(&self) -> bool {
self.upstream_headers_mutated_for_cache
}
pub fn upstream_body_bytes_received(&self) -> usize {
self.upstream_body_bytes_received
}
pub(crate) fn set_upstream_body_bytes_received(&mut self, n: usize) {
self.upstream_body_bytes_received = n;
}
pub fn upstream_write_pending_time(&self) -> Duration {
self.upstream_write_pending_time
}
pub(crate) fn set_upstream_write_pending_time(&mut self, d: Duration) {
self.upstream_write_pending_time = d;
}
pub fn is_process_shutting_down(&self) -> bool {
self.shutdown_flag.load(Ordering::Acquire)
}
pub fn downstream_custom_message(
&mut self,
) -> Result<
Option<Box<dyn futures::Stream<Item = Result<Bytes>> + Unpin + Send + Sync + 'static>>,
> {
if let Some(custom_session) = self.downstream_session.as_custom_mut() {
custom_session
.take_custom_message_reader()
.map(Some)
.ok_or(Error::explain(
ReadError,
"can't extract custom reader from downstream",
))
} else {
Ok(None)
}
}
}
impl AsRef<HttpSession> for Session {
fn as_ref(&self) -> &HttpSession {
&self.downstream_session
}
}
impl AsMut<HttpSession> for Session {
fn as_mut(&mut self) -> &mut HttpSession {
&mut self.downstream_session
}
}
use std::ops::{Deref, DerefMut};
impl Deref for Session {
type Target = HttpSession;
fn deref(&self) -> &Self::Target {
&self.downstream_session
}
}
impl DerefMut for Session {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.downstream_session
}
}
static BAD_GATEWAY: Lazy<ResponseHeader> = Lazy::new(|| {
let mut resp = ResponseHeader::build(http::StatusCode::BAD_GATEWAY, Some(3)).unwrap();
resp.insert_header(header::SERVER, &SERVER_NAME[..])
.unwrap();
resp.insert_header(header::CONTENT_LENGTH, 0).unwrap();
resp.insert_header(header::CACHE_CONTROL, "private, no-store")
.unwrap();
resp
});
impl<SV, C> HttpProxy<SV, C>
where
C: custom::Connector,
{
async fn process_request(
self: &Arc<Self>,
mut session: Session,
mut ctx: <SV as ProxyHttp>::CTX,
) -> Option<ReusedHttpStream>
where
SV: ProxyHttp + Send + Sync + 'static,
<SV as ProxyHttp>::CTX: Send + Sync,
{
if let Err(e) = self
.inner
.early_request_filter(&mut session, &mut ctx)
.await
{
return self
.handle_error(session, &mut ctx, e, "Fail to early filter request:")
.await;
}
if self.inner.allow_spawning_subrequest(&session, &ctx) {
session.subrequest_spawner = Some(SubrequestSpawner::new(self.clone()));
}
let req = session.downstream_session.req_header_mut();
if let Err(e) = session
.downstream_modules_ctx
.request_header_filter(req)
.await
{
return self
.handle_error(
session,
&mut ctx,
e,
"Failed in downstream modules request filter:",
)
.await;
}
match self.inner.request_filter(&mut session, &mut ctx).await {
Ok(response_sent) => {
if response_sent {
self.inner.logging(&mut session, None, &mut ctx).await;
self.cleanup_sub_req(&mut session);
let persistent_settings = HttpPersistentSettings::for_session(&session);
return session
.downstream_session
.finish()
.await
.ok()
.flatten()
.map(|s| ReusedHttpStream::new(s, Some(persistent_settings)));
}
}
Err(e) => {
return self
.handle_error(session, &mut ctx, e, "Fail to filter request:")
.await;
}
}
if let Some((reuse, err)) = self.proxy_cache(&mut session, &mut ctx).await {
return self.finish(session, &mut ctx, reuse, err).await;
}
self.cleanup_sub_req(&mut session);
match self
.inner
.proxy_upstream_filter(&mut session, &mut ctx)
.await
{
Ok(proxy_to_upstream) => {
if !proxy_to_upstream {
if session.cache.enabled() {
session.cache.disable(NoCacheReason::DeclinedToUpstream);
}
if session.response_written().is_none() {
match session.write_response_header_ref(&BAD_GATEWAY, true).await {
Ok(()) => {}
Err(e) => {
return self
.handle_error(
session,
&mut ctx,
e,
"Error responding with Bad Gateway:",
)
.await;
}
}
}
return self.finish(session, &mut ctx, true, None).await;
}
}
Err(e) => {
if session.cache.enabled() {
session.cache.disable(NoCacheReason::InternalError);
}
return self
.handle_error(
session,
&mut ctx,
e,
"Error deciding if we should proxy to upstream:",
)
.await;
}
}
let mut retries: usize = 0;
let mut server_reuse = false;
let mut proxy_error: Option<Box<Error>> = None;
while retries < self.max_retries {
retries += 1;
let (reuse, e) = self.proxy_to_upstream(&mut session, &mut ctx).await;
server_reuse = reuse;
match e {
Some(error) => {
let retry = error.retry();
proxy_error = Some(error);
if !retry {
break;
}
warn!(
"Fail to proxy: {}, tries: {}, retry: {}, {}",
proxy_error.as_ref().unwrap(),
retries,
retry,
self.inner.request_summary(&session, &ctx)
);
}
None => {
proxy_error = None;
break;
}
};
}
#[allow(clippy::unnecessary_unwrap)]
let serve_stale_result = if proxy_error.is_some() && session.cache.can_serve_stale_error() {
self.handle_stale_if_error(&mut session, &mut ctx, proxy_error.as_ref().unwrap())
.await
} else {
None
};
let final_error = if let Some((reuse, stale_cache_error)) = serve_stale_result {
server_reuse = server_reuse && reuse;
stale_cache_error
} else {
proxy_error
};
if let Some(e) = final_error.as_ref() {
if session.cache.enabled() {
let reason = if *e.esource() == ErrorSource::Upstream {
NoCacheReason::UpstreamError
} else {
NoCacheReason::InternalError
};
session.cache.disable(reason);
}
let res = self.inner.fail_to_proxy(&mut session, e, &mut ctx).await;
if !self.inner.suppress_error_log(&session, &ctx, e) {
error!(
"Fail to proxy: {}, status: {}, tries: {}, retry: {}, {}",
final_error.as_ref().unwrap(),
res.error_code,
retries,
false, self.inner.request_summary(&session, &ctx),
);
}
}
self.finish(session, &mut ctx, server_reuse, final_error)
.await
}
async fn handle_error(
&self,
mut session: Session,
ctx: &mut <SV as ProxyHttp>::CTX,
e: Box<Error>,
context: &str,
) -> Option<ReusedHttpStream>
where
SV: ProxyHttp + Send + Sync + 'static,
<SV as ProxyHttp>::CTX: Send + Sync,
{
let res = self.inner.fail_to_proxy(&mut session, &e, ctx).await;
if !self.inner.suppress_error_log(&session, ctx, &e) {
error!(
"{context} {}, status: {}, {}",
e,
res.error_code,
self.inner.request_summary(&session, ctx)
);
}
self.inner.logging(&mut session, Some(&e), ctx).await;
self.cleanup_sub_req(&mut session);
session.downstream_session.on_proxy_failure(e);
if res.can_reuse_downstream {
let persistent_settings = HttpPersistentSettings::for_session(&session);
session
.downstream_session
.finish()
.await
.ok()
.flatten()
.map(|s| ReusedHttpStream::new(s, Some(persistent_settings)))
} else {
None
}
}
}
#[async_trait]
pub trait Subrequest {
async fn process_subrequest(
self: Arc<Self>,
session: Box<HttpSession>,
sub_req_ctx: Box<SubrequestCtx>,
);
}
#[async_trait]
impl<SV, C> Subrequest for HttpProxy<SV, C>
where
SV: ProxyHttp + Send + Sync + 'static,
<SV as ProxyHttp>::CTX: Send + Sync,
C: custom::Connector,
{
async fn process_subrequest(
self: Arc<Self>,
session: Box<HttpSession>,
sub_req_ctx: Box<SubrequestCtx>,
) {
debug!("starting subrequest");
let mut session = match self.handle_new_request(session).await {
Some(downstream_session) => Session::new(
downstream_session,
&self.downstream_modules,
self.shutdown_flag.clone(),
),
None => return, };
session.set_keepalive(None);
session.subrequest_ctx.replace(sub_req_ctx);
trace!("processing subrequest");
let ctx = self.inner.new_ctx();
self.process_request(session, ctx).await;
trace!("subrequest done");
}
}
pub struct SubrequestSpawner {
app: Arc<dyn Subrequest + Send + Sync>,
}
pub struct PreparedSubrequest {
app: Arc<dyn Subrequest + Send + Sync>,
session: Box<HttpSession>,
sub_req_ctx: Box<SubrequestCtx>,
}
impl PreparedSubrequest {
pub async fn run(self) {
self.app
.process_subrequest(self.session, self.sub_req_ctx)
.await
}
pub fn session(&self) -> &HttpSession {
self.session.as_ref()
}
pub fn session_mut(&mut self) -> &mut HttpSession {
self.session.deref_mut()
}
}
impl SubrequestSpawner {
pub fn new(app: Arc<dyn Subrequest + Send + Sync>) -> SubrequestSpawner {
SubrequestSpawner { app }
}
pub fn spawn_background_subrequest(
&self,
session: &HttpSession,
ctx: SubrequestCtx,
) -> tokio::task::JoinHandle<()> {
let new_app = self.app.clone(); let (mut session, handle) = subrequest::create_session(session);
if ctx.body_mode() == BodyMode::NoBody {
session
.as_subrequest_mut()
.expect("created subrequest session")
.clear_request_body_headers();
}
let sub_req_ctx = Box::new(ctx);
handle.drain_tasks();
tokio::spawn(async move {
new_app
.process_subrequest(Box::new(session), sub_req_ctx)
.await;
})
}
pub fn create_subrequest(
&self,
session: &HttpSession,
ctx: SubrequestCtx,
) -> (PreparedSubrequest, SubrequestHandle) {
let new_app = self.app.clone(); let (mut session, handle) = subrequest::create_session(session);
if ctx.body_mode() == BodyMode::NoBody {
session
.as_subrequest_mut()
.expect("created subrequest session")
.clear_request_body_headers();
}
let sub_req_ctx = Box::new(ctx);
(
PreparedSubrequest {
app: new_app,
session: Box::new(session),
sub_req_ctx,
},
handle,
)
}
}
#[async_trait]
impl<SV, C> HttpServerApp for HttpProxy<SV, C>
where
SV: ProxyHttp + Send + Sync + 'static,
<SV as ProxyHttp>::CTX: Send + Sync,
C: custom::Connector,
{
async fn process_new_http(
self: &Arc<Self>,
session: HttpSession,
shutdown: &ShutdownWatch,
) -> Option<ReusedHttpStream> {
let session = Box::new(session);
let mut session = match self.handle_new_request(session).await {
Some(downstream_session) => Session::new(
downstream_session,
&self.downstream_modules,
self.shutdown_flag.clone(),
),
None => return None, };
if *shutdown.borrow() {
session.set_keepalive(None);
}
let ctx = self.inner.new_ctx();
self.process_request(session, ctx).await
}
async fn http_cleanup(&self) {
self.shutdown_flag.store(true, Ordering::Release);
self.shutdown.notify_waiters();
}
fn server_options(&self) -> Option<&HttpServerOptions> {
self.server_options.as_ref()
}
fn h2_options(&self) -> Option<H2Options> {
self.h2_options.clone()
}
async fn process_custom_session(
self: Arc<Self>,
stream: Stream,
shutdown: &ShutdownWatch,
) -> Option<Stream> {
let app = self.clone();
let Some(process_custom_session) = app.process_custom_session.as_ref() else {
warn!("custom was called on an empty on_custom");
return None;
};
process_custom_session(self.clone(), stream, shutdown).await
}
}
use pingora_core::services::listening::Service;
pub fn http_proxy<SV>(conf: &Arc<ServerConf>, inner: SV) -> HttpProxy<SV>
where
SV: ProxyHttp,
{
let mut proxy = HttpProxy::new(inner, conf.clone());
proxy.handle_init_modules();
proxy
}
pub fn http_proxy_service<SV>(conf: &Arc<ServerConf>, inner: SV) -> Service<HttpProxy<SV, ()>>
where
SV: ProxyHttp,
{
http_proxy_service_with_name(conf, inner, "Pingora HTTP Proxy Service")
}
pub fn http_proxy_service_with_name<SV>(
conf: &Arc<ServerConf>,
inner: SV,
name: &str,
) -> Service<HttpProxy<SV, ()>>
where
SV: ProxyHttp,
{
let mut proxy = HttpProxy::new(inner, conf.clone());
proxy.handle_init_modules();
Service::new(name.to_string(), proxy)
}
pub fn http_proxy_service_with_name_custom<SV, C>(
conf: &Arc<ServerConf>,
inner: SV,
name: &str,
connector: C,
on_custom: ProcessCustomSession<SV, C>,
) -> Service<HttpProxy<SV, C>>
where
SV: ProxyHttp + Send + Sync + 'static,
SV::CTX: Send + Sync + 'static,
C: custom::Connector,
{
let mut proxy = HttpProxy::new_custom(inner, conf.clone(), connector, Some(on_custom), None);
proxy.handle_init_modules();
Service::new(name.to_string(), proxy)
}
pub struct ProxyServiceBuilder<SV, C>
where
SV: ProxyHttp + Send + Sync + 'static,
SV::CTX: Send + Sync + 'static,
C: custom::Connector,
{
conf: Arc<ServerConf>,
inner: SV,
name: String,
connector: C,
custom: Option<ProcessCustomSession<SV, C>>,
server_options: Option<HttpServerOptions>,
}
impl<SV> ProxyServiceBuilder<SV, ()>
where
SV: ProxyHttp + Send + Sync + 'static,
SV::CTX: Send + Sync + 'static,
{
pub fn new(conf: &Arc<ServerConf>, inner: SV) -> Self {
ProxyServiceBuilder {
conf: conf.clone(),
inner,
name: "Pingora HTTP Proxy Service".into(),
connector: (),
custom: None,
server_options: None,
}
}
}
impl<SV, C> ProxyServiceBuilder<SV, C>
where
SV: ProxyHttp + Send + Sync + 'static,
SV::CTX: Send + Sync + 'static,
C: custom::Connector,
{
pub fn name(mut self, name: impl AsRef<str>) -> Self {
self.name = name.as_ref().to_owned();
self
}
pub fn custom<C2: custom::Connector>(
self,
connector: C2,
on_custom: ProcessCustomSession<SV, C2>,
) -> ProxyServiceBuilder<SV, C2> {
let Self {
conf,
inner,
name,
server_options,
..
} = self;
ProxyServiceBuilder {
conf,
inner,
name,
connector,
custom: Some(on_custom),
server_options,
}
}
pub fn server_options(mut self, options: HttpServerOptions) -> Self {
self.server_options = Some(options);
self
}
pub fn build(self) -> Service<HttpProxy<SV, C>> {
let Self {
conf,
inner,
name,
connector,
custom,
server_options,
} = self;
let mut proxy = HttpProxy::new_custom(inner, conf, connector, custom, server_options);
proxy.handle_init_modules();
Service::new(name, proxy)
}
}