pub mod http_app;
pub mod prometheus_http_app;
use crate::server::ShutdownWatch;
use async_trait::async_trait;
use log::{debug, error};
use std::future::poll_fn;
use std::sync::Arc;
use crate::protocols::http::v2::server;
use crate::protocols::http::ServerSession;
use crate::protocols::Digest;
use crate::protocols::Stream;
use crate::protocols::ALPN;
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
#[async_trait]
pub trait ServerApp {
async fn process_new(
self: &Arc<Self>,
mut session: Stream,
shutdown: &ShutdownWatch,
) -> Option<Stream>;
async fn cleanup(&self) {}
}
#[non_exhaustive]
#[derive(Default)]
pub struct HttpServerOptions {
pub h2c: bool,
pub allow_connect_method_proxying: bool,
#[doc(hidden)]
pub force_custom: bool,
pub keepalive_request_limit: Option<u32>,
}
#[derive(Debug, Clone)]
pub struct HttpPersistentSettings {
keepalive_timeout: Option<u64>,
keepalive_reuses_remaining: Option<u32>,
}
impl HttpPersistentSettings {
pub fn for_session(session: &ServerSession) -> Self {
HttpPersistentSettings {
keepalive_timeout: session.get_keepalive(),
keepalive_reuses_remaining: session.get_keepalive_reuses_remaining(),
}
}
pub fn apply_to_session(self, session: &mut ServerSession) {
let Self {
keepalive_timeout,
mut keepalive_reuses_remaining,
} = self;
if let Some(reuses) = keepalive_reuses_remaining.as_mut() {
*reuses = reuses.saturating_sub(1);
}
session.set_keepalive(keepalive_timeout);
session.set_keepalive_reuses_remaining(keepalive_reuses_remaining);
}
}
#[derive(Debug)]
pub struct ReusedHttpStream {
stream: Stream,
persistent_settings: Option<HttpPersistentSettings>,
}
impl ReusedHttpStream {
pub fn new(stream: Stream, persistent_settings: Option<HttpPersistentSettings>) -> Self {
ReusedHttpStream {
stream,
persistent_settings,
}
}
pub fn consume(self) -> (Stream, Option<HttpPersistentSettings>) {
(self.stream, self.persistent_settings)
}
}
#[async_trait]
pub trait HttpServerApp {
async fn process_new_http(
self: &Arc<Self>,
mut session: ServerSession,
shutdown: &ShutdownWatch,
) -> Option<ReusedHttpStream>;
fn h2_options(&self) -> Option<server::H2Options> {
None
}
fn server_options(&self) -> Option<&HttpServerOptions> {
None
}
async fn http_cleanup(&self) {}
#[doc(hidden)]
async fn process_custom_session(
self: Arc<Self>,
_stream: Stream,
_shutdown: &ShutdownWatch,
) -> Option<Stream> {
None
}
}
#[async_trait]
impl<T> ServerApp for T
where
T: HttpServerApp + Send + Sync + 'static,
{
async fn process_new(
self: &Arc<Self>,
mut stream: Stream,
shutdown: &ShutdownWatch,
) -> Option<Stream> {
let mut h2c = self.server_options().as_ref().map_or(false, |o| o.h2c);
let custom = self
.server_options()
.as_ref()
.map_or(false, |o| o.force_custom);
if h2c && !custom {
let mut buf = [0u8; H2_PREFACE.len()];
let peeked = stream
.try_peek(&mut buf)
.await
.map_err(|e| {
debug!("Read error while peeking h2c preface {e}");
e
})
.ok()?;
if peeked {
h2c = buf == H2_PREFACE;
}
}
if h2c || matches!(stream.selected_alpn_proto(), Some(ALPN::H2)) {
let digest = Arc::new(Digest {
ssl_digest: stream.get_ssl_digest(),
timing_digest: stream.get_timing_digest(),
proxy_digest: stream.get_proxy_digest(),
socket_digest: stream.get_socket_digest(),
});
let h2_options = self.h2_options();
let h2_conn = server::handshake(stream, h2_options).await;
let mut h2_conn = match h2_conn {
Err(e) => {
error!("H2 handshake error {e}");
return None;
}
Ok(c) => c,
};
let mut shutdown = shutdown.clone();
loop {
let h2_stream = tokio::select! {
_ = shutdown.changed() => {
h2_conn.graceful_shutdown();
let _ = poll_fn(|cx| h2_conn.poll_closed(cx))
.await.map_err(|e| error!("H2 error waiting for shutdown {e}"));
return None;
}
h2_stream = server::HttpSession::from_h2_conn(&mut h2_conn, digest.clone()) => h2_stream
};
let h2_stream = match h2_stream {
Err(e) => {
debug!("H2 error when accepting new stream {e}");
return None;
}
Ok(s) => s?, };
let app = self.clone();
let shutdown = shutdown.clone();
pingora_runtime::current_handle().spawn(async move {
app.process_new_http(ServerSession::new_http2(h2_stream), &shutdown)
.await;
});
}
} else if custom || matches!(stream.selected_alpn_proto(), Some(ALPN::Custom(_))) {
return self.clone().process_custom_session(stream, shutdown).await;
} else {
let mut session = ServerSession::new_http1(stream);
if *shutdown.borrow() {
session.set_keepalive(None);
} else {
session.set_keepalive(Some(60));
}
session.set_keepalive_reuses_remaining(
self.server_options()
.and_then(|opts| opts.keepalive_request_limit),
);
let mut result = self.process_new_http(session, shutdown).await;
while let Some((stream, persistent_settings)) = result.map(|r| r.consume()) {
let mut session = ServerSession::new_http1(stream);
if let Some(persistent_settings) = persistent_settings {
persistent_settings.apply_to_session(&mut session);
}
result = self.process_new_http(session, shutdown).await;
}
}
None
}
async fn cleanup(&self) {
self.http_cleanup().await;
}
}