Skip to main content

pingap_cache/
lib.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use bytesize::ByteSize;
16use serde::{Deserialize, Serialize};
17use snafu::Snafu;
18use std::collections::HashMap;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::sync::LazyLock;
22use std::sync::Mutex;
23use std::sync::OnceLock;
24use std::sync::atomic::AtomicU64;
25use std::sync::atomic::Ordering;
26use tracing::info;
27
28mod file;
29mod http_cache;
30mod tiny;
31
32pub static PAGE_SIZE: usize = 4096;
33
34/// Category name for cache related logging
35pub static LOG_TARGET: &str = "pingap::cache";
36
37#[derive(Debug, Snafu)]
38pub enum Error {
39    #[snafu(display("Io error: {source}"))]
40    Io { source: std::io::Error },
41    #[snafu(display("{message}"))]
42    Invalid { message: String },
43    #[snafu(display("Over quota error, max: {max}, {message}"))]
44    OverQuota { max: u32, message: String },
45    #[snafu(display("{message}"))]
46    Prometheus { message: String },
47}
48pub type Result<T, E = Error> = std::result::Result<T, E>;
49
50impl From<Error> for pingora::BError {
51    fn from(value: Error) -> Self {
52        pingap_core::new_internal_error(500, value)
53    }
54}
55
56fn new_tiny_ufo_cache(mode: &str, size: usize) -> HttpCache {
57    let mode = CacheMode::from_str(mode).unwrap_or_default();
58    HttpCache {
59        directory: None,
60        cache: Arc::new(tiny::TinyUfoCache::new(mode, size / PAGE_SIZE, size)),
61        max_size: size as u64,
62    }
63}
64fn new_file_cache(dir: &str) -> Result<HttpCache> {
65    let cache = FileCache::new(dir)?;
66    Ok(HttpCache {
67        directory: Some(cache.directory.clone()),
68        cache: Arc::new(cache),
69        max_size: 0,
70    })
71}
72
73struct CacheBackendProvider {
74    cache_backends: Mutex<HashMap<String, &'static HttpCache>>,
75}
76
77static BACKENDS: LazyLock<CacheBackendProvider> =
78    LazyLock::new(|| CacheBackendProvider {
79        cache_backends: Mutex::new(HashMap::new()),
80    });
81
82static MEMORY_BACKEND: OnceLock<HttpCache> = OnceLock::new();
83
84const MAX_MEMORY_SIZE: usize = 1024 * 1024 * 1024;
85
86pub(crate) fn get_file_backends() -> Vec<&'static HttpCache> {
87    if let Ok(backends) = BACKENDS.cache_backends.lock() {
88        backends.values().copied().collect()
89    } else {
90        Vec::new()
91    }
92}
93
94static AVAILABLE_MEMORY: AtomicU64 = AtomicU64::new(0);
95
96pub fn update_available_memory(available_memory: u64) {
97    AVAILABLE_MEMORY.store(available_memory, Ordering::Relaxed);
98}
99
100fn parse_byte_size<'de, D>(deserializer: D) -> Result<Option<usize>, D::Error>
101where
102    D: serde::de::Deserializer<'de>,
103{
104    let s: String = String::deserialize(deserializer)?;
105    if s.is_empty() {
106        return Ok(None);
107    }
108    let size = ByteSize::from_str(&s)
109        .map_err(|e| serde::de::Error::custom(e.to_string()))?;
110    Ok(Some(size.as_u64() as usize))
111}
112#[derive(Debug, PartialEq, Deserialize, Serialize, Default)]
113struct MemoryCacheParams {
114    #[serde(default)]
115    #[serde(deserialize_with = "parse_byte_size")]
116    max_size: Option<usize>,
117    mode: Option<String>,
118}
119impl TryFrom<&str> for MemoryCacheParams {
120    type Error = Error;
121    fn try_from(value: &str) -> Result<Self> {
122        let params = if let Some((_, query)) = value.split_once('?') {
123            serde_qs::from_str(query).map_err(|e| Error::Invalid {
124                message: e.to_string(),
125            })?
126        } else {
127            MemoryCacheParams::default()
128        };
129        Ok(params)
130    }
131}
132
133fn try_init_memory_backend(value: &str) -> &'static HttpCache {
134    MEMORY_BACKEND.get_or_init(|| {
135        let params = MemoryCacheParams::try_from(value).unwrap_or_default();
136        let available_memory =
137            AVAILABLE_MEMORY.load(Ordering::Relaxed) as usize;
138        let max_memory = if available_memory > 0 {
139            available_memory / 4
140        } else {
141            ByteSize::mb(256).as_u64() as usize
142        };
143
144        // Determine cache size from config or use default MAX_MEMORY_SIZE
145        let mut size = if let Some(cache_max_size) = params.max_size {
146            // if memory is less than 10MB, use the percentage of memory
147            if cache_max_size < 10 * 1024 * 1024 {
148                max_memory * cache_max_size.min(100) / 100
149            } else {
150                cache_max_size
151            }
152        } else {
153            max_memory
154        };
155
156        let cache_mode = params.mode.unwrap_or_default();
157
158        size = size.min(MAX_MEMORY_SIZE);
159        info!(
160            target: LOG_TARGET,
161            size = ByteSize(size as u64).to_string(),
162            cache_mode,
163            "init memory cache backend success"
164        );
165        new_tiny_ufo_cache(&cache_mode, size)
166    })
167}
168
169pub fn new_cache_backend(directory: &str) -> Result<&'static HttpCache> {
170    if directory.is_empty() || directory.starts_with("memory://") {
171        return Ok(try_init_memory_backend(directory));
172    }
173    let mut cache_backends =
174        BACKENDS.cache_backends.lock().map_err(|e| Error::Invalid {
175            message: e.to_string(),
176        })?;
177    if let Some(backend) = cache_backends.get(directory) {
178        return Ok(backend);
179    }
180
181    // Use file-based cache if directory is specified
182    let cache = new_file_cache(directory).map_err(|e| Error::Invalid {
183        message: e.to_string(),
184    })?;
185    info!(
186        target: LOG_TARGET,
187        inactive = cache.cache.inactive().map(|v| v.as_secs()),
188        "init file cache backend success"
189    );
190
191    let cache_ref: &'static HttpCache = Box::leak(Box::new(cache));
192    cache_backends.insert(directory.to_string(), cache_ref);
193
194    Ok(cache_ref)
195}
196
197pub use http_cache::{HttpCache, new_storage_clear_service};
198
199#[cfg(feature = "tracing")]
200mod prom;
201#[cfg(feature = "tracing")]
202pub use prom::{CACHE_READING_TIME, CACHE_WRITING_TIME};
203
204use crate::file::FileCache;
205use crate::tiny::CacheMode;
206
207#[cfg(test)]
208mod tests {
209    use super::*;
210    use pretty_assertions::assert_eq;
211    use tempfile::TempDir;
212
213    #[test]
214    fn test_convert_error() {
215        let err = Error::Invalid {
216            message: "invalid error".to_string(),
217        };
218
219        let b_error: pingora::BError = err.into();
220
221        assert_eq!(
222            " HTTPStatus context: invalid error cause:  InternalError",
223            b_error.to_string()
224        );
225    }
226
227    #[test]
228    fn test_cache() {
229        let _ = new_tiny_ufo_cache("compact", 1024);
230
231        let dir = TempDir::new().unwrap();
232        let result = new_file_cache(&dir.keep().to_string_lossy());
233        assert_eq!(true, result.is_ok());
234    }
235}