1use bytesize::ByteSize;
16use serde::{Deserialize, Serialize};
17use snafu::Snafu;
18use std::collections::HashMap;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::sync::LazyLock;
22use std::sync::Mutex;
23use std::sync::OnceLock;
24use std::sync::atomic::AtomicU64;
25use std::sync::atomic::Ordering;
26use tracing::info;
27
28mod file;
29mod http_cache;
30mod tiny;
31
/// Granularity (in bytes) used to convert a byte budget into an entry
/// count for the TinyUFO memory cache (`size / PAGE_SIZE` entries).
/// Declared `const` rather than `static`: it is an immutable `Copy`
/// value, so inlining it is the idiomatic choice.
pub const PAGE_SIZE: usize = 4096;

/// Tracing target used by all log records emitted from this module.
pub const LOG_TARGET: &str = "pingap::cache";
/// Errors produced by the cache module; `snafu` derives the
/// `Display` implementations from the `display` attributes.
#[derive(Debug, Snafu)]
pub enum Error {
    /// Underlying filesystem I/O failure (file cache backend).
    #[snafu(display("Io error: {source}"))]
    Io { source: std::io::Error },
    /// Generic invalid-input or invalid-state error.
    #[snafu(display("{message}"))]
    Invalid { message: String },
    /// A cache quota was exceeded; `max` is the configured limit.
    #[snafu(display("Over quota error, max: {max}, {message}"))]
    OverQuota { max: u32, message: String },
    /// Prometheus metrics error (only relevant with the tracing feature).
    #[snafu(display("{message}"))]
    Prometheus { message: String },
}
/// Module-wide result alias whose error type defaults to [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
49
impl From<Error> for pingora::BError {
    /// Maps any cache error onto a pingora internal error with HTTP
    /// status 500, so cache failures surface as server errors.
    fn from(value: Error) -> Self {
        pingap_core::new_internal_error(500, value)
    }
}
55
56fn new_tiny_ufo_cache(mode: &str, size: usize) -> HttpCache {
57 let mode = CacheMode::from_str(mode).unwrap_or_default();
58 HttpCache {
59 directory: None,
60 cache: Arc::new(tiny::TinyUfoCache::new(mode, size / PAGE_SIZE, size)),
61 max_size: size as u64,
62 }
63}
64fn new_file_cache(dir: &str) -> Result<HttpCache> {
65 let cache = FileCache::new(dir)?;
66 Ok(HttpCache {
67 directory: Some(cache.directory.clone()),
68 cache: Arc::new(cache),
69 max_size: 0,
70 })
71}
72
/// Registry of file cache backends keyed by their directory path.
struct CacheBackendProvider {
    // Backends are leaked to obtain `&'static` references (see
    // `new_cache_backend`); the mutex guards lazy creation at runtime.
    cache_backends: Mutex<HashMap<String, &'static HttpCache>>,
}
76
// Process-wide registry of file cache backends, created on first use.
static BACKENDS: LazyLock<CacheBackendProvider> =
    LazyLock::new(|| CacheBackendProvider {
        cache_backends: Mutex::new(HashMap::new()),
    });

// Single shared in-memory backend, initialized at most once.
static MEMORY_BACKEND: OnceLock<HttpCache> = OnceLock::new();

// Hard upper bound (1 GiB) for the memory cache backend size.
const MAX_MEMORY_SIZE: usize = 1024 * 1024 * 1024;
85
86pub(crate) fn get_file_backends() -> Vec<&'static HttpCache> {
87 if let Ok(backends) = BACKENDS.cache_backends.lock() {
88 backends.values().copied().collect()
89 } else {
90 Vec::new()
91 }
92}
93
// Most recently reported available system memory in bytes; 0 means
// "not reported yet", which makes the memory backend fall back to a
// fixed default budget (see `try_init_memory_backend`).
static AVAILABLE_MEMORY: AtomicU64 = AtomicU64::new(0);

/// Records the currently available system memory (bytes) so the
/// memory cache backend can size itself relative to it when it is
/// first initialized.
pub fn update_available_memory(available_memory: u64) {
    AVAILABLE_MEMORY.store(available_memory, Ordering::Relaxed);
}
99
100fn parse_byte_size<'de, D>(deserializer: D) -> Result<Option<usize>, D::Error>
101where
102 D: serde::de::Deserializer<'de>,
103{
104 let s: String = String::deserialize(deserializer)?;
105 if s.is_empty() {
106 return Ok(None);
107 }
108 let size = ByteSize::from_str(&s)
109 .map_err(|e| serde::de::Error::custom(e.to_string()))?;
110 Ok(Some(size.as_u64() as usize))
111}
/// Parameters accepted in the query string of a memory cache URL
/// (parsed by the `TryFrom<&str>` impl below, e.g. after `memory://?`).
#[derive(Debug, PartialEq, Deserialize, Serialize, Default)]
struct MemoryCacheParams {
    // Requested cache size, parsed from a human-readable byte string
    // via `parse_byte_size`; empty string deserializes to None.
    #[serde(default)]
    #[serde(deserialize_with = "parse_byte_size")]
    max_size: Option<usize>,
    // Cache mode name, later forwarded to `CacheMode::from_str`.
    mode: Option<String>,
}
119impl TryFrom<&str> for MemoryCacheParams {
120 type Error = Error;
121 fn try_from(value: &str) -> Result<Self> {
122 let params = if let Some((_, query)) = value.split_once('?') {
123 serde_qs::from_str(query).map_err(|e| Error::Invalid {
124 message: e.to_string(),
125 })?
126 } else {
127 MemoryCacheParams::default()
128 };
129 Ok(params)
130 }
131}
132
/// Initializes (at most once) and returns the shared in-memory cache
/// backend.
///
/// `value` is the cache URL whose query string, if any, is parsed into
/// `MemoryCacheParams`; parse failures silently fall back to defaults.
fn try_init_memory_backend(value: &str) -> &'static HttpCache {
    MEMORY_BACKEND.get_or_init(|| {
        let params = MemoryCacheParams::try_from(value).unwrap_or_default();
        let available_memory =
            AVAILABLE_MEMORY.load(Ordering::Relaxed) as usize;
        // Default budget: a quarter of the reported available memory,
        // or a fixed 256 MB when no figure has been reported (0).
        let max_memory = if available_memory > 0 {
            available_memory / 4
        } else {
            ByteSize::mb(256).as_u64() as usize
        };

        let mut size = if let Some(cache_max_size) = params.max_size {
            // NOTE(review): values below 10 MB appear to be interpreted
            // as a percentage (capped at 100) of the memory budget
            // rather than a byte count — confirm this is intentional.
            if cache_max_size < 10 * 1024 * 1024 {
                max_memory * cache_max_size.min(100) / 100
            } else {
                cache_max_size
            }
        } else {
            max_memory
        };

        let cache_mode = params.mode.unwrap_or_default();

        // Clamp to the global hard cap (1 GiB).
        size = size.min(MAX_MEMORY_SIZE);
        info!(
            target: LOG_TARGET,
            size = ByteSize(size as u64).to_string(),
            cache_mode,
            "init memory cache backend success"
        );
        new_tiny_ufo_cache(&cache_mode, size)
    })
}
168
169pub fn new_cache_backend(directory: &str) -> Result<&'static HttpCache> {
170 if directory.is_empty() || directory.starts_with("memory://") {
171 return Ok(try_init_memory_backend(directory));
172 }
173 let mut cache_backends =
174 BACKENDS.cache_backends.lock().map_err(|e| Error::Invalid {
175 message: e.to_string(),
176 })?;
177 if let Some(backend) = cache_backends.get(directory) {
178 return Ok(backend);
179 }
180
181 let cache = new_file_cache(directory).map_err(|e| Error::Invalid {
183 message: e.to_string(),
184 })?;
185 info!(
186 target: LOG_TARGET,
187 inactive = cache.cache.inactive().map(|v| v.as_secs()),
188 "init file cache backend success"
189 );
190
191 let cache_ref: &'static HttpCache = Box::leak(Box::new(cache));
192 cache_backends.insert(directory.to_string(), cache_ref);
193
194 Ok(cache_ref)
195}
196
197pub use http_cache::{HttpCache, new_storage_clear_service};
198
199#[cfg(feature = "tracing")]
200mod prom;
201#[cfg(feature = "tracing")]
202pub use prom::{CACHE_READING_TIME, CACHE_WRITING_TIME};
203
204use crate::file::FileCache;
205use crate::tiny::CacheMode;
206
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;
    use tempfile::TempDir;

    /// Converting a cache error into a pingora BError yields an
    /// internal (HTTP 500) error carrying the original message.
    #[test]
    fn test_convert_error() {
        let err = Error::Invalid {
            message: "invalid error".to_string(),
        };

        let b_error = pingora::BError::from(err);

        assert_eq!(
            " HTTPStatus context: invalid error cause: InternalError",
            b_error.to_string()
        );
    }

    /// Both backend constructors succeed with basic inputs.
    #[test]
    fn test_cache() {
        let _ = new_tiny_ufo_cache("compact", 1024);

        let dir = TempDir::new().unwrap();
        let path = dir.keep().to_string_lossy().to_string();
        assert!(new_file_cache(&path).is_ok());
    }
}