use super::{
Error, get_bool_conf, get_hash_key, get_plugin_factory, get_str_conf,
get_str_slice_conf,
};
use async_trait::async_trait;
use bstr::ByteSlice;
use bytes::Bytes;
use bytesize::ByteSize;
use ctor::ctor;
use fancy_regex::Regex;
use http::{Method, StatusCode};
use humantime::parse_duration;
use pingap_cache::{HttpCache, new_cache_backend};
use pingap_config::{PluginCategory, PluginConf};
use pingap_core::{
Ctx, HttpResponse, Plugin, PluginStep, RequestPluginResult, get_cache_key,
get_client_ip,
};
use pingap_util::IpRules;
use pingora::cache::eviction::EvictionManager;
use pingora::cache::eviction::simple_lru::Manager;
use pingora::cache::key::CacheHashKey;
use pingora::cache::lock::{CacheKeyLock, CacheLock};
use pingora::cache::predictor::{CacheablePredictor, Predictor};
use pingora::proxy::Session;
use std::borrow::Cow;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use std::sync::OnceLock;
use std::time::Duration;
use tracing::{debug, error};
type Result<T> = std::result::Result<T, Error>;
static PREDICTOR: OnceLock<Predictor<32>> = OnceLock::new();
static EVICTION_MANAGER: OnceLock<Manager> = OnceLock::new();
static CACHE_LOCK_ONE_SECOND: LazyLock<
Box<dyn CacheKeyLock + std::marker::Send + Sync + 'static>,
> = LazyLock::new(|| CacheLock::new_boxed(std::time::Duration::from_secs(1)));
static CACHE_LOCK_TWO_SECONDS: LazyLock<
Box<dyn CacheKeyLock + std::marker::Send + Sync + 'static>,
> = LazyLock::new(|| CacheLock::new_boxed(std::time::Duration::from_secs(2)));
static CACHE_LOCK_THREE_SECONDS: LazyLock<
Box<dyn CacheKeyLock + std::marker::Send + Sync + 'static>,
> = LazyLock::new(|| CacheLock::new_boxed(std::time::Duration::from_secs(3)));
pub struct Cache {
plugin_step: PluginStep,
eviction: Option<&'static (dyn EvictionManager + Sync)>,
predictor: Option<&'static (dyn CacheablePredictor + Sync)>,
lock: Option<&'static (dyn CacheKeyLock + Send + Sync)>,
http_cache: &'static HttpCache,
max_file_size: usize,
max_ttl: Option<Duration>,
namespace: Option<String>,
headers: Option<Vec<String>>,
check_cache_control: bool,
purge_ip_rules: IpRules,
skip: Option<Regex>,
hash_value: String,
}
fn get_eviction_manager(cache_max_size: u64) -> &'static Manager {
EVICTION_MANAGER.get_or_init(|| Manager::new(cache_max_size as usize))
}
fn get_cache_lock(
lock: Duration,
) -> Option<&'static (dyn CacheKeyLock + Send + Sync)> {
match lock.as_secs() {
1 => Some(CACHE_LOCK_ONE_SECOND.as_ref()),
2 => Some(CACHE_LOCK_TWO_SECONDS.as_ref()),
3 => Some(CACHE_LOCK_THREE_SECONDS.as_ref()),
_ => None,
}
}
fn get_predictor() -> &'static (dyn CacheablePredictor + Sync) {
PREDICTOR.get_or_init(|| Predictor::new(128, None))
}
impl TryFrom<&PluginConf> for Cache {
type Error = Error;
fn try_from(value: &PluginConf) -> Result<Self> {
let hash_value = get_hash_key(value);
let directory = get_str_conf(value, "directory");
let cache = new_cache_backend(directory.as_str()).map_err(|e| {
Error::Invalid {
category: "cache".to_string(),
message: e.to_string(),
}
})?;
let cache_max_size = cache.max_size;
let eviction = if value.contains_key("eviction") && cache_max_size > 0 {
let eviction = get_eviction_manager(cache_max_size);
Some(eviction as &'static (dyn EvictionManager + Sync))
} else {
None
};
let lock = get_str_conf(value, "lock");
let lock = if !lock.is_empty() {
parse_duration(&lock).map_err(|e| Error::Invalid {
category: PluginCategory::Cache.to_string(),
message: e.to_string(),
})?
} else {
Duration::from_secs(1)
};
let max_ttl = get_str_conf(value, "max_ttl");
let max_ttl = if !max_ttl.is_empty() {
Some(parse_duration(&max_ttl).map_err(|e| Error::Invalid {
category: PluginCategory::Cache.to_string(),
message: e.to_string(),
})?)
} else {
None
};
let max_file_size = get_str_conf(value, "max_file_size");
let max_file_size = if !max_file_size.is_empty() {
ByteSize::from_str(&max_file_size).map_err(|e| Error::Invalid {
category: PluginCategory::Cache.to_string(),
message: e.to_string(),
})?
} else {
ByteSize::mb(1)
};
let namespace = get_str_conf(value, "namespace");
if !namespace.is_empty() && cache.directory.is_some() {
let path = format!(
"{}/{namespace}",
cache.directory.clone().unwrap_or_default()
);
if let Err(e) = std::fs::create_dir_all(&path) {
error!(
error = e.to_string(),
path, "create directory of cache fail"
);
}
}
let namespace = if namespace.is_empty() {
None
} else {
Some(namespace)
};
let headers = get_str_slice_conf(value, "headers");
let headers = if headers.is_empty() {
None
} else {
Some(headers)
};
let predictor = if value.contains_key("predictor") {
Some(get_predictor())
} else {
None
};
let purge_ip_rules =
IpRules::new(&get_str_slice_conf(value, "purge_ip_list"));
let skip_value = get_str_conf(value, "skip");
let skip = if skip_value.is_empty() {
None
} else {
Some(Regex::new(&skip_value).map_err(|e| Error::Regex {
category: "cache".to_string(),
source: Box::new(e),
})?)
};
let params = Self {
hash_value,
http_cache: cache,
plugin_step: PluginStep::Request,
eviction,
predictor,
lock: get_cache_lock(lock),
max_ttl,
max_file_size: max_file_size.as_u64() as usize,
namespace,
headers,
purge_ip_rules,
check_cache_control: get_bool_conf(value, "check_cache_control"),
skip,
};
Ok(params)
}
}
impl Cache {
pub fn new(params: &PluginConf) -> Result<Self> {
debug!(params = params.to_string(), "new http cache plugin");
Self::try_from(params)
}
}
static METHOD_PURGE: LazyLock<Method> = LazyLock::new(|| {
Method::from_bytes(b"PURGE").expect("Failed to create PURGE method")
});
#[async_trait]
impl Plugin for Cache {
#[inline]
fn config_key(&self) -> Cow<'_, str> {
Cow::Borrowed(&self.hash_value)
}
#[inline]
async fn handle_request(
&self,
step: PluginStep,
session: &mut Session,
ctx: &mut Ctx,
) -> pingora::Result<RequestPluginResult> {
if step != self.plugin_step {
return Ok(RequestPluginResult::Skipped);
}
let req_header = session.req_header();
let method = &req_header.method;
if ![&Method::GET, &Method::HEAD, &*METHOD_PURGE].contains(&method) {
return Ok(RequestPluginResult::Skipped);
}
if let Some(skip) = &self.skip
&& let Some(value) = req_header.uri.path_and_query()
&& skip.is_match(value.as_str()).unwrap_or_default()
{
return Ok(RequestPluginResult::Skipped);
}
let mut keys = Vec::with_capacity(4);
{
let cache_info = ctx.cache.get_or_insert_default();
cache_info.namespace = self.namespace.clone();
}
if let Some(headers) = &self.headers {
for key in headers.iter() {
let buf = session.get_header_bytes(key).to_str_lossy();
if !buf.is_empty() {
keys.push(buf.to_string());
}
}
}
if !keys.is_empty() {
ctx.extend_cache_keys(keys);
if let Some(cache_info) = &ctx.cache {
debug!("Cache keys: {:?}", cache_info.keys);
}
}
if method == *METHOD_PURGE {
let ip = ctx
.conn
.client_ip
.get_or_insert_with(|| get_client_ip(session));
let found = match self.purge_ip_rules.is_match(ip) {
Ok(matched) => matched,
Err(e) => {
return Ok(RequestPluginResult::Respond(
HttpResponse::bad_request(e.to_string()),
));
},
};
if !found {
return Ok(RequestPluginResult::Respond(HttpResponse {
status: StatusCode::FORBIDDEN,
body: Bytes::from_static(b"Forbidden, ip is not allowed"),
..Default::default()
}));
}
let key = get_cache_key(
ctx,
Method::GET.as_ref(),
&session.req_header().uri,
);
self.http_cache
.cache
.remove(&key.combined(), key.namespace())
.await?;
return Ok(
RequestPluginResult::Respond(HttpResponse::no_content()),
);
}
if let Some(cache_info) = &mut ctx.cache {
cache_info.max_ttl = self.max_ttl;
cache_info.check_cache_control = self.check_cache_control;
}
session.cache.enable(
self.http_cache,
self.eviction,
self.predictor,
self.lock,
None,
);
if self.max_file_size > 0 {
session.cache.set_max_file_size_bytes(self.max_file_size);
}
if let Some(stats) = self.http_cache.stats()
&& let Some(cache_info) = ctx.cache.as_mut()
{
cache_info.reading_count = Some(stats.reading);
cache_info.writing_count = Some(stats.writing);
}
Ok(RequestPluginResult::Continue)
}
}
#[ctor]
fn init() {
get_plugin_factory()
.register("cache", |params| Ok(Arc::new(Cache::new(params)?)));
}
#[cfg(test)]
mod tests {
use super::*;
use pingap_config::PluginConf;
use pingap_core::{Ctx, PluginStep};
use pingora::proxy::Session;
use pretty_assertions::assert_eq;
use tokio_test::io::Builder;
#[test]
fn test_cache_params() {
let params = Cache::try_from(
&toml::from_str::<PluginConf>(
r###"
eviction = true
headers = ["Accept-Encoding"]
lock = "2s"
max_file_size = "100kb"
predictor = true
max_ttl = "1m"
"###,
)
.unwrap(),
)
.unwrap();
assert_eq!(true, params.eviction.is_some());
assert_eq!(
r#"Some(["Accept-Encoding"])"#,
format!("{:?}", params.headers)
);
assert_eq!(true, params.lock.is_some());
assert_eq!(100 * 1000, params.max_file_size);
assert_eq!(60, params.max_ttl.unwrap().as_secs());
assert_eq!(true, params.predictor.is_some());
}
#[tokio::test]
async fn test_cache() {
let cache = Cache::try_from(
&toml::from_str::<PluginConf>(
r###"
namespace = "pingap"
eviction = true
headers = ["Accept-Encoding"]
purge_ip_list = ["127.0.0.1"]
lock = "2s"
max_file_size = "100kb"
predictor = true
max_ttl = "1m"
"###,
)
.unwrap(),
)
.unwrap();
let headers = ["Accept-Encoding: gzip"].join("\r\n");
let input_header =
format!("GET /vicanso/pingap?size=1 HTTP/1.1\r\n{headers}\r\n\r\n");
let mock_io = Builder::new().read(input_header.as_bytes()).build();
let mut session = Session::new_h1(Box::new(mock_io));
session.read_request().await.unwrap();
let mut ctx = Ctx::default();
cache
.handle_request(PluginStep::Request, &mut session, &mut ctx)
.await
.unwrap();
assert_eq!(
"pingap",
ctx.cache.as_ref().unwrap().namespace.as_ref().unwrap()
);
assert_eq!(
"gzip",
ctx.cache.as_ref().unwrap().keys.as_ref().unwrap().join(":")
);
assert_eq!(true, session.cache.enabled());
assert_eq!(100 * 1000, cache.max_file_size);
}
}