use std::net::ToSocketAddrs;
use std::str::FromStr;
use std::time::{Duration, SystemTime};
use std::{borrow::Cow, collections::HashMap, sync::Arc};
use async_trait::async_trait;
use dashmap::mapref;
use http::uri::PathAndQuery;
use http::{HeaderName, HeaderValue, Uri};
use once_cell::sync::Lazy;
use openssl::base64;
use pingora::http::{RequestHeader, ResponseHeader};
use pingora::proxy::{ProxyHttp, Session};
use pingora::{upstreams::peer::HttpPeer, ErrorType::HTTPStatus};
use pingora_cache::lock::CacheLock;
use pingora_cache::{CacheKey, CacheMeta, NoCacheReason, RespCacheable};
use crate::cache::disk::storage::DiskCache;
use crate::config::{RouteCacheType, RouteUpstream};
use crate::stores::{self, routes::RouteStoreContainer};
use super::{
middleware::{
execute_request_plugins, execute_response_plugins, execute_upstream_request_plugins,
execute_upstream_response_plugins,
},
DEFAULT_PEER_OPTIONS,
};
/// Lazily initialised in-memory cache storage; returned by `get_cache_storage`
/// for `RouteCacheType::MemCache`.
static STORAGE_MEM_CACHE: Lazy<pingora_cache::MemCache> = Lazy::new(pingora_cache::MemCache::new);
/// Lazily initialised disk cache storage; returned by `get_cache_storage`
/// for `RouteCacheType::Disk`.
static STORAGE_CACHE: Lazy<DiskCache> = Lazy::new(DiskCache::new);
/// Request methods whose responses `response_cache_filter` is willing to cache
/// (GET and HEAD).
static CACHEABLE_METHODS: Lazy<Vec<http::Method>> =
Lazy::new(|| vec![http::Method::GET, http::Method::HEAD]);
/// Cache lock shared by every cache-enabled session (passed to `session.cache.enable`).
// NOTE(review): the 1-second duration is presumably the lock timeout — confirm
// against the `pingora_cache::lock::CacheLock` documentation.
static CACHE_LOCK: Lazy<CacheLock> = Lazy::new(|| CacheLock::new(Duration::from_secs(1)));
/// Proxy handler type that implements pingora's `ProxyHttp` trait in this file.
///
/// It has no fields; all per-request state lives in `RouterContext`.
pub struct Router {}
/// Shorthand for a `dashmap` single-entry reference to a `RouteStoreContainer`
/// keyed by `String`.
// NOTE(review): a dashmap `Ref` is presumably a read guard on the underlying map
// shard; it is kept in the request context for the whole request — confirm this
// cannot block writers to the route store for long.
type Container = mapref::one::Ref<'static, String, RouteStoreContainer>;
/// Returns a shared handle to the route container stored on the request context.
///
/// Only the `Arc` is cloned (a reference-count bump); the container itself is
/// not copied.
///
/// # Panics
///
/// Panics if `ctx.route_container` is `None`. `request_filter` stores the
/// container before any of the hooks that call this function are expected to
/// run, so a `None` here indicates a broken phase-ordering invariant.
fn process_route(ctx: &RouterContext) -> Arc<Container> {
    Arc::clone(
        ctx.route_container
            .as_ref()
            .expect("route_container must be set by request_filter before later proxy phases run"),
    )
}
fn get_cache_storage(cache_type: &RouteCacheType) -> &'static (dyn pingora_cache::Storage + Sync) {
match cache_type {
RouteCacheType::Disk => &*STORAGE_CACHE,
RouteCacheType::MemCache => &*STORAGE_MEM_CACHE,
}
}
/// Per-request state created by `Router::new_ctx` and threaded through the
/// proxy hooks in this file.
pub struct RouterContext {
/// Request host with the port stripped; written by `request_filter` and used
/// as the cache key namespace in `cache_key_callback`.
pub host: String,
/// Route matched for this request; `None` until `request_filter` finds one.
pub route_container: Option<Arc<mapref::one::Ref<'static, String, RouteStoreContainer>>>,
/// Upstream configuration chosen in `upstream_peer`; the default value until then.
pub upstream: RouteUpstream,
/// String key/value scratch space shared between hooks. Keys used in this file:
/// `"cache_state"` (written by the cache hooks, read by `response_filter`) and
/// `"request_id_header"` (read by `logging`; presumably written by a plugin).
pub extensions: HashMap<Cow<'static, str>, String>,
/// Timing information used for the `cache-duration` header and access log.
pub timings: RouterTimings,
}
/// Timestamps recorded for a single request.
pub struct RouterTimings {
// Captured in `Router::new_ctx` (i.e. when the context is created, despite the
// field name); `response_filter` and `logging` measure elapsed time from it.
request_filter_start: std::time::Instant,
}
#[async_trait]
impl ProxyHttp for Router {
type CTX = RouterContext;
fn new_ctx(&self) -> Self::CTX {
RouterContext {
host: String::new(),
route_container: None,
upstream: RouteUpstream::default(),
extensions: HashMap::with_capacity(2),
timings: RouterTimings {
request_filter_start: std::time::Instant::now(),
},
}
}
async fn request_filter(
&self,
session: &mut Session,
ctx: &mut Self::CTX,
) -> pingora::Result<bool> {
let req_host = get_host(session);
let host_without_port = req_host.split(':').collect::<Vec<_>>()[0];
host_without_port.clone_into(&mut ctx.host);
ctx.host = host_without_port.to_string();
let Some(route_container) = stores::get_route_by_key(host_without_port) else {
session.respond_error(404).await;
return Ok(true);
};
let arced = Arc::new(route_container);
let uri = get_uri(session);
match &arced.path_matcher.pattern {
Some(pattern) if pattern.find(uri.path()).is_none() => {
session.respond_error(404).await;
return Ok(true);
}
_ => {}
}
ctx.route_container = Some(Arc::clone(&arced));
if let Ok(true) = execute_request_plugins(session, ctx, &arced.plugins).await {
return Ok(true);
}
if arced.cache.is_some() {
let cache = arced.cache.as_ref().unwrap();
if cache.enabled.unwrap_or(false) {
let storage = get_cache_storage(&cache.cache_type);
stores::insert_cache_routing(&ctx.host, cache.path.to_string_lossy().to_string());
session
.cache
.enable(storage, None, None, Some(&*CACHE_LOCK));
}
}
Ok(false)
}
async fn upstream_peer(
&self,
session: &mut Session,
ctx: &mut Self::CTX,
) -> pingora::Result<Box<HttpPeer>> {
let route_container = process_route(ctx);
if session.cache.enabled() {
session.cache.set_max_file_size_bytes(100 * 1024 * 1024);
}
let Some(healthy_upstream) = route_container.load_balancer.select(b"", 128) else {
return Err(pingora::Error::new(HTTPStatus(503)));
};
let (healthy_ip, healthy_port) = if let Some(scr) = healthy_upstream.addr.as_inet() {
(scr.ip().to_string(), scr.port())
} else {
return Err(pingora::Error::new(HTTPStatus(503)));
};
let Some(upstream) = route_container.upstreams.iter().find(|u| {
format!("{}:{}", u.ip, u.port)
.to_socket_addrs()
.unwrap()
.any(|s| s.ip().to_string() == healthy_ip && s.port() == healthy_port)
}) else {
return Err(pingora::Error::new(HTTPStatus(503)));
};
ctx.upstream = upstream.clone();
let mut peer = HttpPeer::new(
healthy_upstream,
healthy_port == 443,
upstream.sni.clone().unwrap_or(String::new()),
);
peer.options = DEFAULT_PEER_OPTIONS;
Ok(Box::new(peer))
}
async fn response_filter(
&self,
session: &mut Session,
upstream_response: &mut ResponseHeader,
ctx: &mut Self::CTX,
) -> pingora::Result<()> {
let route_container = process_route(ctx);
for (name, value) in &route_container.host_header_add {
upstream_response.insert_header(name, value)?;
}
for name in &route_container.host_header_remove {
upstream_response.remove_header(name);
}
let cache_state = ctx.extensions.get("cache_state").cloned();
if session.cache.enabled() && cache_state.is_some() {
let cache_state = cache_state.unwrap();
upstream_response.insert_header(
HeaderName::from_str("cache-status").unwrap(),
cache_state.as_str(),
)?;
let elapsed = ctx.timings.request_filter_start.elapsed();
upstream_response.insert_header(
HeaderName::from_str("cache-duration").unwrap(),
elapsed.as_millis().to_string(),
)?;
}
execute_response_plugins(&route_container, session, ctx).await?;
Ok(())
}
async fn upstream_request_filter(
&self,
session: &mut Session,
upstream_request: &mut RequestHeader,
ctx: &mut Self::CTX,
) -> pingora::Result<()> {
let route_container = process_route(ctx);
let upstream = &ctx.upstream;
if let Some(headers) = upstream.headers.as_ref() {
if let Some(add) = headers.add.as_ref() {
for header_add in add {
upstream_request
.insert_header(header_add.name.to_string(), header_add.value.to_string())
.ok();
}
}
}
execute_upstream_request_plugins(&route_container, session, upstream_request, ctx)
.await
.ok();
Ok(())
}
fn upstream_response_filter(
&self,
session: &mut Session,
upstream_response: &mut ResponseHeader,
ctx: &mut Self::CTX,
) {
let route_container = process_route(ctx);
execute_upstream_response_plugins(&route_container, session, upstream_response, ctx);
}
async fn logging(
&self,
session: &mut Session,
_: Option<&pingora::Error>,
ctx: &mut Self::CTX,
) {
let duration_ms = ctx.timings.request_filter_start.elapsed().as_millis();
let http_version = if session.is_http2() {
"http/2"
} else {
"http/1.1"
};
let method = session.req_header().method.to_string();
let query = session.req_header().uri.query().unwrap_or_default();
let path = session.req_header().uri.path();
let empty_header = HeaderValue::from_static("");
let host = session.req_header().uri.host();
let referer = session
.req_header()
.headers
.get("referer")
.unwrap_or(&empty_header);
let user_agent = session
.req_header()
.headers
.get("user-agent")
.unwrap_or(&empty_header);
let client_ip = session
.client_addr()
.map(ToString::to_string)
.unwrap_or_default();
let status_code = session
.response_written()
.map(|v| v.status.as_u16())
.unwrap_or_default();
tracing::info!(
method,
path,
query,
host,
duration_ms,
user_agent = user_agent.to_str().unwrap_or(""),
referer = referer.to_str().unwrap_or(""),
client_ip,
status_code,
http_version,
request_id = ctx.extensions.get("request_id_header"),
access_log = true
);
}
fn cache_key_callback(
&self,
session: &Session,
ctx: &mut Self::CTX,
) -> pingora::Result<CacheKey> {
let req_header = session.req_header();
Ok(CacheKey::new(
ctx.host.clone(),
base64::encode_block(
req_header
.uri
.path_and_query()
.unwrap_or(&PathAndQuery::from_static("/"))
.as_str()
.as_bytes(),
),
"",
))
}
fn cache_miss(&self, session: &mut Session, ctx: &mut Self::CTX) {
ctx.extensions
.insert(Cow::Borrowed("cache_state"), "fwd=miss".into());
session.cache.cache_miss();
}
async fn cache_hit_filter(
&self,
meta: &CacheMeta,
ctx: &mut Self::CTX,
_req: &RequestHeader,
) -> pingora::Result<bool> {
if !meta.is_fresh(SystemTime::now()) {
ctx.extensions
.insert(Cow::Borrowed("cache_state"), "expired".into());
return Ok(true);
}
ctx.extensions
.insert(Cow::Borrowed("cache_state"), "hit".into());
Ok(false)
}
fn response_cache_filter(
&self,
session: &Session,
resp: &ResponseHeader,
ctx: &mut Self::CTX,
) -> pingora::Result<RespCacheable> {
let container = process_route(ctx);
let Some(cache) = container.cache.as_ref() else {
return Ok(RespCacheable::Uncacheable(NoCacheReason::NeverEnabled));
};
if !CACHEABLE_METHODS.contains(&session.req_header().method) {
return Ok(RespCacheable::Uncacheable(NoCacheReason::Custom(
"method or status not cacheable",
)));
}
Ok(RespCacheable::Cacheable(CacheMeta::new(
SystemTime::now()
.checked_add(Duration::from_secs(cache.expires_in_secs))
.unwrap(),
SystemTime::now(),
cache.stale_while_revalidate_secs,
cache.stale_if_error_secs,
resp.clone(),
)))
}
}
/// Returns an owned copy of the request URI of `session`.
fn get_uri(session: &mut Session) -> Uri {
    let request = session.req_header();
    request.uri.clone()
}
/// Returns the host the request was addressed to, or `""` when it cannot be
/// determined.
///
/// The `Host` header wins when present (a value that is not valid UTF-8 yields
/// `""`); otherwise the host component of the request URI is used.
fn get_host(session: &mut Session) -> &str {
    match session.get_header(http::header::HOST) {
        Some(value) => value.to_str().unwrap_or(""),
        None => session.req_header().uri.host().unwrap_or(""),
    }
}