use once_cell::sync::{Lazy, OnceCell};
use parking_lot::RwLock;
use pingora_cache::eviction::simple_lru::Manager as LruEvictionManager;
use pingora_cache::eviction::EvictionManager;
use pingora_cache::lock::CacheLock;
use pingora_cache::storage::Storage;
use pingora_cache::MemCache;
use regex::Regex;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{debug, error, info, trace, warn};
use crate::disk_cache::DiskCacheStorage;
use crate::hybrid_cache::HybridCacheStorage;
use zentinel_config::{CacheBackend, CacheStorageConfig};
// Default cache size: 100 * 1024 * 1024 bytes (100 MiB).
// NOTE(review): not referenced in the visible part of this file — confirm it is still used.
const DEFAULT_CACHE_SIZE_BYTES: usize = 100 * 1024 * 1024;
// Default eviction limit: 100 * 1024 * 1024 bytes (100 MiB).
// NOTE(review): not referenced in the visible part of this file — confirm it is still used.
const DEFAULT_EVICTION_LIMIT_BYTES: usize = 100 * 1024 * 1024;
// Default cache lock timeout of 10 (seconds, going by the name).
// NOTE(review): not referenced in the visible part of this file — confirm it is still used.
const DEFAULT_LOCK_TIMEOUT_SECS: u64 = 10;
// Process-wide cache storage configuration. It is written at most once, either
// by `configure_cache` or by `get_cache_config` (which falls back to
// `CacheStorageConfig::default` when nothing was configured).
static CACHE_CONFIG: OnceCell<CacheStorageConfig> = OnceCell::new();
/// Installs the process-wide cache storage configuration.
///
/// The configuration cell can only be filled once. Returns `true` when
/// `config` was stored, and `false` (after logging a warning) when a
/// configuration was already present and `config` was discarded.
pub fn configure_cache(config: CacheStorageConfig) -> bool {
    let installed = CACHE_CONFIG.set(config).is_ok();
    if installed {
        info!("Cache storage configured");
    } else {
        warn!("Cache already initialized, configuration ignored");
    }
    installed
}
/// Returns the active cache storage configuration, initializing it with
/// `CacheStorageConfig::default()` if nothing has been configured yet.
fn get_cache_config() -> &'static CacheStorageConfig {
    CACHE_CONFIG.get_or_init(|| CacheStorageConfig::default())
}
/// Reports the `enabled` flag of the active cache storage configuration.
pub fn is_cache_enabled() -> bool {
    let storage_config = get_cache_config();
    storage_config.enabled
}
/// Lazily-built cache storage backend, selected by `config.backend`.
///
/// Each backend is allocated once and leaked with `Box::leak` so that it can be
/// handed out as a `&'static` reference.
///
/// # Panics
///
/// Initialization panics if the backend is `Disk` or `Hybrid` and `disk_path`
/// is `None`.
static HTTP_CACHE_STORAGE: Lazy<&'static (dyn Storage + Sync)> = Lazy::new(|| {
let config = get_cache_config();
info!(
cache_size_mb = config.max_size_bytes / 1024 / 1024,
backend = ?config.backend,
"Initializing HTTP cache storage"
);
match config.backend {
// Note: `max_size_bytes` is not passed to the in-memory store here.
CacheBackend::Memory => Box::leak(Box::new(MemCache::new())),
CacheBackend::Disk => {
let path = config
.disk_path
.as_ref()
.expect("disk-path is required for disk backend (validated by config parser)");
Box::leak(Box::new(DiskCacheStorage::new(
path,
config.disk_shards,
config.max_size_bytes,
)))
}
CacheBackend::Hybrid => {
let path = config
.disk_path
.as_ref()
.expect("disk-path is required for hybrid backend (validated by config parser)");
// The disk tier uses its own limit when set, otherwise the overall cache size.
let disk_max = config.disk_max_size_bytes.unwrap_or(config.max_size_bytes);
let memory: &'static MemCache = Box::leak(Box::new(MemCache::new()));
let disk: &'static DiskCacheStorage = Box::leak(Box::new(DiskCacheStorage::new(
path,
config.disk_shards,
disk_max,
)));
Box::leak(Box::new(HybridCacheStorage::new(memory, disk)))
}
}
});
/// Lazily-built LRU eviction manager.
///
/// Its limit is `eviction_limit_bytes` when configured, otherwise
/// `max_size_bytes`.
static HTTP_CACHE_EVICTION: Lazy<LruEvictionManager> = Lazy::new(|| {
    let cfg = get_cache_config();
    let limit = match cfg.eviction_limit_bytes {
        Some(explicit) => explicit,
        None => cfg.max_size_bytes,
    };
    info!(
        eviction_limit_mb = limit / 1024 / 1024,
        "Initializing HTTP cache eviction manager"
    );
    LruEvictionManager::new(limit)
});
/// Lazily-built cache lock, created with a timeout of `lock_timeout_secs`
/// seconds taken from the cache storage configuration.
static HTTP_CACHE_LOCK: Lazy<CacheLock> = Lazy::new(|| {
    let timeout_secs = get_cache_config().lock_timeout_secs;
    info!(
        lock_timeout_secs = timeout_secs,
        "Initializing HTTP cache lock"
    );
    CacheLock::new(Duration::from_secs(timeout_secs))
});
pub fn get_cache_storage() -> &'static (dyn Storage + Sync) {
*HTTP_CACHE_STORAGE
}
pub fn get_cache_eviction() -> &'static LruEvictionManager {
&HTTP_CACHE_EVICTION
}
pub fn get_cache_lock() -> &'static CacheLock {
&HTTP_CACHE_LOCK
}
pub async fn init_disk_cache_state() {
let config = get_cache_config();
if matches!(config.backend, CacheBackend::Disk | CacheBackend::Hybrid) {
let path = config.disk_path.as_ref().unwrap();
let eviction = get_cache_eviction();
let eviction_dir = path.join("eviction");
if eviction_dir.exists() {
if let Err(e) = eviction
.load(eviction_dir.to_str().unwrap_or_default())
.await
{
warn!(error = %e, "Failed to load saved eviction state, rebuilding from disk");
} else {
info!("Loaded saved eviction state");
}
}
crate::disk_cache::rebuild_eviction_state(path, config.disk_shards, eviction).await;
}
}
pub async fn save_disk_cache_state() {
let config = get_cache_config();
if matches!(config.backend, CacheBackend::Disk | CacheBackend::Hybrid) {
let eviction_path = config.disk_path.as_ref().unwrap().join("eviction");
if let Err(e) = std::fs::create_dir_all(&eviction_path) {
error!(error = %e, "Failed to create eviction state directory");
return;
}
if let Err(e) = get_cache_eviction()
.save(eviction_path.to_str().unwrap_or_default())
.await
{
error!(error = %e, "Failed to save eviction state");
} else {
info!("Saved disk cache eviction state");
}
}
}
/// Per-route cache settings, registered with `CacheManager::register_route`.
#[derive(Debug, Clone)]
pub struct CacheConfig {
/// Whether caching is enabled for the route (see `CacheManager::is_enabled`).
pub enabled: bool,
/// TTL in seconds used by `calculate_ttl` when the response supplies no usable max-age.
pub default_ttl_secs: u64,
/// Size limit in bytes.
// NOTE(review): not read anywhere in the visible part of this file — confirm where it is enforced.
pub max_size_bytes: usize,
/// When `false`, `calculate_ttl` returns a zero TTL for `Cache-Control: private` responses.
pub cache_private: bool,
/// Maximum staleness in seconds accepted by `should_serve_stale` for non-error cases.
pub stale_while_revalidate_secs: u64,
/// Maximum staleness in seconds accepted by `should_serve_stale` when `is_error` is set.
pub stale_if_error_secs: u64,
/// Methods considered cacheable; compared ASCII case-insensitively.
pub cacheable_methods: Vec<String>,
/// Response status codes considered cacheable.
pub cacheable_status_codes: Vec<u16>,
/// Extensions excluded from caching; compared case-insensitively against the
/// text after the last `.` in the path.
pub exclude_extensions: Vec<String>,
/// Regexes matched against the path; any match excludes the path from caching.
pub exclude_paths: Vec<Regex>,
}
impl Default for CacheConfig {
    /// Builds the default route configuration: caching disabled, a one-hour
    /// default TTL, a 10 MiB size limit, `GET`/`HEAD` as cacheable methods and
    /// no exclusions.
    fn default() -> Self {
        let cacheable_methods = ["GET", "HEAD"].iter().map(|m| m.to_string()).collect();
        let cacheable_status_codes = vec![200, 203, 204, 206, 300, 301, 308, 404, 410];

        Self {
            enabled: false,
            default_ttl_secs: 3600,
            max_size_bytes: 10 * 1024 * 1024,
            cache_private: false,
            stale_while_revalidate_secs: 60,
            stale_if_error_secs: 300,
            cacheable_methods,
            cacheable_status_codes,
            exclude_extensions: Vec::new(),
            exclude_paths: Vec::new(),
        }
    }
}
/// Atomic counters describing cache activity.
///
/// All counters start at zero and are updated and read with
/// `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct HttpCacheStats {
    hits: std::sync::atomic::AtomicU64,
    misses: std::sync::atomic::AtomicU64,
    stores: std::sync::atomic::AtomicU64,
    evictions: std::sync::atomic::AtomicU64,
    memory_hits: std::sync::atomic::AtomicU64,
    disk_hits: std::sync::atomic::AtomicU64,
}

impl HttpCacheStats {
    /// Adds one to `counter`.
    fn increment(counter: &std::sync::atomic::AtomicU64) {
        counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }

    /// Reads the current value of `counter`.
    fn current(counter: &std::sync::atomic::AtomicU64) -> u64 {
        counter.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Counts one cache hit.
    pub fn record_hit(&self) {
        Self::increment(&self.hits);
    }

    /// Counts one cache miss.
    pub fn record_miss(&self) {
        Self::increment(&self.misses);
    }

    /// Counts one cache store.
    pub fn record_store(&self) {
        Self::increment(&self.stores);
    }

    /// Counts one eviction.
    pub fn record_eviction(&self) {
        Self::increment(&self.evictions);
    }

    /// Number of hits recorded so far.
    pub fn hits(&self) -> u64 {
        Self::current(&self.hits)
    }

    /// Number of misses recorded so far.
    pub fn misses(&self) -> u64 {
        Self::current(&self.misses)
    }

    /// Fraction of lookups that were hits, i.e. `hits / (hits + misses)`.
    /// Returns `0.0` when nothing has been recorded.
    pub fn hit_ratio(&self) -> f64 {
        let hit_count = self.hits() as f64;
        let lookups = hit_count + self.misses() as f64;
        if lookups > 0.0 {
            hit_count / lookups
        } else {
            0.0
        }
    }

    /// Number of stores recorded so far.
    pub fn stores(&self) -> u64 {
        Self::current(&self.stores)
    }

    /// Number of evictions recorded so far.
    pub fn evictions(&self) -> u64 {
        Self::current(&self.evictions)
    }

    /// Counts one hit served from the memory tier.
    pub fn record_memory_hit(&self) {
        Self::increment(&self.memory_hits);
    }

    /// Counts one hit served from the disk tier.
    pub fn record_disk_hit(&self) {
        Self::increment(&self.disk_hits);
    }

    /// Number of memory-tier hits recorded so far.
    pub fn memory_hits(&self) -> u64 {
        Self::current(&self.memory_hits)
    }

    /// Number of disk-tier hits recorded so far.
    pub fn disk_hits(&self) -> u64 {
        Self::current(&self.disk_hits)
    }
}
/// Record of a registered wildcard purge, as pushed by `purge_wildcard`.
#[derive(Debug, Clone)]
struct PurgeEntry {
// Time of registration; `cleanup_expired_purges` drops entries older than
// `PURGE_ENTRY_LIFETIME`.
created_at: Instant,
// The original glob pattern text. Not read in the visible part of this file.
pattern: Option<String>,
}
// How long a purge (exact key or wildcard) stays active: 60 seconds.
const PURGE_ENTRY_LIFETIME: Duration = Duration::from_secs(60);
/// Holds per-route cache configuration, shared statistics and the set of
/// currently active purges.
pub struct CacheManager {
// Route id -> cache configuration.
route_configs: RwLock<HashMap<String, CacheConfig>>,
// Shared counters, handed out via `stats()`.
stats: Arc<HttpCacheStats>,
// Purged keys/paths mapped to the time they were purged.
purged_keys: RwLock<HashMap<String, Instant>>,
// One entry per registered wildcard purge; counted by `active_purge_count`.
purge_patterns: RwLock<Vec<PurgeEntry>>,
// Compiled wildcard regexes with their registration time; consulted by
// `should_invalidate`.
compiled_patterns: RwLock<Vec<(Regex, Instant)>>,
}
impl CacheManager {
/// Creates a manager with no registered routes, zeroed statistics and no
/// active purges.
pub fn new() -> Self {
    let route_configs = RwLock::new(HashMap::new());
    let stats = Arc::new(HttpCacheStats::default());
    let purged_keys = RwLock::new(HashMap::new());
    let purge_patterns = RwLock::new(Vec::new());
    let compiled_patterns = RwLock::new(Vec::new());

    Self {
        route_configs,
        stats,
        purged_keys,
        purge_patterns,
        compiled_patterns,
    }
}
/// Returns a shared handle to this manager's statistics counters.
pub fn stats(&self) -> Arc<HttpCacheStats> {
    Arc::clone(&self.stats)
}
/// Stores `config` as the cache configuration for `route_id`, replacing any
/// configuration previously registered under the same id.
pub fn register_route(&self, route_id: &str, config: CacheConfig) {
    trace!(
        route_id = route_id,
        enabled = config.enabled,
        default_ttl = config.default_ttl_secs,
        "Registering cache configuration for route"
    );
    let mut routes = self.route_configs.write();
    routes.insert(route_id.to_owned(), config);
}
/// Returns a copy of the configuration registered for `route_id`, or `None`
/// if the route is unknown.
pub fn get_route_config(&self, route_id: &str) -> Option<CacheConfig> {
    let routes = self.route_configs.read();
    routes.get(route_id).cloned()
}
/// Whether caching is enabled for `route_id`. Unknown routes are reported as
/// disabled.
pub fn is_enabled(&self, route_id: &str) -> bool {
    match self.route_configs.read().get(route_id) {
        Some(route) => route.enabled,
        None => false,
    }
}
/// Builds a cache key of the form `METHOD:host:path`, with `?query` appended
/// when a query string is supplied.
pub fn generate_cache_key(method: &str, host: &str, path: &str, query: Option<&str>) -> String {
    let mut key = format!("{}:{}:{}", method, host, path);
    if let Some(q) = query {
        key.push('?');
        key.push_str(q);
    }
    key
}
/// Whether `method` is listed (ASCII case-insensitively) among the cacheable
/// methods of `route_id`. Unknown routes yield `false`.
pub fn is_method_cacheable(&self, route_id: &str, method: &str) -> bool {
    let routes = self.route_configs.read();
    match routes.get(route_id) {
        Some(route) => route
            .cacheable_methods
            .iter()
            .any(|allowed| allowed.eq_ignore_ascii_case(method)),
        None => false,
    }
}
/// Whether `path` may be cached for `route_id`.
///
/// A path is rejected when the text after its last `.` matches one of the
/// route's excluded extensions (case-insensitively), or when any of the
/// route's exclude regexes matches it. Routes without a registered
/// configuration are treated as cacheable.
pub fn is_path_cacheable(&self, route_id: &str, path: &str) -> bool {
    let routes = self.route_configs.read();
    let route = match routes.get(route_id) {
        None => return true,
        Some(route) => route,
    };

    if !route.exclude_extensions.is_empty() {
        // Only paths that actually contain a dot have an extension to test.
        if let Some(dot) = path.rfind('.') {
            let ext_lower = path[dot + 1..].to_lowercase();
            let excluded = route
                .exclude_extensions
                .iter()
                .any(|candidate| candidate.eq_ignore_ascii_case(&ext_lower));
            if excluded {
                trace!(
                    route_id = route_id,
                    path = path,
                    extension = %ext_lower,
                    "Path excluded from cache by extension"
                );
                return false;
            }
        }
    }

    let blocking_pattern = route.exclude_paths.iter().find(|re| re.is_match(path));
    if let Some(regex) = blocking_pattern {
        trace!(
            route_id = route_id,
            path = path,
            pattern = %regex.as_str(),
            "Path excluded from cache by pattern"
        );
        return false;
    }

    true
}
/// Whether `status` is among the cacheable status codes of `route_id`.
/// Unknown routes yield `false`.
pub fn is_status_cacheable(&self, route_id: &str, status: u16) -> bool {
    let routes = self.route_configs.read();
    match routes.get(route_id) {
        Some(route) => route.cacheable_status_codes.contains(&status),
        None => false,
    }
}
/// Extracts the freshness lifetime, in seconds, from a `Cache-Control` value.
///
/// `s-maxage` takes precedence over `max-age` regardless of the order in
/// which the directives appear, because this code acts as a shared cache.
/// Directive names are matched ASCII case-insensitively and a value may be
/// wrapped in double quotes. Directives whose value does not parse as `u64`
/// are skipped.
///
/// Returns `None` when neither directive is present with a valid value.
pub fn parse_max_age(header_value: &str) -> Option<u64> {
    let mut max_age = None;
    for directive in header_value.split(',') {
        let (name, value) = match directive.split_once('=') {
            Some((name, value)) => (name.trim(), value.trim().trim_matches('"')),
            None => continue,
        };
        let secs = match value.parse::<u64>() {
            Ok(secs) => secs,
            Err(_) => continue,
        };
        if name.eq_ignore_ascii_case("s-maxage") {
            // The shared-cache directive wins outright.
            return Some(secs);
        }
        if name.eq_ignore_ascii_case("max-age") && max_age.is_none() {
            // Remember the first max-age, but keep scanning for s-maxage.
            max_age = Some(secs);
        }
    }
    max_age
}
/// Whether a `Cache-Control` value contains `no-store`, `no-cache` or
/// `private` anywhere in its lower-cased text (substring match).
pub fn is_no_cache(header_value: &str) -> bool {
    let normalized = header_value.to_lowercase();
    let blocked = ["no-store", "no-cache", "private"]
        .iter()
        .any(|directive| normalized.contains(*directive));
    blocked
}
/// Computes the TTL for a response on `route_id` from its `Cache-Control`
/// header value, if any.
///
/// * `no-store` and `no-cache` always give a zero TTL.
/// * `private` gives a zero TTL unless the route sets `cache_private`.
///   Previously `cache_private` also switched off the `no-store`/`no-cache`
///   check, so such responses received a non-zero TTL.
/// * Otherwise the value from `parse_max_age` is used when present.
/// * Otherwise the route's `default_ttl_secs` applies. Unknown routes use
///   `CacheConfig::default()`.
pub fn calculate_ttl(&self, route_id: &str, cache_control: Option<&str>) -> Duration {
    let config = self.get_route_config(route_id).unwrap_or_default();
    if let Some(cc) = cache_control {
        // Same lower-cased substring matching as `is_no_cache`, split so that
        // `cache_private` only overrides the `private` directive.
        let lower = cc.to_lowercase();
        if lower.contains("no-store") || lower.contains("no-cache") {
            return Duration::ZERO;
        }
        if lower.contains("private") && !config.cache_private {
            return Duration::ZERO;
        }
        if let Some(max_age) = Self::parse_max_age(cc) {
            return Duration::from_secs(max_age);
        }
    }
    Duration::from_secs(config.default_ttl_secs)
}
/// Whether a response that has been stale for `stale_duration` may still be
/// served on `route_id`.
///
/// The whole-second staleness is compared against `stale_if_error_secs` when
/// `is_error` is set, and against `stale_while_revalidate_secs` otherwise.
/// Unknown routes use `CacheConfig::default()`.
pub fn should_serve_stale(
    &self,
    route_id: &str,
    stale_duration: Duration,
    is_error: bool,
) -> bool {
    let config = self.get_route_config(route_id).unwrap_or_default();
    let allowed_secs = if is_error {
        config.stale_if_error_secs
    } else {
        config.stale_while_revalidate_secs
    };
    stale_duration.as_secs() <= allowed_secs
}
/// Number of routes that currently have a registered cache configuration.
pub fn route_count(&self) -> usize {
    let routes = self.route_configs.read();
    routes.len()
}
/// Marks `path` as purged.
///
/// Three entries are recorded with the current time: `GET:*:<path>`,
/// `HEAD:*:<path>` and the bare path. One eviction is counted in the
/// statistics. Always returns `1`.
pub fn purge(&self, path: &str) -> usize {
    let method_keys = vec![format!("GET:*:{}", path), format!("HEAD:*:{}", path)];
    // The bare path is recorded in addition to the per-method keys.
    let recorded = method_keys.len() + 1;
    let now = Instant::now();

    let mut purged = self.purged_keys.write();
    purged.extend(method_keys.into_iter().map(|key| (key, now)));
    purged.insert(path.to_string(), now);

    debug!(
        path = %path,
        purged_keys = recorded,
        "Purged cache entry"
    );
    self.stats.record_eviction();
    1
}
/// Registers a wildcard purge for the glob `pattern`.
///
/// The glob is translated with `glob_to_regex` and compiled. On success the
/// compiled regex and a matching `PurgeEntry` are recorded with the current
/// time, one eviction is counted, and `1` is returned. If the translated
/// pattern does not compile, a warning is logged and `0` is returned.
pub fn purge_wildcard(&self, pattern: &str) -> usize {
    let regex_pattern = glob_to_regex(pattern);
    // Fixed: the argument had been corrupted to `®ex_pattern`.
    match Regex::new(&regex_pattern) {
        Ok(regex) => {
            let now = Instant::now();
            self.compiled_patterns.write().push((regex, now));
            self.purge_patterns.write().push(PurgeEntry {
                created_at: now,
                pattern: Some(pattern.to_string()),
            });
            debug!(
                pattern = %pattern,
                regex = %regex_pattern,
                "Registered wildcard cache purge"
            );
            self.stats.record_eviction();
            1
        }
        Err(e) => {
            warn!(
                pattern = %pattern,
                error = %e,
                "Failed to compile purge pattern as regex"
            );
            0
        }
    }
}
/// Whether `cache_key` is covered by an active purge.
///
/// Expired purges are dropped first. The key is then checked, in order,
/// against the purged keys (the full key, then the path extracted from it)
/// and against every registered wildcard regex. Wildcards are matched
/// against the extracted path, or the whole key if no path can be extracted.
pub fn should_invalidate(&self, cache_key: &str) -> bool {
    self.cleanup_expired_purges();

    let key_path = extract_path_from_cache_key(cache_key);

    {
        let purged = self.purged_keys.read();
        if purged.contains_key(cache_key) {
            trace!(cache_key = %cache_key, "Cache key matches purged key");
            return true;
        }
        if let Some(path) = key_path {
            if purged.contains_key(path) {
                trace!(cache_key = %cache_key, path = %path, "Cache path matches purged path");
                return true;
            }
        }
    }

    let path = key_path.unwrap_or(cache_key);
    let patterns = self.compiled_patterns.read();
    let matched = patterns.iter().find(|entry| entry.0.is_match(path));
    if let Some((regex, _)) = matched {
        trace!(
            cache_key = %cache_key,
            path = %path,
            pattern = %regex.as_str(),
            "Cache key matches purge pattern"
        );
        return true;
    }

    false
}
/// Drops every purge record (exact keys, wildcard entries and compiled
/// regexes) whose age has reached `PURGE_ENTRY_LIFETIME`.
fn cleanup_expired_purges(&self) {
    let now = Instant::now();
    let is_live = |created_at: Instant| now.duration_since(created_at) < PURGE_ENTRY_LIFETIME;

    // Each lock is taken and released in turn, never nested.
    self.purged_keys
        .write()
        .retain(|_, created_at| is_live(*created_at));
    self.purge_patterns
        .write()
        .retain(|entry| is_live(entry.created_at));
    self.compiled_patterns
        .write()
        .retain(|(_, created_at)| is_live(*created_at));
}
/// Number of recorded purge entries: purged keys plus wildcard entries.
/// Expired entries are not cleaned up by this call.
pub fn active_purge_count(&self) -> usize {
    let exact = self.purged_keys.read();
    let wildcard = self.purge_patterns.read();
    exact.len() + wildcard.len()
}
/// Test helper: removes every recorded purge.
#[cfg(test)]
pub fn clear_purges(&self) {
    {
        let mut exact = self.purged_keys.write();
        exact.clear();
    }
    {
        let mut entries = self.purge_patterns.write();
        entries.clear();
    }
    {
        let mut compiled = self.compiled_patterns.write();
        compiled.clear();
    }
}
}
/// Public entry point for translating a glob pattern into an anchored regex
/// source string; delegates to `glob_to_regex`.
pub fn compile_glob_to_regex(pattern: &str) -> String {
    let translated = glob_to_regex(pattern);
    translated
}
/// Translates a glob pattern into the source of an anchored regex.
///
/// * `**` becomes `.*` (matches across `/`).
/// * `*` becomes `[^/]*` (stays within one path segment).
/// * `?` becomes `.`.
/// * The regex metacharacters `. + ^ $ ( ) [ ] { } | \` are backslash-escaped.
/// * Every other character is copied unchanged.
///
/// The result is wrapped in `^` and `$`.
fn glob_to_regex(pattern: &str) -> String {
    let mut out = String::with_capacity(pattern.len() * 2);
    out.push('^');

    let mut rest = pattern.chars().peekable();
    while let Some(ch) = rest.next() {
        match ch {
            '*' => {
                if rest.peek() == Some(&'*') {
                    // Consume the second star of `**`.
                    rest.next();
                    out.push_str(".*");
                } else {
                    out.push_str("[^/]*");
                }
            }
            '?' => out.push('.'),
            '.' | '+' | '^' | '$' | '(' | ')' | '[' | ']' | '{' | '}' | '|' | '\\' => {
                out.push('\\');
                out.push(ch);
            }
            other => out.push(other),
        }
    }

    out.push('$');
    out
}
/// Extracts the path (including any query string) from a cache key of the
/// form `METHOD:host:path`.
///
/// The host part may itself contain colons (a port such as
/// `example.com:8080`, or an IPv6 literal), so the path is taken to start at
/// the first `/` after the method that directly follows a `:`. A host cannot
/// contain `/`. When no such position exists, the text after the second
/// colon of the key is returned, as before.
///
/// Returns `None` when the key contains fewer than two colons.
fn extract_path_from_cache_key(cache_key: &str) -> Option<&str> {
    let (_method, after_method) = cache_key.split_once(':')?;

    if let Some(slash) = after_method.find('/') {
        if slash > 0 && after_method.as_bytes()[slash - 1] == b':' {
            return Some(&after_method[slash..]);
        }
    }

    // Fallback for paths that do not start with '/': split at the second colon.
    after_method.split_once(':').map(|(_host, path)| path)
}
impl Default for CacheManager {
fn default() -> Self {
Self::new()
}
}
// Unit tests for cache key generation, route configuration, Cache-Control
// parsing, statistics, and purge handling.
#[cfg(test)]
mod tests {
use super::*;
// Keys are `METHOD:host:path`, with `?query` appended when a query is given.
#[test]
fn test_cache_key_generation() {
let key = CacheManager::generate_cache_key("GET", "example.com", "/api/users", None);
assert_eq!(key, "GET:example.com:/api/users");
let key_with_query = CacheManager::generate_cache_key(
"GET",
"example.com",
"/api/users",
Some("page=1&limit=10"),
);
assert_eq!(key_with_query, "GET:example.com:/api/users?page=1&limit=10");
}
// The default configuration is disabled, with a 3600 s TTL, GET and status 200 cacheable.
#[test]
fn test_cache_config_defaults() {
let config = CacheConfig::default();
assert!(!config.enabled);
assert_eq!(config.default_ttl_secs, 3600);
assert!(config.cacheable_methods.contains(&"GET".to_string()));
assert!(config.cacheable_status_codes.contains(&200));
}
// Registered routes report their `enabled` flag; unknown routes are disabled.
#[test]
fn test_route_config_registration() {
let manager = CacheManager::new();
manager.register_route(
"api",
CacheConfig {
enabled: true,
default_ttl_secs: 300,
..Default::default()
},
);
assert!(manager.is_enabled("api"));
assert!(!manager.is_enabled("unknown"));
}
// Method matching is case-insensitive; unlisted methods are not cacheable.
#[test]
fn test_method_cacheability() {
let manager = CacheManager::new();
manager.register_route(
"api",
CacheConfig {
enabled: true,
cacheable_methods: vec!["GET".to_string(), "HEAD".to_string()],
..Default::default()
},
);
assert!(manager.is_method_cacheable("api", "GET"));
assert!(manager.is_method_cacheable("api", "get"));
assert!(!manager.is_method_cacheable("api", "POST"));
}
// max-age / s-maxage values are extracted; headers without them yield None.
#[test]
fn test_parse_max_age() {
assert_eq!(CacheManager::parse_max_age("max-age=3600"), Some(3600));
assert_eq!(
CacheManager::parse_max_age("public, max-age=300"),
Some(300)
);
assert_eq!(
CacheManager::parse_max_age("s-maxage=600, max-age=300"),
Some(600)
);
assert_eq!(CacheManager::parse_max_age("no-store"), None);
}
// no-store, no-cache and private are detected; public is not.
#[test]
fn test_is_no_cache() {
assert!(CacheManager::is_no_cache("no-store"));
assert!(CacheManager::is_no_cache("no-cache"));
assert!(CacheManager::is_no_cache("private"));
assert!(CacheManager::is_no_cache("private, max-age=300"));
assert!(!CacheManager::is_no_cache("public, max-age=3600"));
}
// Two hits and one miss give a hit ratio of about 2/3.
#[test]
fn test_cache_stats() {
let stats = HttpCacheStats::default();
stats.record_hit();
stats.record_hit();
stats.record_miss();
assert_eq!(stats.hits(), 2);
assert_eq!(stats.misses(), 1);
assert!((stats.hit_ratio() - 0.666).abs() < 0.01);
}
// TTL comes from max-age, else the route default; no-store gives zero.
#[test]
fn test_calculate_ttl() {
let manager = CacheManager::new();
manager.register_route(
"api",
CacheConfig {
enabled: true,
default_ttl_secs: 600,
..Default::default()
},
);
let ttl = manager.calculate_ttl("api", Some("max-age=3600"));
assert_eq!(ttl.as_secs(), 3600);
let ttl = manager.calculate_ttl("api", None);
assert_eq!(ttl.as_secs(), 600);
let ttl = manager.calculate_ttl("api", Some("no-store"));
assert_eq!(ttl.as_secs(), 0);
}
// Purging a path invalidates keys for that path only.
#[test]
fn test_purge_single_entry() {
let manager = CacheManager::new();
let count = manager.purge("/api/users/123");
assert_eq!(count, 1);
assert!(manager.active_purge_count() > 0);
let cache_key =
CacheManager::generate_cache_key("GET", "example.com", "/api/users/123", None);
assert!(manager.should_invalidate(&cache_key));
let other_key =
CacheManager::generate_cache_key("GET", "example.com", "/api/users/456", None);
assert!(!manager.should_invalidate(&other_key));
manager.clear_purges();
}
// A single `*` matches within one path segment only.
#[test]
fn test_purge_wildcard_pattern() {
let manager = CacheManager::new();
let count = manager.purge_wildcard("/api/users/*");
assert_eq!(count, 1);
assert!(manager.should_invalidate("/api/users/123"));
assert!(manager.should_invalidate("/api/users/456"));
assert!(manager.should_invalidate("/api/users/abc"));
assert!(!manager.should_invalidate("/api/posts/123"));
assert!(!manager.should_invalidate("/api/users"));
manager.clear_purges();
}
// `**` matches across path segments.
#[test]
fn test_purge_double_wildcard() {
let manager = CacheManager::new();
let count = manager.purge_wildcard("/api/**");
assert_eq!(count, 1);
assert!(manager.should_invalidate("/api/users/123"));
assert!(manager.should_invalidate("/api/posts/456/comments"));
assert!(manager.should_invalidate("/api/deep/nested/path"));
assert!(!manager.should_invalidate("/other/path"));
manager.clear_purges();
}
// Glob translation: `*`, `**`, `?` and escaping of `.`.
#[test]
fn test_glob_to_regex() {
let regex = glob_to_regex("/api/users/*");
assert_eq!(regex, "^/api/users/[^/]*$");
let regex = glob_to_regex("/api/**");
assert_eq!(regex, "^/api/.*$");
let regex = glob_to_regex("/api/user?");
assert_eq!(regex, "^/api/user.$");
let regex = glob_to_regex("/api/v1.0/users");
assert_eq!(regex, "^/api/v1\\.0/users$");
}
// The extracted path keeps the query string; keys without two colons yield None.
#[test]
fn test_extract_path_from_cache_key() {
let path = extract_path_from_cache_key("GET:example.com:/api/users");
assert_eq!(path, Some("/api/users"));
let path = extract_path_from_cache_key("GET:example.com:/api/users?page=1");
assert_eq!(path, Some("/api/users?page=1"));
let path = extract_path_from_cache_key("invalid");
assert_eq!(path, None);
}
// Every purge call (exact or wildcard) counts one eviction.
#[test]
fn test_purge_eviction_stats() {
let manager = CacheManager::new();
let initial_evictions = manager.stats().evictions();
manager.purge("/path1");
manager.purge("/path2");
manager.purge_wildcard("/pattern/*");
assert_eq!(manager.stats().evictions(), initial_evictions + 3);
manager.clear_purges();
}
}