use super::{
Error, get_bool_conf, get_hash_key, get_int_conf, get_plugin_factory,
get_str_conf,
};
use async_trait::async_trait;
use ctor::ctor;
use pingap_config::{PluginCategory, PluginConf};
use pingap_core::{Ctx, Plugin, PluginStep, RequestPluginResult};
use pingap_core::{get_cookie_value, get_req_header_value};
use pingora::proxy::Session;
use rand::{RngExt, rng};
use regex::Regex;
use std::borrow::Cow;
use std::sync::Arc;
use tracing::debug;
type Result<T, E = Error> = std::result::Result<T, E>;
pub struct TrafficSplitting {
hash_value: String,
upstream: Arc<str>,
weight: u8,
stickiness: bool,
sticky_header: Option<String>,
sticky_cookie: Option<String>,
matcher: Option<Regex>,
}
impl TryFrom<&PluginConf> for TrafficSplitting {
type Error = Error;
fn try_from(value: &PluginConf) -> Result<Self> {
let upstream = get_str_conf(value, "upstream");
if upstream.is_empty() {
return Err(Error::Invalid {
category: PluginCategory::TrafficSplitting.to_string(),
message: "upstream is not allowed empty".to_string(),
});
}
let weight = (get_int_conf(value, "weight").clamp(0, 100)) as u8;
let stickiness = get_bool_conf(value, "stickiness");
let sticky_cookie = get_str_conf(value, "sticky_cookie");
let sticky_cookie = if sticky_cookie.is_empty() {
None
} else {
Some(sticky_cookie)
};
let sticky_header = get_str_conf(value, "sticky_header");
let sticky_header = if sticky_header.is_empty() {
None
} else {
Some(sticky_header)
};
let matcher = get_str_conf(value, "matcher");
let matcher = if matcher.is_empty() {
None
} else {
Some(Regex::new(matcher.as_str()).map_err(|e| Error::Invalid {
category: PluginCategory::TrafficSplitting.to_string(),
message: e.to_string(),
})?)
};
if stickiness && sticky_cookie.is_none() && sticky_header.is_none() {
return Err(Error::Invalid {
category: PluginCategory::TrafficSplitting.to_string(),
message: "sticky_cookie and sticky_header cannot both be empty"
.to_string(),
});
}
Ok(Self {
hash_value: get_hash_key(value),
upstream: upstream.into(),
weight,
stickiness,
sticky_cookie,
sticky_header,
matcher,
})
}
}
impl TrafficSplitting {
pub fn new(params: &PluginConf) -> Result<Self> {
debug!(params = params.to_string(), "new traffic splitting plugin");
TrafficSplitting::try_from(params)
}
fn get_sticky_value<'a>(&self, session: &'a Session) -> Option<&'a str> {
if let Some(sticky_cookie) = &self.sticky_cookie {
return get_cookie_value(session.req_header(), sticky_cookie);
}
if let Some(sticky_header) = &self.sticky_header {
return get_req_header_value(session.req_header(), sticky_header);
}
None
}
}
#[async_trait]
impl Plugin for TrafficSplitting {
#[inline]
fn config_key(&self) -> Cow<'_, str> {
Cow::Borrowed(&self.hash_value)
}
#[inline]
async fn handle_request(
&self,
step: PluginStep,
session: &mut Session,
ctx: &mut Ctx,
) -> pingora::Result<RequestPluginResult> {
if step != PluginStep::Request {
return Ok(RequestPluginResult::Skipped);
}
let roll_value = if self.stickiness {
self.get_sticky_value(session)
.map(|value| {
if let Some(matcher) = &self.matcher {
if matcher.is_match(value) {
return 0;
} else {
return u8::MAX;
}
}
(crc32fast::hash(value.as_bytes()) % 100) as u8
})
.unwrap_or(u8::MAX)
} else {
rng().random_range(..100)
};
if roll_value < self.weight {
ctx.upstream.name = self.upstream.clone();
}
Ok(RequestPluginResult::Continue)
}
}
#[ctor]
fn init() {
get_plugin_factory().register("traffic_splitting", |params| {
Ok(Arc::new(TrafficSplitting::new(params)?))
});
}
#[cfg(test)]
mod tests {
use super::*;
use http::header::{COOKIE, HOST};
use pingap_config::PluginConf;
use pingap_core::{Ctx, RequestPluginResult};
use pingora::proxy::Session;
use pretty_assertions::assert_eq;
use tokio_test::io::Builder;
async fn create_test_session(headers: &[(&str, &str)]) -> Session {
let mut header_builder = Builder::new();
header_builder.read(b"GET / HTTP/1.1\r\n");
for (key, value) in headers {
header_builder.read(format!("{}: {}\r\n", key, value).as_bytes());
}
header_builder.read(b"\r\n");
let mut session = Session::new_h1(Box::new(header_builder.build()));
session.read_request().await.unwrap();
session
}
fn create_plugin_conf(
upstream: &str,
weight: u8,
stickiness: bool,
sticky_cookie: &str,
) -> PluginConf {
let config = format!(
r###"
upstream = "{upstream}"
weight = {weight}
stickiness = {stickiness}
sticky_cookie = "{sticky_cookie}"
"###
);
toml::from_str::<PluginConf>(&config).unwrap()
}
#[test]
fn test_config_parsing() {
let conf = create_plugin_conf("new-upstream", 50, true, "user-id");
let plugin = TrafficSplitting::try_from(&conf).unwrap();
assert_eq!(plugin.upstream.as_ref(), "new-upstream");
assert_eq!(plugin.weight, 50);
assert!(plugin.stickiness);
assert_eq!(plugin.sticky_cookie, Some("user-id".to_string()));
assert_eq!(plugin.sticky_header, None);
let conf = create_plugin_conf("", 50, false, "");
let result = TrafficSplitting::try_from(&conf);
assert!(result.is_err());
assert_eq!(
result.err().unwrap().to_string(),
"Plugin traffic_splitting invalid, message: upstream is not allowed empty"
);
let conf = create_plugin_conf("new-upstream", 50, true, "");
let result = TrafficSplitting::try_from(&conf);
assert!(result.is_err());
assert_eq!(
result.err().unwrap().to_string(),
"Plugin traffic_splitting invalid, message: sticky_cookie and sticky_header cannot both be empty"
);
let conf = create_plugin_conf("new-upstream", 150, false, "");
let plugin = TrafficSplitting::try_from(&conf).unwrap();
assert_eq!(plugin.weight, 100);
}
#[tokio::test]
async fn test_handle_request_non_sticky() {
let conf = create_plugin_conf("new-upstream", 100, false, "");
let plugin = TrafficSplitting::try_from(&conf).unwrap();
let mut session =
create_test_session(&[(HOST.as_str(), "example.com")]).await;
let mut ctx = Ctx::default();
let result = plugin
.handle_request(PluginStep::Request, &mut session, &mut ctx)
.await
.unwrap();
assert_eq!(true, result == RequestPluginResult::Continue);
assert_eq!(ctx.upstream.name.as_ref(), "new-upstream");
let conf = create_plugin_conf("new-upstream", 0, false, "");
let plugin = TrafficSplitting::try_from(&conf).unwrap();
let mut session =
create_test_session(&[(HOST.as_str(), "example.com")]).await;
let mut ctx = Ctx::default();
plugin
.handle_request(PluginStep::Request, &mut session, &mut ctx)
.await
.unwrap();
assert_eq!(ctx.upstream.name.as_ref(), ""); }
#[tokio::test]
async fn test_handle_request_sticky() {
let conf = create_plugin_conf("new-upstream", 50, true, "user-id");
let plugin = TrafficSplitting::try_from(&conf).unwrap();
let mut ctx = Ctx::default();
let mut session_no_cookie =
create_test_session(&[(HOST.as_str(), "example.com")]).await;
plugin
.handle_request(
PluginStep::Request,
&mut session_no_cookie,
&mut ctx,
)
.await
.unwrap();
assert_eq!(ctx.upstream.name.as_ref(), "");
let mut session_should_split = create_test_session(&[
(HOST.as_str(), "example.com"),
(COOKIE.as_str(), "user-id=user-a"),
])
.await;
plugin
.handle_request(
PluginStep::Request,
&mut session_should_split,
&mut ctx,
)
.await
.unwrap();
assert_eq!(ctx.upstream.name.as_ref(), "new-upstream");
ctx.upstream.name = String::new().into(); let mut session_should_not_split = create_test_session(&[
(HOST.as_str(), "example.com"),
(COOKIE.as_str(), "user-id=user-b"),
])
.await;
plugin
.handle_request(
PluginStep::Request,
&mut session_should_not_split,
&mut ctx,
)
.await
.unwrap();
assert_eq!(ctx.upstream.name.as_ref(), "");
}
#[tokio::test]
async fn test_handle_request_wrong_step() {
let conf = create_plugin_conf("new-upstream", 100, false, "");
let plugin = TrafficSplitting::try_from(&conf).unwrap();
let mut session =
create_test_session(&[(HOST.as_str(), "example.com")]).await;
let mut ctx = Ctx::default();
let result = plugin
.handle_request(PluginStep::ProxyUpstream, &mut session, &mut ctx)
.await
.unwrap();
assert_eq!(true, result == RequestPluginResult::Skipped);
assert_eq!(ctx.upstream.name.as_ref(), "");
}
}