use super::*;
use pingora_cache::{key::HashBinary, CacheKey, CacheMeta, RespCacheable, RespCacheable::*};
#[cfg_attr(not(doc_async_trait), async_trait)]
pub trait ProxyHttp {
type CTX;
fn new_ctx(&self) -> Self::CTX;
async fn upstream_peer(
&self,
session: &mut Session,
ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>>;
async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool>
where
Self::CTX: Send + Sync,
{
Ok(false)
}
fn request_cache_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<()> {
Ok(())
}
fn cache_key_callback(&self, session: &Session, _ctx: &mut Self::CTX) -> Result<CacheKey> {
let req_header = session.req_header();
Ok(CacheKey::default(req_header))
}
fn cache_miss(&self, session: &mut Session, _ctx: &mut Self::CTX) {
session.cache.cache_miss();
}
async fn cache_hit_filter(
&self,
_meta: &CacheMeta,
_ctx: &mut Self::CTX,
_req: &RequestHeader,
) -> Result<bool>
where
Self::CTX: Send + Sync,
{
Ok(false)
}
async fn proxy_upstream_filter(
&self,
_session: &mut Session,
_ctx: &mut Self::CTX,
) -> Result<bool>
where
Self::CTX: Send + Sync,
{
Ok(true)
}
fn response_cache_filter(
&self,
_session: &Session,
_resp: &ResponseHeader,
_ctx: &mut Self::CTX,
) -> Result<RespCacheable> {
Ok(Uncacheable(NoCacheReason::Custom("default")))
}
fn cache_vary_filter(
&self,
_meta: &CacheMeta,
_ctx: &mut Self::CTX,
_req: &RequestHeader,
) -> Option<HashBinary> {
None
}
fn cache_not_modified_filter(
&self,
session: &Session,
resp: &ResponseHeader,
_ctx: &mut Self::CTX,
) -> Result<bool> {
Ok(
pingora_core::protocols::http::conditional_filter::not_modified_filter(
session.req_header(),
resp,
),
)
}
async fn upstream_request_filter(
&self,
_session: &mut Session,
_upstream_request: &mut RequestHeader,
_ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
Ok(())
}
fn upstream_response_filter(
&self,
_session: &mut Session,
_upstream_response: &mut ResponseHeader,
_ctx: &mut Self::CTX,
) {
}
async fn response_filter(
&self,
_session: &mut Session,
_upstream_response: &mut ResponseHeader,
_ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
Ok(())
}
fn upstream_response_body_filter(
&self,
_session: &mut Session,
_body: &mut Option<Bytes>,
_end_of_stream: bool,
_ctx: &mut Self::CTX,
) {
}
fn upstream_response_trailer_filter(
&self,
_session: &mut Session,
_upstream_trailers: &mut header::HeaderMap,
_ctx: &mut Self::CTX,
) -> Result<()> {
Ok(())
}
fn response_body_filter(
&self,
_session: &mut Session,
_body: &mut Option<Bytes>,
_end_of_stream: bool,
_ctx: &mut Self::CTX,
) -> Result<Option<std::time::Duration>>
where
Self::CTX: Send + Sync,
{
Ok(None)
}
async fn response_trailer_filter(
&self,
_session: &mut Session,
_upstream_trailers: &mut header::HeaderMap,
_ctx: &mut Self::CTX,
) -> Result<Option<Bytes>>
where
Self::CTX: Send + Sync,
{
Ok(None)
}
async fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut Self::CTX)
where
Self::CTX: Send + Sync,
{
}
fn suppress_error_log(&self, _session: &Session, _ctx: &Self::CTX, _error: &Error) -> bool {
false
}
fn error_while_proxy(
&self,
peer: &HttpPeer,
session: &mut Session,
e: Box<Error>,
_ctx: &mut Self::CTX,
client_reused: bool,
) -> Box<Error> {
let mut e = e.more_context(format!("Peer: {}", peer));
e.retry
.decide_reuse(client_reused && !session.as_ref().retry_buffer_truncated());
e
}
fn fail_to_connect(
&self,
_session: &mut Session,
_peer: &HttpPeer,
_ctx: &mut Self::CTX,
e: Box<Error>,
) -> Box<Error> {
e
}
async fn fail_to_proxy(&self, session: &mut Session, e: &Error, _ctx: &mut Self::CTX) -> u16
where
Self::CTX: Send + Sync,
{
let server_session = session.as_mut();
let code = match e.etype() {
HTTPStatus(code) => *code,
_ => {
match e.esource() {
ErrorSource::Upstream => 502,
ErrorSource::Downstream => {
match e.etype() {
WriteError | ReadError | ConnectionClosed => {
0
}
_ => 400,
}
}
ErrorSource::Internal | ErrorSource::Unset => 500,
}
}
};
if code > 0 {
server_session.respond_error(code).await
}
code
}
fn should_serve_stale(
&self,
_session: &mut Session,
_ctx: &mut Self::CTX,
error: Option<&Error>, ) -> bool {
error.map_or(false, |e| e.esource() == &ErrorSource::Upstream)
}
async fn connected_to_upstream(
&self,
_session: &mut Session,
_reused: bool,
_peer: &HttpPeer,
_fd: std::os::unix::io::RawFd,
_digest: Option<&Digest>,
_ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
Ok(())
}
fn request_summary(&self, session: &Session, _ctx: &Self::CTX) -> String {
session.as_ref().request_summary()
}
fn is_purge(&self, _session: &Session, _ctx: &Self::CTX) -> bool {
false
}
fn purge_response_filter(
&self,
_session: &Session,
_ctx: &mut Self::CTX,
_purge_status: PurgeStatus,
_purge_response: &mut std::borrow::Cow<'static, ResponseHeader>,
) -> Result<()> {
Ok(())
}
}