use std::collections::BTreeMap;
use async_trait::async_trait;
use pingora::{server::Server, Error};
use pingora_core::{services::listening::Service, upstreams::peer::HttpPeer, Result};
use pingora_http::{RequestHeader, ResponseHeader};
use pingora_proxy::{HttpProxy, ProxyHttp, Session};
use crate::{
config::internal::{ListenerKind, PathControl, ProxyConfig},
proxy::request_modifiers::RequestModifyMod,
};
use self::response_modifiers::ResponseModifyMod;
pub mod request_modifiers;
pub mod response_modifiers;
/// State of a single River proxy service: where requests are sent and which
/// header modifiers are applied along the way.
pub struct RiverProxyService {
    /// Peer handed back (as a clone) by `upstream_peer`.
    pub upstream: HttpPeer,
    /// Request and response modifiers run by the upstream filter hooks.
    pub modifiers: Modifiers,
}
impl RiverProxyService {
    /// Builds a named Pingora HTTP proxy service from `conf` and registers every
    /// listener listed in `conf.listeners` on it.
    ///
    /// * `Tcp` listeners with TLS settings are added via `add_tls`.
    /// * `Tcp` listeners without TLS settings are added via `add_tcp`.
    /// * `Uds` listeners are added via `add_uds`.
    ///
    /// # Panics
    ///
    /// Panics if the modifiers cannot be built from `conf.path_control`, if a
    /// certificate, key, or UDS path is not valid UTF-8, or if adding a TLS
    /// listener fails. The return type carries no error channel, so each panic
    /// states which invariant was violated.
    pub fn from_basic_conf(conf: ProxyConfig, server: &Server) -> Service<HttpProxy<Self>> {
        let modifiers = Modifiers::from_conf(&conf.path_control)
            .expect("path control modifiers should be valid");

        let mut my_proxy = pingora_proxy::http_proxy_service_with_name(
            &server.configuration,
            Self {
                upstream: conf.upstream,
                modifiers,
            },
            &conf.name,
        );

        for list_cfg in conf.listeners {
            match list_cfg.source {
                ListenerKind::Tcp {
                    addr,
                    tls: Some(tls_cfg),
                } => {
                    let cert_path = tls_cfg
                        .cert_path
                        .to_str()
                        .expect("cert path should be utf8");
                    let key_path = tls_cfg.key_path.to_str().expect("key path should be utf8");
                    my_proxy
                        .add_tls(&addr, cert_path, key_path)
                        .expect("adding TLS listener shouldn't fail");
                }
                ListenerKind::Tcp { addr, tls: None } => {
                    my_proxy.add_tcp(&addr);
                }
                ListenerKind::Uds(path) => {
                    let path = path.to_str().expect("uds path should be utf8");
                    my_proxy.add_uds(path, None);
                }
            }
        }

        my_proxy
    }
}
/// The ordered sets of header modifiers attached to a proxy service.
pub struct Modifiers {
    /// Modifiers applied, in order, to the request header in
    /// `upstream_request_filter`.
    pub upstream_request_filters: Vec<Box<dyn RequestModifyMod>>,
    /// Modifiers applied, in order, to the response header in
    /// `upstream_response_filter`.
    pub upstream_response_filters: Vec<Box<dyn ResponseModifyMod>>,
}
impl Modifiers {
pub fn from_conf(conf: &PathControl) -> Result<Self> {
let mut conf = conf.clone();
let mut upstream_request_filters: Vec<Box<dyn RequestModifyMod>> = vec![];
for mut filter in conf.upstream_request_filters.drain(..) {
let kind = filter.remove("kind").unwrap();
let f: Box<dyn RequestModifyMod> = match kind.as_str() {
"remove-header-key-regex" => Box::new(
request_modifiers::RemoveHeaderKeyRegex::from_settings(filter).unwrap(),
),
"upsert-header" => {
Box::new(request_modifiers::UpsertHeader::from_settings(filter).unwrap())
}
_ => panic!(),
};
upstream_request_filters.push(f);
}
let mut upstream_response_filters: Vec<Box<dyn ResponseModifyMod>> = vec![];
for mut filter in conf.upstream_response_filters.drain(..) {
let kind = filter.remove("kind").unwrap();
let f: Box<dyn ResponseModifyMod> = match kind.as_str() {
"remove-header-key-regex" => Box::new(
response_modifiers::RemoveHeaderKeyRegex::from_settings(filter).unwrap(),
),
"upsert-header" => {
Box::new(response_modifiers::UpsertHeader::from_settings(filter).unwrap())
}
_ => panic!(),
};
upstream_response_filters.push(f);
}
Ok(Self {
upstream_request_filters,
upstream_response_filters,
})
}
}
/// Context type used as `ProxyHttp::CTX` for `RiverProxyService`.
///
/// It currently has no fields.
pub struct RiverContext {}
#[async_trait]
impl ProxyHttp for RiverProxyService {
    /// Context value passed to the hooks below.
    type CTX = RiverContext;

    /// Creates a new, empty context.
    fn new_ctx(&self) -> Self::CTX {
        RiverContext {}
    }

    /// Picks the upstream for a request: always a boxed clone of the
    /// configured `upstream` peer. The session and context are not consulted.
    async fn upstream_peer(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let peer = self.upstream.clone();
        Ok(Box::new(peer))
    }

    /// Runs every configured request modifier against `header`, in order.
    ///
    /// Stops at, and returns, the first error a modifier reports.
    async fn upstream_request_filter(
        &self,
        session: &mut Session,
        header: &mut RequestHeader,
        ctx: &mut Self::CTX,
    ) -> Result<()> {
        let request_filters = &self.modifiers.upstream_request_filters;
        for modifier in request_filters.iter() {
            modifier.upstream_request_filter(session, header, ctx).await?;
        }
        Ok(())
    }

    /// Runs every configured response modifier against `upstream_response`,
    /// in order.
    fn upstream_response_filter(
        &self,
        session: &mut Session,
        upstream_response: &mut ResponseHeader,
        ctx: &mut Self::CTX,
    ) {
        let response_filters = &self.modifiers.upstream_response_filters;
        for modifier in response_filters.iter() {
            modifier.upstream_response_filter(session, upstream_response, ctx);
        }
    }
}
fn extract_val(key: &str, map: &mut BTreeMap<String, String>) -> Result<String> {
map.remove(key).ok_or_else(|| {
tracing::error!("Missing key: '{key}'");
Error::new_str("Missing configuration field!")
})
}
/// Succeeds only when `map` has no entries left.
///
/// Any remaining keys are logged as a comma-separated list and an error is
/// returned.
fn ensure_empty(map: &BTreeMap<String, String>) -> Result<()> {
    if map.is_empty() {
        return Ok(());
    }

    let all_keys = map
        .keys()
        .map(String::as_str)
        .collect::<Vec<_>>()
        .join(", ");
    tracing::error!("Extra keys found: '{all_keys}'");
    Err(Error::new_str("Extra settings found!"))
}