pingap_cache/
http_cache.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{get_cache_backend, is_cache_backend_init, LOG_CATEGORY};
16use super::{Error, Result, PAGE_SIZE};
17use async_trait::async_trait;
18use bytes::{Buf, BufMut, Bytes, BytesMut};
19use pingap_core::Error as ServiceError;
20use pingap_core::SimpleServiceTaskFuture;
21use pingora::cache::key::{CacheHashKey, CompactCacheKey};
22use pingora::cache::storage::MissFinishType;
23use pingora::cache::storage::{HandleHit, HandleMiss};
24use pingora::cache::trace::SpanHandle;
25use pingora::cache::{
26    CacheKey, CacheMeta, HitHandler, MissHandler, PurgeType, Storage,
27};
28use std::any::Any;
29use std::sync::Arc;
30use std::time::{Duration, SystemTime};
31use tracing::info;
32
/// Serialized cache metadata as a pair of byte buffers `(meta0, meta1)`,
/// i.e. the pair produced by `CacheMeta::serialize` and consumed by
/// `CacheMeta::deserialize`.
type BinaryMeta = (Vec<u8>, Vec<u8>);
34
35/// Represents a cached object containing metadata and body content
36#[derive(Debug, Clone, Default, PartialEq)]
37pub struct CacheObject {
38    /// Tuple containing two metadata byte vectors (meta0, meta1)
39    pub meta: BinaryMeta,
40    /// The actual cached content
41    pub body: Bytes,
42}
43
44// Maximum size for a single cached object (40MB)
45static MAX_OBJECT_CACHE_SIZE: usize = 10 * 1024 * PAGE_SIZE;
46
47impl CacheObject {
48    pub fn get_weight(&self) -> u16 {
49        let size = self.body.len() + self.meta.0.len() + self.meta.1.len();
50        if size <= PAGE_SIZE {
51            return 1;
52        }
53        if size >= MAX_OBJECT_CACHE_SIZE {
54            return u16::MAX;
55        }
56        (size / PAGE_SIZE) as u16
57    }
58}
59
/// Size in bytes of the serialized header: two `u32` length fields
/// (meta0 size followed by meta1 size).
const META_SIZE_LENGTH: usize = 8;
61
62/// Creates a CacheObject from bytes with the following format:
63/// - First 4 bytes: meta0 size (u32)
64/// - Next 4 bytes: meta1 size (u32)
65/// - Next meta0_size bytes: meta0 data
66/// - Next meta1_size bytes: meta1 data
67/// - Remaining bytes: body data
68impl From<Bytes> for CacheObject {
69    fn from(value: Bytes) -> Self {
70        // 8 bytes
71        if value.len() < META_SIZE_LENGTH {
72            return Self::default();
73        }
74        let mut data = value;
75
76        let meta0_size = data.get_u32() as usize;
77        let meta1_size = data.get_u32() as usize;
78
79        let meta0 = data.split_to(meta0_size).to_vec();
80        let meta1 = data.split_to(meta1_size).to_vec();
81
82        Self {
83            meta: (meta0, meta1),
84            body: data,
85        }
86    }
87}
88/// Converts a CacheObject into bytes with the following format:
89/// - First 4 bytes: meta0 size (u32)
90/// - Next 4 bytes: meta1 size (u32)
91/// - Next meta0_size bytes: meta0 data
92/// - Next meta1_size bytes: meta1 data
93/// - Remaining bytes: body data
94impl From<CacheObject> for Bytes {
95    fn from(value: CacheObject) -> Self {
96        let meta_size =
97            value.meta.0.len() + value.meta.1.len() + META_SIZE_LENGTH;
98        let mut buf = BytesMut::with_capacity(value.body.len() + meta_size);
99        let meta0_size = value.meta.0.len() as u32;
100        let meta1_size = value.meta.1.len() as u32;
101        buf.put_u32(meta0_size);
102        buf.put_u32(meta1_size);
103        buf.extend(value.meta.0);
104        buf.extend(value.meta.1);
105        buf.extend(value.body.iter());
106
107        buf.into()
108    }
109}
110
/// Read/write counters reported by a cache storage backend.
#[derive(Debug)]
pub struct HttpCacheStats {
    /// Read counter (presumably reads currently in flight; confirm against
    /// the storage implementations)
    pub reading: u32,
    /// Write counter (presumably writes currently in flight; confirm against
    /// the storage implementations)
    pub writing: u32,
}
116
117/// Storage interface for HTTP caching operations
118///
119/// This trait defines the core operations needed to implement a storage backend
120/// for HTTP caching. Implementations must be both `Send` and `Sync` to support
121/// concurrent access.
122#[async_trait]
123pub trait HttpCacheStorage: Sync + Send {
124    /// Retrieves a cached object from storage
125    /// Returns None if not found or Some(CacheObject) if present
126    async fn get(
127        &self,
128        key: &str,
129        namespace: &str,
130    ) -> Result<Option<CacheObject>>;
131
132    /// Stores a cache object with the given key and namespace
133    async fn put(
134        &self,
135        key: &str,
136        namespace: &str,
137        data: CacheObject,
138    ) -> Result<()>;
139
140    /// Removes a cached object from storage.
141    ///
142    /// # Arguments
143    /// * `key` - The unique identifier for the cached object
144    /// * `namespace` - The namespace to scope the cache key
145    ///
146    /// # Returns
147    /// * `Result<Option<CacheObject>>` - The removed object if it existed
148    async fn remove(
149        &self,
150        _key: &str,
151        _namespace: &str,
152    ) -> Result<Option<CacheObject>> {
153        Ok(None)
154    }
155
156    /// Clears cached objects accessed before the specified time.
157    ///
158    /// # Arguments
159    /// * `access_before` - Remove items last accessed before this timestamp
160    ///
161    /// # Returns
162    /// * `Result<(i32, i32)>` - Count of (successful, failed) removals
163    async fn clear(
164        &self,
165        _access_before: std::time::SystemTime,
166    ) -> Result<(i32, i32)> {
167        Ok((-1, -1))
168    }
169
170    /// Returns current storage statistics.
171    ///
172    /// # Returns
173    /// * `Option<HttpCacheStats>` - Current read/write statistics if available
174    fn stats(&self) -> Option<HttpCacheStats> {
175        None
176    }
177
178    /// Returns whether this storage implementation supports the clear operation
179    ///
180    /// # Returns
181    /// * `bool` - Default implementation returns false, indicating no clear support
182    /// Implementations should override this to return true if they
183    /// support the clear operation
184    fn support_clear(&self) -> bool {
185        false
186    }
187}
188
189async fn do_file_storage_clear(
190    count: u32,
191    cache: Arc<dyn HttpCacheStorage>,
192) -> Result<bool, ServiceError> {
193    // Add 1 every loop
194    let offset = 60;
195    if count % offset != 0 {
196        return Ok(false);
197    }
198
199    let Some(access_before) =
200        SystemTime::now().checked_sub(Duration::from_secs(24 * 3600))
201    else {
202        return Ok(false);
203    };
204
205    let Ok((success, fail)) = cache.clear(access_before).await else {
206        return Ok(true);
207    };
208    if success < 0 {
209        return Ok(true);
210    }
211    info!(
212        category = LOG_CATEGORY,
213        success, fail, "file cache storage clear"
214    );
215    Ok(true)
216}
217
218pub fn new_storage_clear_service() -> Option<(String, SimpleServiceTaskFuture)>
219{
220    // if cache backend not initialized, do not create storage clear service
221    if !is_cache_backend_init() {
222        return None;
223    }
224    // because the cache backend is initialized once,
225    // so we can use the default option
226    let Ok(backend) = get_cache_backend(None) else {
227        return None;
228    };
229    if !backend.cache.support_clear() {
230        return None;
231    }
232    let task: SimpleServiceTaskFuture = Box::new(move |count: u32| {
233        Box::pin({
234            let value = backend.cache.clone();
235            async move {
236                let value = value.clone();
237                do_file_storage_clear(count, value).await
238            }
239        })
240    });
241    Some(("cache_storage_clear".to_string(), task))
242}
243
244pub struct HttpCache {
245    pub directory: Option<String>,
246    pub cache: Arc<dyn HttpCacheStorage>,
247}
248
249impl HttpCache {
250    #[inline]
251    pub fn stats(&self) -> Option<HttpCacheStats> {
252        self.cache.stats()
253    }
254}
255
256/// Handles cache hits by managing access to cached content
257pub struct CompleteHit {
258    /// The cached content
259    body: Bytes,
260    /// Whether the content has been read
261    done: bool,
262    /// Start position for range requests
263    range_start: usize,
264    /// End position for range requests
265    range_end: usize,
266}
267
268impl CompleteHit {
269    fn get(&mut self) -> Option<Bytes> {
270        if self.done {
271            None
272        } else {
273            self.done = true;
274            Some(self.body.slice(self.range_start..self.range_end))
275        }
276    }
277
278    fn seek(&mut self, start: usize, end: Option<usize>) -> Result<()> {
279        if start >= self.body.len() {
280            return Err(Error::Invalid {
281                message: format!(
282                    "seek start out of range {start} >= {}",
283                    self.body.len()
284                ),
285            });
286        }
287        self.range_start = start;
288        if let Some(end) = end {
289            // end over the actual last byte is allowed, we just need to return the actual bytes
290            self.range_end = std::cmp::min(self.body.len(), end);
291        }
292        // seek resets read so that one handler can be used for multiple ranges
293        self.done = false;
294        Ok(())
295    }
296}
297
298#[async_trait]
299impl HandleHit for CompleteHit {
300    async fn read_body(&mut self) -> pingora::Result<Option<Bytes>> {
301        Ok(self.get())
302    }
303    async fn finish(
304        self: Box<Self>, // because self is always used as a trait object
305        _storage: &'static (dyn Storage + Sync),
306        _key: &CacheKey,
307        _trace: &SpanHandle,
308    ) -> pingora::Result<()> {
309        Ok(())
310    }
311
312    fn can_seek(&self) -> bool {
313        true
314    }
315
316    fn seek(
317        &mut self,
318        start: usize,
319        end: Option<usize>,
320    ) -> pingora::Result<()> {
321        self.seek(start, end)?;
322        Ok(())
323    }
324
325    fn as_any(&self) -> &(dyn Any + Send + Sync) {
326        self
327    }
328}
329
330/// Handles cache misses by collecting and storing new content
331pub struct ObjectMissHandler {
332    /// Metadata to store with the cached content
333    meta: BinaryMeta,
334    /// Buffer for collecting the body content
335    body: BytesMut,
336    /// Cache key for storing the final object
337    key: String,
338    /// Namespace for storing the final object
339    namespace: String,
340    /// Reference to the storage backend
341    cache: Arc<dyn HttpCacheStorage>,
342}
343
344#[async_trait]
345impl HandleMiss for ObjectMissHandler {
346    async fn write_body(
347        &mut self,
348        data: bytes::Bytes,
349        _eof: bool,
350    ) -> pingora::Result<()> {
351        self.body.extend(&data);
352        Ok(())
353    }
354
355    async fn finish(self: Box<Self>) -> pingora::Result<MissFinishType> {
356        let size = self.body.len(); // FIXME: this just body size, also track meta size
357        let _ = self
358            .cache
359            .put(
360                &self.key,
361                &self.namespace,
362                CacheObject {
363                    meta: self.meta,
364                    body: self.body.into(),
365                },
366            )
367            .await?;
368
369        Ok(MissFinishType::Created(size))
370    }
371}
372
373#[async_trait]
374impl Storage for HttpCache {
375    async fn lookup(
376        &'static self,
377        key: &CacheKey,
378        _trace: &SpanHandle,
379    ) -> pingora::Result<Option<(CacheMeta, HitHandler)>> {
380        let namespace = key.namespace();
381        let hash = key.combined();
382        if let Some(obj) = self.cache.get(&hash, namespace).await? {
383            let meta = CacheMeta::deserialize(&obj.meta.0, &obj.meta.1)?;
384            let size = obj.body.len();
385            let hit_handler = CompleteHit {
386                body: obj.body,
387                done: false,
388                range_start: 0,
389                range_end: size,
390            };
391            Ok(Some((meta, Box::new(hit_handler))))
392        } else {
393            Ok(None)
394        }
395    }
396
397    async fn get_miss_handler(
398        &'static self,
399        key: &CacheKey,
400        meta: &CacheMeta,
401        _trace: &SpanHandle,
402    ) -> pingora::Result<MissHandler> {
403        // TODO: support multiple concurrent writes or panic if the is already a writer
404        let capacity = 5 * 1024;
405        let size = if let Some(content_length) =
406            meta.headers().get(http::header::CONTENT_LENGTH)
407        {
408            content_length
409                .to_str()
410                .unwrap_or_default()
411                .parse::<usize>()
412                .unwrap_or(capacity)
413        } else {
414            capacity
415        };
416        let hash = key.combined();
417        let meta = meta.serialize()?;
418        let miss_handler = ObjectMissHandler {
419            meta,
420            key: hash,
421            namespace: key.namespace().to_string(),
422            cache: self.cache.clone(),
423            body: BytesMut::with_capacity(size),
424        };
425        Ok(Box::new(miss_handler))
426    }
427
428    async fn purge(
429        &'static self,
430        key: &CompactCacheKey,
431        _type: PurgeType,
432        _trace: &SpanHandle,
433    ) -> pingora::Result<bool> {
434        // This usually purges the primary key because, without a lookup,
435        // the variance key is usually empty
436        let hash = key.combined();
437        // TODO get namespace of cache key
438        let cache_removed =
439            if let Ok(result) = self.cache.remove(&hash, "").await {
440                result.is_some()
441            } else {
442                false
443            };
444        Ok(cache_removed)
445    }
446
447    async fn update_meta(
448        &'static self,
449        key: &CacheKey,
450        meta: &CacheMeta,
451        _trace: &SpanHandle,
452    ) -> pingora::Result<bool> {
453        let namespace = key.namespace();
454        let hash = key.combined();
455        if let Some(mut obj) = self.cache.get(&hash, namespace).await? {
456            obj.meta = meta.serialize()?;
457            let _ = self.cache.put(&hash, namespace, obj).await?;
458            Ok(true)
459        } else {
460            Err(Error::Invalid {
461                message: "no meta found".to_string(),
462            }
463            .into())
464        }
465    }
466
467    fn support_streaming_partial_write(&self) -> bool {
468        false
469    }
470
471    fn as_any(&self) -> &(dyn Any + Send + Sync) {
472        self
473    }
474}
475
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tiny::new_tiny_ufo_cache;
    use bytes::{Bytes, BytesMut};
    use pingora::cache::storage::{HitHandler, MissHandler};
    use pretty_assertions::assert_eq;
    use std::sync::Arc;

    /// A hit returns the whole body first and the selected range after a seek.
    #[tokio::test]
    async fn test_complete_hit() {
        let content = Bytes::from_static(b"Hello World!");
        let size = content.len();
        let mut handle: HitHandler = Box::new(CompleteHit {
            body: content,
            done: false,
            range_start: 0,
            range_end: size,
        });

        let full = handle.read_body().await.unwrap();
        assert_eq!(true, full.is_some());
        assert_eq!(b"Hello World!", full.unwrap().as_ref());

        handle.seek(1, Some(size - 1)).unwrap();
        let partial = handle.read_body().await.unwrap();
        assert_eq!(true, partial.is_some());
        assert_eq!(b"ello World", partial.unwrap().as_ref());
    }

    /// A finished miss handler stores the collected body in the backend.
    #[tokio::test]
    async fn test_object_miss_handler() {
        let key = "key";
        let cache = Arc::new(new_tiny_ufo_cache("", 10, 10));

        let mut handle: MissHandler = Box::new(ObjectMissHandler {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body: BytesMut::new(),
            key: key.to_string(),
            namespace: "".to_string(),
            cache: cache.clone(),
        });

        handle
            .write_body(Bytes::from_static(b"Hello World!"), true)
            .await
            .unwrap();
        handle.finish().await.unwrap();

        let stored = cache.get(key, "").await.unwrap().unwrap();
        assert_eq!("Hello World!", std::str::from_utf8(&stored.body).unwrap());
    }

    /// Weight is 1 for small objects, the page count for medium ones and
    /// `u16::MAX` for oversized ones.
    #[test]
    fn test_cache_object_get_weight() {
        let meta = (b"Hello".to_vec(), b"World".to_vec());

        // data less than one page
        let small = CacheObject {
            meta: meta.clone(),
            body: Bytes::from_static(b"Hello World!"),
        };
        assert_eq!(1, small.get_weight());

        // a little over two pages rounds down to two
        let medium = CacheObject {
            meta: meta.clone(),
            body: vec![0; PAGE_SIZE * 2].into(),
        };
        assert_eq!(2, medium.get_weight());

        // data larger than max size
        let huge = CacheObject {
            meta,
            body: vec![0; MAX_OBJECT_CACHE_SIZE + 1].into(),
        };
        assert_eq!(u16::MAX, huge.get_weight());
    }
}