use std::{
borrow::{Borrow, Cow},
collections::BTreeMap,
sync::Arc,
time::Duration,
};
use async_trait::async_trait;
use pingora::{
protocols::ALPN,
upstreams::peer::{HttpPeer, PeerOptions, TcpKeepalive},
ErrorType::HTTPStatus,
};
use pingora_http::ResponseHeader;
use pingora_load_balancing::{selection::RoundRobin, LoadBalancer};
use pingora_proxy::{ProxyHttp, Session};
use tracing::info;
use crate::ROUTE_STORE;
pub const DEFAULT_PEER_OPTIONS: PeerOptions = PeerOptions {
verify_hostname: true,
read_timeout: Some(Duration::from_secs(30)),
connection_timeout: Some(Duration::from_secs(30)),
tcp_recv_buf: Some(2048),
tcp_keepalive: Some(TcpKeepalive {
count: 5,
interval: Duration::from_secs(10),
idle: Duration::from_secs(30),
}),
bind_to: None,
total_connection_timeout: None,
idle_timeout: None,
write_timeout: None,
verify_cert: false,
alternative_cn: None,
alpn: ALPN::H2H1,
ca: None,
no_header_eos: false,
h2_ping_interval: None,
max_h2_streams: 5,
extra_proxy_headers: BTreeMap::new(),
curves: None,
second_keyshare: true, tracer: None,
};
type ArcedLB = Arc<LoadBalancer<RoundRobin>>;
pub struct Router;
pub struct RouterContext {
pub host: Option<Cow<'static, str>>,
pub current_lb: Option<ArcedLB>,
}
#[async_trait]
impl ProxyHttp for Router {
type CTX = RouterContext;
fn new_ctx(&self) -> Self::CTX {
RouterContext {
host: None,
current_lb: None,
}
}
async fn request_filter(
&self,
session: &mut Session,
ctx: &mut Self::CTX,
) -> pingora::Result<bool> {
let req_host = get_host(session);
let host_without_port = req_host.split(':').collect::<Vec<_>>()[0];
let upstream_lb = ROUTE_STORE.get(host_without_port);
if upstream_lb.is_none() {
return Err(pingora::Error::new(HTTPStatus(404)));
}
ctx.host = Some(Cow::Owned(host_without_port.to_string()));
ctx.current_lb = Some(upstream_lb.unwrap().clone());
Ok(false)
}
async fn upstream_peer(
&self,
_session: &mut Session,
ctx: &mut Self::CTX,
) -> pingora::Result<Box<HttpPeer>> {
let upstream = ctx.current_lb.clone();
if upstream.is_none() {
return Err(pingora::Error::new(HTTPStatus(404)));
}
let healthy_upstream = upstream.unwrap().select(b"", 32);
if healthy_upstream.is_none() {
info!("No healthy upstream found");
return Err(pingora::Error::new(HTTPStatus(503)));
}
let host = ctx.host.as_ref().unwrap();
let _b: &str = host.borrow();
let host = ctx.host.clone().unwrap();
let mut peer = HttpPeer::new(healthy_upstream.unwrap(), false, host.into_owned());
peer.options = DEFAULT_PEER_OPTIONS;
Ok(Box::new(peer))
}
async fn response_filter(
&self,
_session: &mut Session,
_upstream_response: &mut ResponseHeader,
_ctx: &mut Self::CTX,
) -> pingora::Result<()> {
Ok(())
}
}
fn get_host(session: &mut Session) -> &str {
if let Some(host) = session.get_header(http::header::HOST) {
return host.to_str().unwrap_or("");
}
if let Some(host) = session.req_header().uri.host() {
return host;
}
""
}