use std::{collections::BTreeMap, sync::Arc, time::Duration};
use async_trait::async_trait;
use pingora::{
protocols::ALPN,
upstreams::peer::{HttpPeer, PeerOptions, TcpKeepalive},
ErrorType::HTTPStatus,
};
use pingora_http::ResponseHeader;
use pingora_proxy::{ProxyHttp, Session};
use tracing::info;
use crate::stores::routes::{RouteStore, RouteStoreContainer};
/// Connection options assigned to every upstream peer created in
/// `Router::upstream_peer`.
///
/// Fields are grouped by concern; the values themselves are the project's
/// defaults and are applied verbatim to each new `HttpPeer`.
pub const DEFAULT_PEER_OPTIONS: PeerOptions = PeerOptions {
    // --- Socket / TCP ---
    bind_to: None,
    tcp_recv_buf: None,
    tcp_keepalive: Some(TcpKeepalive {
        idle: Duration::from_secs(60),
        interval: Duration::from_secs(15),
        count: 5,
    }),

    // --- Timeouts (only connect and read are bounded, at 30 s each) ---
    connection_timeout: Some(Duration::from_secs(30)),
    total_connection_timeout: None,
    read_timeout: Some(Duration::from_secs(30)),
    write_timeout: None,
    idle_timeout: None,

    // --- TLS-related settings ---
    // NOTE(review): certificate verification is off while hostname
    // verification is on; confirm this combination is intentional.
    verify_cert: false,
    verify_hostname: true,
    alternative_cn: None,
    ca: None,
    curves: None,
    second_keyshare: true,
    alpn: ALPN::H2H1,

    // --- HTTP-level settings ---
    no_header_eos: false,
    h2_ping_interval: None,
    max_h2_streams: 1,
    extra_proxy_headers: BTreeMap::new(),

    // --- Diagnostics ---
    tracer: None,
};
/// Proxy handler that picks an upstream for each request by looking the
/// request host up in `store` (see the `ProxyHttp` implementation below).
pub struct Router {
/// Route table queried with the request host (port stripped) in `request_filter`.
pub store: RouteStore,
}
/// Per-request state used as the `ProxyHttp::CTX` of `Router`.
pub struct RouterContext {
/// Request host without its port; empty until `request_filter` accepts the request.
pub host: String,
/// Route entry selected for this request; `None` until `request_filter` accepts it.
pub route_container: Option<Arc<RouteStoreContainer>>,
}
#[async_trait]
impl ProxyHttp for Router {
    type CTX = RouterContext;

    /// Creates the per-request context: empty host, no route selected yet.
    fn new_ctx(&self) -> Self::CTX {
        RouterContext {
            host: String::new(),
            route_container: None,
        }
    }

    /// Resolves the route for the incoming request.
    ///
    /// Looks the request host (port stripped) up in the route store and, when
    /// the route defines a path pattern, checks the request path against it.
    ///
    /// Returns `Ok(true)` after a 404 has been sent (unknown host or path not
    /// matching), and `Ok(false)` once the host and route have been stored in
    /// `ctx` for the later phases.
    async fn request_filter(
        &self,
        session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> pingora::Result<bool> {
        let host = strip_host_port(get_host(session));

        // Clone the `Arc` out of the store entry right away so that the entry
        // handle returned by `get` (which may hold a lock inside the store —
        // not visible from here) is never kept alive across an `.await`.
        let Some(route) = self.store.get(host).map(|entry| entry.value().clone()) else {
            session.respond_error(404).await;
            return Ok(true);
        };

        // Evaluate the path check up front; the borrow of the request header
        // ends with this statement, so no copy of the URI is needed.
        let path_rejected = match &route.path_matcher.pattern {
            Some(pattern) => pattern.find(session.req_header().uri.path()).is_none(),
            None => false,
        };
        if path_rejected {
            session.respond_error(404).await;
            return Ok(true);
        }

        ctx.host = host.to_string();
        ctx.route_container = Some(route);
        Ok(false)
    }

    /// Builds the upstream peer for the route chosen in `request_filter`.
    ///
    /// # Errors
    ///
    /// * `HTTPStatus(404)` when no route was stored in `ctx`.
    /// * `HTTPStatus(503)` when the route's load balancer yields no backend.
    async fn upstream_peer(
        &self,
        _session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> pingora::Result<Box<HttpPeer>> {
        let Some(route) = ctx.route_container.as_ref() else {
            return Err(pingora::Error::new(HTTPStatus(404)));
        };

        let Some(backend) = route.load_balancer.select(b"", 32) else {
            info!("No healthy upstream found");
            return Err(pingora::Error::new(HTTPStatus(503)));
        };

        // Plain (non-TLS) connection to the backend; the request host is
        // passed along as the peer's SNI argument.
        let mut peer = HttpPeer::new(backend, false, ctx.host.clone());
        peer.options = DEFAULT_PEER_OPTIONS;
        Ok(Box::new(peer))
    }

    /// Applies the route's response-header rules to the upstream response.
    ///
    /// Headers in `host_header_add` are inserted first, then headers in
    /// `host_header_remove` are removed, so a name listed in both ends up
    /// removed. When no route was selected the response is left untouched
    /// instead of panicking.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `insert_header`.
    async fn response_filter(
        &self,
        _session: &mut Session,
        upstream_response: &mut ResponseHeader,
        ctx: &mut Self::CTX,
    ) -> pingora::Result<()> {
        let Some(route) = ctx.route_container.as_ref() else {
            return Ok(());
        };

        for (name, value) in &route.host_header_add {
            upstream_response.insert_header(name, value)?;
        }
        for name in &route.host_header_remove {
            upstream_response.remove_header(name);
        }
        Ok(())
    }
}

/// Returns `authority` without a trailing `:port`.
///
/// A bracketed IPv6 literal such as `[::1]:8080` is returned with its
/// brackets (`[::1]`), the same form `http::Uri::host` produces. Any other
/// input is cut at its first `:`.
fn strip_host_port(authority: &str) -> &str {
    if authority.starts_with('[') {
        // The colons inside the brackets belong to the address, not a port.
        match authority.find(']') {
            Some(end) => &authority[..=end],
            None => authority,
        }
    } else {
        authority.split(':').next().unwrap_or(authority)
    }
}
/// Returns the host the client asked for.
///
/// The `Host` header wins when present; a value that is not valid visible
/// ASCII yields `""`. Without a `Host` header the host component of the
/// request URI is used, and `""` is returned when that is absent too.
fn get_host(session: &Session) -> &str {
    match session.get_header(http::header::HOST) {
        Some(header) => header.to_str().unwrap_or(""),
        None => session.req_header().uri.host().unwrap_or(""),
    }
}