use super::{
Error, get_bool_conf, get_hash_key, get_int_conf, get_plugin_factory,
get_str_conf,
};
use async_trait::async_trait;
use ctor::ctor;
use http::HeaderValue;
use http::header::{
ACCEPT_ENCODING, CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE,
TRANSFER_ENCODING,
};
use pingap_config::PluginConf;
use pingap_core::HTTP_HEADER_TRANSFER_CHUNKED;
use pingap_core::{
Ctx, ModifyResponseBody, Plugin, PluginStep, RequestPluginResult,
ResponseBodyPluginResult, ResponsePluginResult, new_internal_error,
};
use pingora::http::ResponseHeader;
use pingora::modules::http::compression::ResponseCompression;
use pingora::protocols::http::compression::{Algorithm, Encode};
use pingora::proxy::Session;
use std::borrow::Cow;
use std::str::FromStr;
use std::sync::Arc;
use tracing::debug;
/// Module-local result alias whose error type defaults to the plugin [`Error`].
type Result<T, E = Error> = std::result::Result<T, E>;
/// Token for Zstandard; matched against `Accept-Encoding`, used as a cache-key
/// component and written as the `Content-Encoding` value.
const ZSTD: &str = "zstd";
/// Token for Brotli; used the same way as [`ZSTD`].
const BR: &str = "br";
/// Token for gzip; used the same way as [`ZSTD`].
const GZIP: &str = "gzip";
/// Value of the `mode` setting that makes the plugin compress the upstream
/// response body itself.
const UPSTREAM_RESPONSE_COMPRESS_MODE: &str = "upstream";
/// Key under which the body compressor is stored in, and looked up from, the
/// request context.
const PLUGIN_ID: &str = "_compress_";
/// Wraps a boxed encoder so it can act as a response body modifier
/// (see the `ModifyResponseBody` implementation for this type).
struct Compressor {
    /// Encoder obtained from `Algorithm::compressor`; each body chunk is fed
    /// through it.
    compressor: Box<dyn Encode + Send + Sync>,
}
impl Compressor {
    /// Builds a compressor for `algorithm` at the given `level`.
    ///
    /// # Errors
    ///
    /// Returns an internal error with status 500 when the algorithm does not
    /// provide an encoder.
    fn new(algorithm: Algorithm, level: u32) -> pingora::Result<Self> {
        // Only evaluated when no encoder is available.
        let unsupported = || {
            let message = format!(
                "Compress algorithm {} is not supported",
                algorithm.as_str()
            );
            new_internal_error(500, message)
        };
        let compressor = algorithm.compressor(level).ok_or_else(unsupported)?;
        Ok(Self { compressor })
    }
}
impl ModifyResponseBody for Compressor {
    /// Replaces `body` with the encoder output for the current chunk.
    ///
    /// A missing chunk is encoded as an empty slice, so the encoder still sees
    /// the `end_of_stream` flag. Encoder failures are reported as an internal
    /// error with status 500.
    fn handle(
        &mut self,
        _session: &Session,
        body: &mut Option<bytes::Bytes>,
        end_of_stream: bool,
    ) -> pingora::Result<()> {
        let chunk: &[u8] = body.as_deref().unwrap_or_default();
        let encoded = self
            .compressor
            .encode(chunk, end_of_stream)
            .map_err(|err| new_internal_error(500, err))?;
        *body = Some(encoded);
        Ok(())
    }

    /// Name reported for this body modifier.
    fn name(&self) -> String {
        String::from("compression")
    }
}
/// Response compression plugin settings, built from a `PluginConf`.
pub struct Compression {
    /// gzip level from the `gzip_level` setting (capped at 9); gzip is only
    /// considered when this is greater than 0.
    gzip_level: u32,
    /// Brotli level from the `br_level` setting (capped at 11); brotli is only
    /// considered when this is greater than 0.
    br_level: u32,
    /// Zstandard level from the `zstd_level` setting (capped at 22); zstd is
    /// only considered when this is greater than 0.
    zstd_level: u32,
    /// True when at least one of the three levels is greater than 0.
    support_compression: bool,
    /// `Some` only when the configuration contains a `decompression` key; when
    /// true, decompression is turned on for the session's `ResponseCompression`
    /// module.
    decompression: Option<bool>,
    /// Step at which the request handler applies compression levels; set to
    /// `PluginStep::EarlyRequest` when built from configuration.
    plugin_step: PluginStep,
    /// Value of the `mode` setting; `"upstream"` makes the plugin compress the
    /// upstream response body itself instead of configuring the
    /// `ResponseCompression` module.
    mode: String,
    /// In `upstream` mode, responses whose `Content-Length` is below this value
    /// are left uncompressed; 0 disables the check.
    min_length: u64,
    /// Hash of the configuration, returned by `config_key`.
    hash_value: String,
}
impl TryFrom<&PluginConf> for Compression {
    type Error = Error;

    /// Builds the plugin settings from `value`.
    ///
    /// Reads `gzip_level`, `br_level`, `zstd_level`, `mode`, `min_length` and
    /// the optional `decompression` key. Levels are clamped to `0..=9`,
    /// `0..=11` and `0..=22` respectively, and a negative `min_length` is
    /// treated as 0.
    fn try_from(value: &PluginConf) -> Result<Self> {
        let hash_value = get_hash_key(value);
        // Keep "not configured" distinct from an explicit `false`.
        let decompression = value
            .contains_key("decompression")
            .then(|| get_bool_conf(value, "decompression"));
        // Clamp before the cast: a negative setting would otherwise wrap
        // around to a huge unsigned level.
        let gzip_level = get_int_conf(value, "gzip_level").clamp(0, 9) as u32;
        let br_level = get_int_conf(value, "br_level").clamp(0, 11) as u32;
        let zstd_level = get_int_conf(value, "zstd_level").clamp(0, 22) as u32;
        let mode = get_str_conf(value, "mode");
        let support_compression =
            gzip_level > 0 || br_level > 0 || zstd_level > 0;
        // A negative minimum length is meaningless; treat it as "no minimum".
        let min_length = get_int_conf(value, "min_length").max(0) as u64;
        Ok(Self {
            hash_value,
            gzip_level,
            br_level,
            zstd_level,
            decompression,
            support_compression,
            mode,
            min_length,
            plugin_step: PluginStep::EarlyRequest,
        })
    }
}
impl Compression {
    /// Creates the compression plugin from its configuration.
    pub fn new(params: &PluginConf) -> Result<Self> {
        debug!(params = params.to_string(), "new compression plugin");
        Self::try_from(params)
    }

    /// Reports whether an `Accept-Encoding` header value accepts `coding`.
    ///
    /// The header is treated as a comma-separated list of
    /// `coding[;param]*` entries. Coding names are compared ignoring ASCII
    /// case, and an entry carrying `q=0` (explicitly "not acceptable") is
    /// rejected. A weight that cannot be parsed is ignored.
    fn accepts_coding(accept_encoding: &str, coding: &str) -> bool {
        accept_encoding.split(',').any(|entry| {
            let mut parts = entry.split(';');
            let name = parts.next().unwrap_or_default().trim();
            if !name.eq_ignore_ascii_case(coding) {
                return false;
            }
            let refused = parts.any(|param| {
                param.split_once('=').is_some_and(|(key, weight)| {
                    key.trim().eq_ignore_ascii_case("q")
                        && weight
                            .trim()
                            .parse::<f32>()
                            .is_ok_and(|q| q <= 0.0)
                })
            });
            !refused
        })
    }

    /// Returns the `(zstd, br, gzip)` levels usable for this request.
    ///
    /// A level is non-zero only when the algorithm is enabled in the
    /// configuration and accepted by the request's `Accept-Encoding` header.
    /// A missing, empty or non-text header yields `(0, 0, 0)`.
    fn get_compress_level(&self, session: &Session) -> (u32, u32, u32) {
        let accept_encoding = session
            .req_header()
            .headers
            .get(ACCEPT_ENCODING)
            .and_then(|value| value.to_str().ok())
            .unwrap_or_default();
        if accept_encoding.is_empty() {
            return (0, 0, 0);
        }
        let level_for = |configured: u32, coding: &str| {
            if configured > 0 && Self::accepts_coding(accept_encoding, coding)
            {
                configured
            } else {
                0
            }
        };
        (
            level_for(self.zstd_level, ZSTD),
            level_for(self.br_level, BR),
            level_for(self.gzip_level, GZIP),
        )
    }
}
#[async_trait]
impl Plugin for Compression {
    /// Returns the configuration hash computed when the plugin was built.
    #[inline]
    fn config_key(&self) -> Cow<'_, str> {
        Cow::Borrowed(&self.hash_value)
    }

    /// Prepares response compression while the request is being handled.
    ///
    /// At `PluginStep::EarlyRequest`:
    /// - in `upstream` mode, the coding that will be used (zstd, then br,
    ///   then gzip) is passed to `ctx.push_cache_key` — presumably so cached
    ///   responses are kept apart per encoding;
    /// - when `decompression` is enabled, decompression is turned on for the
    ///   session's `ResponseCompression` module.
    ///
    /// Outside `upstream` mode, the levels accepted by the client are then
    /// applied to the `ResponseCompression` module.
    ///
    /// Returns `Continue` when levels were applied and `Skipped` otherwise.
    #[inline]
    async fn handle_request(
        &self,
        step: PluginStep,
        session: &mut Session,
        ctx: &mut Ctx,
    ) -> pingora::Result<RequestPluginResult> {
        if step == PluginStep::EarlyRequest {
            if self.mode == UPSTREAM_RESPONSE_COMPRESS_MODE {
                let (zstd_level, br_level, gzip_level) =
                    self.get_compress_level(session);
                // Same priority order as the one used when compressing the
                // upstream response.
                let key = if zstd_level > 0 {
                    ZSTD
                } else if br_level > 0 {
                    BR
                } else if gzip_level > 0 {
                    GZIP
                } else {
                    ""
                };
                if !key.is_empty() {
                    ctx.push_cache_key(key.to_string());
                }
            }
            if self.decompression.unwrap_or_default()
                && let Some(c) = session
                    .downstream_modules_ctx
                    .get_mut::<ResponseCompression>()
            {
                c.adjust_decompression(true);
            }
        }
        if step != self.plugin_step {
            return Ok(RequestPluginResult::Skipped);
        }
        if !self.support_compression {
            return Ok(RequestPluginResult::Skipped);
        }
        // In upstream mode the body is compressed by this plugin's
        // upstream-response hooks, not by the `ResponseCompression` module.
        if self.mode == UPSTREAM_RESPONSE_COMPRESS_MODE {
            return Ok(RequestPluginResult::Skipped);
        }
        let (zstd_level, br_level, gzip_level) =
            self.get_compress_level(session);
        debug!(
            zstd_level,
            br_level, gzip_level, "response compression level"
        );
        if zstd_level == 0 && br_level == 0 && gzip_level == 0 {
            return Ok(RequestPluginResult::Skipped);
        }
        let Some(c) = session
            .downstream_modules_ctx
            .get_mut::<ResponseCompression>()
        else {
            return Ok(RequestPluginResult::Skipped);
        };
        if zstd_level > 0 {
            c.adjust_algorithm_level(Algorithm::Zstd, zstd_level);
        }
        if br_level > 0 {
            c.adjust_algorithm_level(Algorithm::Brotli, br_level);
        }
        if gzip_level > 0 {
            c.adjust_algorithm_level(Algorithm::Gzip, gzip_level);
        }
        Ok(RequestPluginResult::Continue)
    }

    /// Decides, in `upstream` mode, whether the upstream response body will be
    /// compressed, and rewrites the response header accordingly.
    ///
    /// The response is left unchanged when compression is disabled, the mode
    /// is not `upstream`, the response already has a `Content-Encoding`, it
    /// has no compressible `Content-Type`, the client accepts none of the
    /// enabled codings, or its `Content-Length` is below `min_length`.
    ///
    /// Otherwise a body compressor is registered in `ctx`, `Content-Length`
    /// is removed, chunked transfer encoding is set and `Content-Encoding` is
    /// written; `Modified` is returned.
    ///
    /// # Errors
    ///
    /// Fails when the selected algorithm cannot provide an encoder. The
    /// response header is not touched in that case.
    fn handle_upstream_response(
        &self,
        session: &mut Session,
        ctx: &mut Ctx,
        upstream_response: &mut ResponseHeader,
    ) -> pingora::Result<ResponsePluginResult> {
        if !self.support_compression
            || self.mode != UPSTREAM_RESPONSE_COMPRESS_MODE
        {
            return Ok(ResponsePluginResult::Unchanged);
        }
        // Never re-encode a body the upstream already encoded.
        if upstream_response.headers.contains_key(CONTENT_ENCODING) {
            return Ok(ResponsePluginResult::Unchanged);
        }
        let Some(content_type) = upstream_response.headers.get(CONTENT_TYPE)
        else {
            return Ok(ResponsePluginResult::Unchanged);
        };
        if !is_compressible_content_type(content_type) {
            return Ok(ResponsePluginResult::Unchanged);
        }
        let (zstd_level, br_level, gzip_level) =
            self.get_compress_level(session);
        if zstd_level == 0 && br_level == 0 && gzip_level == 0 {
            return Ok(ResponsePluginResult::Unchanged);
        }
        if self.min_length > 0 {
            // A missing or unparsable Content-Length does not count as
            // "too small", so such responses are still compressed.
            let is_too_small = upstream_response
                .headers
                .get(CONTENT_LENGTH)
                .and_then(|header| header.to_str().ok())
                .and_then(|s| s.parse::<u64>().ok())
                .is_some_and(|content_length| {
                    content_length < self.min_length
                });
            if is_too_small {
                return Ok(ResponsePluginResult::Unchanged);
            }
        }
        debug!(
            zstd_level,
            br_level, gzip_level, "upstream response body compression level"
        );
        // Build the compressor before touching any header: this is the only
        // fallible step, and failing here must not leave the response with
        // its Content-Length stripped but no Content-Encoding set.
        let (handler, encoding) = if zstd_level > 0 {
            (
                Box::new(Compressor::new(Algorithm::Zstd, zstd_level)?),
                ZSTD,
            )
        } else if br_level > 0 {
            (Box::new(Compressor::new(Algorithm::Brotli, br_level)?), BR)
        } else {
            (
                Box::new(Compressor::new(Algorithm::Gzip, gzip_level)?),
                GZIP,
            )
        };
        // The compressed size is unknown up front, so switch to chunked.
        upstream_response.remove_header(&CONTENT_LENGTH);
        let _ = upstream_response.insert_header(
            TRANSFER_ENCODING,
            HTTP_HEADER_TRANSFER_CHUNKED.1.clone(),
        );
        let _ = upstream_response.insert_header(CONTENT_ENCODING, encoding);
        ctx.add_modify_body_handler(PLUGIN_ID, handler);
        Ok(ResponsePluginResult::Modified)
    }

    /// Runs the body compressor registered by `handle_upstream_response`, if
    /// any, over the current body chunk.
    ///
    /// Returns `Unchanged` when no compressor is registered, `FullyReplaced`
    /// for the final chunk and `PartialReplaced` for earlier chunks.
    fn handle_upstream_response_body(
        &self,
        session: &mut Session,
        ctx: &mut Ctx,
        body: &mut Option<bytes::Bytes>,
        end_of_stream: bool,
    ) -> pingora::Result<ResponseBodyPluginResult> {
        let Some(modifier) = ctx.get_modify_body_handler(PLUGIN_ID) else {
            return Ok(ResponseBodyPluginResult::Unchanged);
        };
        modifier.handle(session, body, end_of_stream)?;
        Ok(if end_of_stream {
            ResponseBodyPluginResult::FullyReplaced
        } else {
            ResponseBodyPluginResult::PartialReplaced
        })
    }
}
/// Reports whether a `Content-Type` header value names a media type this
/// plugin compresses.
///
/// Accepted types are `application/json`, `application/xml` and everything
/// under `text/*`. A value that is not valid text or not a parsable media type
/// is not compressible.
fn is_compressible_content_type(content_type: &HeaderValue) -> bool {
    let parsed = content_type
        .to_str()
        .ok()
        .and_then(|raw| mime_guess::Mime::from_str(raw).ok());
    let Some(mime) = parsed else {
        return false;
    };
    let listed = matches!(
        mime.essence_str(),
        "application/json" | "application/xml" | "text/html"
    );
    listed || mime.type_() == "text"
}
/// Registers the `compression` plugin constructor with the plugin factory.
/// The `#[ctor]` attribute runs this at program start-up.
#[ctor]
fn init() {
    get_plugin_factory().register("compression", |conf| {
        let plugin = Compression::new(conf)?;
        Ok(Arc::new(plugin))
    });
}
#[cfg(test)]
mod tests {
    use super::*;
    use pingap_config::PluginConf;
    use pingap_core::{Ctx, PluginStep, RequestPluginResult};
    use pingora::modules::http::HttpModules;
    use pingora::modules::http::compression::{
        ResponseCompression, ResponseCompressionBuilder,
    };
    use pingora::proxy::Session;
    use pretty_assertions::assert_eq;
    use tokio_test::io::Builder;

    /// Parses a TOML snippet into a plugin configuration.
    fn parse_conf(raw: &str) -> PluginConf {
        toml::from_str::<PluginConf>(raw).unwrap()
    }

    /// Sends a GET request carrying the single header line `header_line`
    /// through the plugin's early-request step.
    ///
    /// Returns the plugin result and whether the session's
    /// `ResponseCompression` module ended up enabled.
    async fn run_early_request(
        compression: &Compression,
        header_line: &str,
    ) -> (RequestPluginResult, bool) {
        let headers = [header_line].join("\r\n");
        let input_header =
            format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
        let mock_io = Builder::new().read(input_header.as_bytes()).build();
        let mut modules = HttpModules::new();
        modules.add_module(ResponseCompressionBuilder::enable(0));
        let mut session =
            Session::new_h1_with_modules(Box::new(mock_io), &modules);
        session.read_request().await.unwrap();
        let result = compression
            .handle_request(
                PluginStep::EarlyRequest,
                &mut session,
                &mut Ctx::default(),
            )
            .await
            .unwrap();
        let enabled = session
            .downstream_modules_ctx
            .get::<ResponseCompression>()
            .unwrap()
            .is_enabled();
        (result, enabled)
    }

    /// The configured levels and the step are picked up from the config.
    #[test]
    fn test_compression_params() {
        let conf = parse_conf(
            r###"
step = "early_request"
gzip_level = 9
br_level = 8
zstd_level = 6
"###,
        );
        let params = Compression::try_from(&conf).unwrap();
        assert_eq!("early_request", params.plugin_step.to_string());
        assert_eq!(9, params.gzip_level);
        assert_eq!(8, params.br_level);
        assert_eq!(6, params.zstd_level);
    }

    /// Supported codings enable the compression module; an unsupported one
    /// leaves it disabled and the plugin is skipped.
    #[tokio::test]
    async fn test_compression() {
        let conf = parse_conf(
            r###"
step = "early_request"
gzip_level = 9
br_level = 8
zstd_level = 7
"###,
        );
        let compression = Compression::new(&conf).unwrap();

        let supported = [
            "Accept-Encoding: gzip",
            "Accept-Encoding: br",
            "Accept-Encoding: zstd",
        ];
        for header_line in supported {
            let (result, enabled) =
                run_early_request(&compression, header_line).await;
            assert!(result == RequestPluginResult::Continue, "{header_line}");
            assert!(enabled, "{header_line}");
        }

        let (result, enabled) =
            run_early_request(&compression, "Accept-Encoding: none").await;
        assert!(result == RequestPluginResult::Skipped);
        assert!(!enabled);
    }
}