pub mod http_app;
pub mod prometheus_http_app;
use crate::server::ShutdownWatch;
use async_trait::async_trait;
use log::{debug, error};
use std::future::poll_fn;
use std::sync::Arc;
use crate::protocols::http::v2::server;
use crate::protocols::http::ServerSession;
use crate::protocols::Digest;
use crate::protocols::Stream;
use crate::protocols::ALPN;
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
#[async_trait]
pub trait ServerApp {
async fn process_new(
self: &Arc<Self>,
mut session: Stream,
shutdown: &ShutdownWatch,
) -> Option<Stream>;
async fn cleanup(&self) {}
}
#[non_exhaustive]
#[derive(Default)]
pub struct HttpServerOptions {
pub h2c: bool,
}
#[async_trait]
pub trait HttpServerApp {
async fn process_new_http(
self: &Arc<Self>,
mut session: ServerSession,
shutdown: &ShutdownWatch,
) -> Option<Stream>;
fn h2_options(&self) -> Option<server::H2Options> {
None
}
fn server_options(&self) -> Option<&HttpServerOptions> {
None
}
async fn http_cleanup(&self) {}
}
#[async_trait]
impl<T> ServerApp for T
where
T: HttpServerApp + Send + Sync + 'static,
{
async fn process_new(
self: &Arc<Self>,
mut stream: Stream,
shutdown: &ShutdownWatch,
) -> Option<Stream> {
let mut h2c = self.server_options().as_ref().map_or(false, |o| o.h2c);
if h2c {
let mut buf = [0u8; H2_PREFACE.len()];
let peeked = stream
.try_peek(&mut buf)
.await
.map_err(|e| {
debug!("Read error while peeking h2c preface {e}");
e
})
.ok()?;
if peeked {
h2c = buf == H2_PREFACE;
}
}
if h2c || matches!(stream.selected_alpn_proto(), Some(ALPN::H2)) {
let digest = Arc::new(Digest {
ssl_digest: stream.get_ssl_digest(),
timing_digest: stream.get_timing_digest(),
proxy_digest: stream.get_proxy_digest(),
socket_digest: stream.get_socket_digest(),
});
let h2_options = self.h2_options();
let h2_conn = server::handshake(stream, h2_options).await;
let mut h2_conn = match h2_conn {
Err(e) => {
error!("H2 handshake error {e}");
return None;
}
Ok(c) => c,
};
let mut shutdown = shutdown.clone();
loop {
let h2_stream = tokio::select! {
_ = shutdown.changed() => {
h2_conn.graceful_shutdown();
let _ = poll_fn(|cx| h2_conn.poll_closed(cx))
.await.map_err(|e| error!("H2 error waiting for shutdown {e}"));
return None;
}
h2_stream = server::HttpSession::from_h2_conn(&mut h2_conn, digest.clone()) => h2_stream
};
let h2_stream = match h2_stream {
Err(e) => {
debug!("H2 error when accepting new stream {e}");
return None;
}
Ok(s) => s?, };
let app = self.clone();
let shutdown = shutdown.clone();
pingora_runtime::current_handle().spawn(async move {
app.process_new_http(ServerSession::new_http2(h2_stream), &shutdown)
.await;
});
}
} else {
self.process_new_http(ServerSession::new_http1(stream), shutdown)
.await
}
}
async fn cleanup(&self) {
self.http_cleanup().await;
}
}