use async_trait::async_trait;
use http::Response;
use log::{debug, error, trace};
use pingora_http::ResponseHeader;
use std::sync::Arc;
use crate::apps::HttpServerApp;
use crate::modules::http::{HttpModules, ModuleBuilder};
use crate::protocols::http::HttpTask;
use crate::protocols::http::ServerSession;
use crate::protocols::Stream;
use crate::server::ShutdownWatch;
/// Maps one HTTP request to one fully buffered response.
///
/// Any type implementing this trait (and `Send + Sync`) can be served through
/// the [`HttpServerApp`] implementations in this file, either directly or
/// wrapped in [`HttpServer`].
#[cfg_attr(not(doc_async_trait), async_trait)]
pub trait ServeHttp {
/// Produces the response for the request carried by `http_session`.
///
/// The callers in this file invoke this only after `read_request()` has
/// succeeded on the session; they do not read the request body beforehand,
/// so an implementation that needs the body has to read it itself.
///
/// The whole response (head and body bytes) is returned at once as a
/// `Response<Vec<u8>>`; the callers write it downstream after this returns.
async fn response(&self, http_session: &mut ServerSession) -> Response<Vec<u8>>;
}
/// Blanket [`HttpServerApp`] implementation for every [`ServeHttp`] type.
///
/// This lets a plain [`ServeHttp`] implementor be used as an HTTP server app
/// without any HTTP modules in between.
#[cfg_attr(not(doc_async_trait), async_trait)]
impl<SV> HttpServerApp for SV
where
    SV: ServeHttp + Send + Sync,
{
    /// Serves a single request on `http` and hands the session back for finishing.
    ///
    /// Steps, in order:
    /// 1. Read the request; give up (returning `None`) if that fails or
    ///    reports that no request was read.
    /// 2. Disable keepalive if `shutdown` is set, otherwise set it to `Some(60)`
    ///    (presumably seconds -- confirm against `set_keepalive`).
    /// 3. Ask the app for the response, then write its header and, when the
    ///    body is non-empty, its body.
    /// 4. Call `finish()` on the session and return what it yields.
    ///
    /// Write failures are logged but do not abort the sequence: the remaining
    /// writes and `finish()` are still attempted.
    ///
    /// Returns `None` when reading the request or finishing the session fails;
    /// otherwise returns the `Option<Stream>` produced by `finish()`.
    async fn process_new_http(
        self: &Arc<Self>,
        mut http: ServerSession,
        shutdown: &ShutdownWatch,
    ) -> Option<Stream> {
        match http.read_request().await {
            Ok(true) => debug!("Successfully get a new request"),
            Ok(false) => {
                debug!("Failed to read request header");
                return None;
            }
            Err(e) => {
                error!("HTTP server fails to read from downstream: {e}");
                return None;
            }
        }
        trace!("{:?}", http.req_header());

        // Once shutdown has been signalled the connection must not be kept alive.
        let keepalive = if *shutdown.borrow() { None } else { Some(60) };
        http.set_keepalive(keepalive);

        let (parts, body) = self.response(&mut http).await.into_parts();
        let resp_header: ResponseHeader = parts.into();

        let header_written = http.write_response_header(Box::new(resp_header)).await;
        match header_written {
            Ok(()) => debug!("HTTP response header done."),
            Err(e) => error!(
                "HTTP server fails to write to downstream: {e}, {}",
                http.request_summary()
            ),
        }

        // An empty body means there is nothing to send after the header.
        if !body.is_empty() {
            let body_written = http.write_response_body(body.into()).await;
            match body_written {
                Ok(_) => debug!("HTTP response written."),
                Err(e) => error!(
                    "HTTP server fails to write to downstream: {e}, {}",
                    http.request_summary()
                ),
            }
        }

        http.finish().await.unwrap_or_else(|e| {
            error!("HTTP server fails to finish the request: {e}");
            None
        })
    }
}
/// An HTTP server app that bundles an application with a set of HTTP modules.
///
/// When `SV` implements [`ServeHttp`], this type implements [`HttpServerApp`]
/// and runs the modules' request and response filters around the app's
/// `response` call.
pub struct HttpServer<SV> {
/// The wrapped application that produces the response for each request.
app: SV,
/// The registered modules; a fresh module context is built from these for
/// every request.
modules: HttpModules,
}
impl<SV> HttpServer<SV> {
    /// Wraps `app` in a new [`HttpServer`] whose module set starts out as
    /// `HttpModules::new()`.
    pub fn new_app(app: SV) -> Self {
        let modules = HttpModules::new();
        Self { app, modules }
    }

    /// Registers `module` with this server's module set.
    pub fn add_module(&mut self, module: ModuleBuilder) {
        self.modules.add_module(module);
    }
}
#[cfg_attr(not(doc_async_trait), async_trait)]
impl<SV> HttpServerApp for HttpServer<SV>
where
SV: ServeHttp + Send + Sync,
{
async fn process_new_http(
self: &Arc<Self>,
mut http: ServerSession,
shutdown: &ShutdownWatch,
) -> Option<Stream> {
match http.read_request().await {
Ok(res) => match res {
false => {
debug!("Failed to read request header");
return None;
}
true => {
debug!("Successfully get a new request");
}
},
Err(e) => {
error!("HTTP server fails to read from downstream: {e}");
return None;
}
}
trace!("{:?}", http.req_header());
if *shutdown.borrow() {
http.set_keepalive(None);
} else {
http.set_keepalive(Some(60));
}
let mut module_ctx = self.modules.build_ctx();
let req = http.req_header_mut();
module_ctx.request_header_filter(req).ok()?;
let new_response = self.app.response(&mut http).await;
let (parts, body) = new_response.into_parts();
let resp_header: ResponseHeader = parts.into();
let mut task = HttpTask::Header(Box::new(resp_header), body.is_empty());
module_ctx.response_filter(&mut task).ok()?;
trace!("{task:?}");
match http.response_duplex_vec(vec![task]).await {
Ok(_) => {
debug!("HTTP response header done.");
}
Err(e) => {
error!(
"HTTP server fails to write to downstream: {e}, {}",
http.request_summary()
);
}
}
let mut task = if !body.is_empty() {
HttpTask::Body(Some(body.into()), true)
} else {
HttpTask::Body(None, true)
};
trace!("{task:?}");
module_ctx.response_filter(&mut task).ok()?;
match http.response_duplex_vec(vec![task]).await {
Ok(_) => debug!("HTTP response written."),
Err(e) => error!(
"HTTP server fails to write to downstream: {e}, {}",
http.request_summary()
),
}
match http.finish().await {
Ok(c) => c,
Err(e) => {
error!("HTTP server fails to finish the request: {e}");
None
}
}
}
}