pingap_cache/
http_cache.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{get_cache_backend, is_cache_backend_init, LOG_CATEGORY};
16use super::{Error, Result, PAGE_SIZE};
17use async_trait::async_trait;
18use bytes::{Buf, BufMut, Bytes, BytesMut};
19use pingap_core::Error as ServiceError;
20use pingap_core::SimpleServiceTaskFuture;
21use pingora::cache::key::{CacheHashKey, CompactCacheKey};
22use pingora::cache::storage::MissFinishType;
23use pingora::cache::storage::{HandleHit, HandleMiss};
24use pingora::cache::trace::SpanHandle;
25use pingora::cache::{
26    CacheKey, CacheMeta, HitHandler, MissHandler, PurgeType, Storage,
27};
28use std::any::Any;
29use std::sync::Arc;
30use std::time::{Duration, SystemTime};
31use tracing::info;
32
/// Cache metadata in binary form: a pair of byte vectors (meta0, meta1).
type BinaryMeta = (Vec<u8>, Vec<u8>);
34
/// Represents a cached object containing metadata and body content.
///
/// The `Default` value has empty metadata and an empty body.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CacheObject {
    /// Tuple containing two metadata byte vectors (meta0, meta1)
    pub meta: BinaryMeta,
    /// The actual cached content
    pub body: Bytes,
}
43
44// Maximum size for a single cached object (40MB)
45static MAX_OBJECT_CACHE_SIZE: usize = 10 * 1024 * PAGE_SIZE;
46
47impl CacheObject {
48    pub fn get_weight(&self) -> u16 {
49        let size = self.body.len() + self.meta.0.len() + self.meta.1.len();
50        if size <= PAGE_SIZE {
51            return 1;
52        }
53        if size >= MAX_OBJECT_CACHE_SIZE {
54            return u16::MAX;
55        }
56        (size / PAGE_SIZE) as u16
57    }
58}
59
60const META_SIZE_LENGTH: usize = 8;
61
62/// Creates a CacheObject from bytes with the following format:
63/// - First 4 bytes: meta0 size (u32)
64/// - Next 4 bytes: meta1 size (u32)
65/// - Next meta0_size bytes: meta0 data
66/// - Next meta1_size bytes: meta1 data
67/// - Remaining bytes: body data
68impl From<Bytes> for CacheObject {
69    fn from(value: Bytes) -> Self {
70        // 8 bytes
71        if value.len() < META_SIZE_LENGTH {
72            return Self::default();
73        }
74        let mut data = value;
75
76        let meta0_size = data.get_u32() as usize;
77        let meta1_size = data.get_u32() as usize;
78
79        let meta0 = data.split_to(meta0_size).to_vec();
80        let meta1 = data.split_to(meta1_size).to_vec();
81
82        Self {
83            meta: (meta0, meta1),
84            body: data,
85        }
86    }
87}
88/// Converts a CacheObject into bytes with the following format:
89/// - First 4 bytes: meta0 size (u32)
90/// - Next 4 bytes: meta1 size (u32)
91/// - Next meta0_size bytes: meta0 data
92/// - Next meta1_size bytes: meta1 data
93/// - Remaining bytes: body data
94impl From<CacheObject> for Bytes {
95    fn from(value: CacheObject) -> Self {
96        let meta_size =
97            value.meta.0.len() + value.meta.1.len() + META_SIZE_LENGTH;
98        let mut buf = BytesMut::with_capacity(value.body.len() + meta_size);
99        let meta0_size = value.meta.0.len() as u32;
100        let meta1_size = value.meta.1.len() as u32;
101        buf.put_u32(meta0_size);
102        buf.put_u32(meta1_size);
103        buf.extend(value.meta.0);
104        buf.extend(value.meta.1);
105        buf.extend(value.body.iter());
106
107        buf.into()
108    }
109}
110
/// Read/write statistics reported by a cache storage backend.
#[derive(Debug)]
pub struct HttpCacheStats {
    /// Read counter reported by the backend
    /// (presumably the number of in-flight reads -- confirm against backends)
    pub reading: u32,
    /// Write counter reported by the backend
    /// (presumably the number of in-flight writes -- confirm against backends)
    pub writing: u32,
}
116
/// Storage interface for HTTP caching operations
///
/// This trait defines the core operations needed to implement a storage backend
/// for HTTP caching. Implementations must be both `Send` and `Sync` to support
/// concurrent access.
///
/// Only `get` and `put` are required; every other method has a default
/// implementation that reports "nothing done / not available".
#[async_trait]
pub trait HttpCacheStorage: Sync + Send {
    /// Retrieves a cached object from storage.
    ///
    /// # Arguments
    /// * `key` - The unique identifier for the cached object
    /// * `namespace` - The namespace to scope the cache key
    ///
    /// # Returns
    /// * `Result<Option<CacheObject>>` - `None` if not found,
    ///   `Some(CacheObject)` if present
    async fn get(
        &self,
        key: &str,
        namespace: &[u8],
    ) -> Result<Option<CacheObject>>;

    /// Stores a cache object with the given key and namespace.
    ///
    /// # Arguments
    /// * `key` - The unique identifier for the cached object
    /// * `namespace` - The namespace to scope the cache key
    /// * `data` - The object (metadata and body) to store
    async fn put(
        &self,
        key: &str,
        namespace: &[u8],
        data: CacheObject,
    ) -> Result<()>;

    /// Removes a cached object from storage.
    ///
    /// The default implementation removes nothing and returns `Ok(None)`.
    ///
    /// # Arguments
    /// * `key` - The unique identifier for the cached object
    /// * `namespace` - The namespace to scope the cache key
    ///
    /// # Returns
    /// * `Result<Option<CacheObject>>` - The removed object if it existed
    async fn remove(
        &self,
        _key: &str,
        _namespace: &[u8],
    ) -> Result<Option<CacheObject>> {
        Ok(None)
    }

    /// Clears cached objects accessed before the specified time.
    ///
    /// The default implementation clears nothing and returns `(-1, -1)`;
    /// the periodic clear task in this file skips logging when the
    /// success count is negative.
    ///
    /// # Arguments
    /// * `access_before` - Remove items last accessed before this timestamp
    ///
    /// # Returns
    /// * `Result<(i32, i32)>` - Count of (successful, failed) removals
    async fn clear(
        &self,
        _access_before: std::time::SystemTime,
    ) -> Result<(i32, i32)> {
        Ok((-1, -1))
    }

    /// Returns current storage statistics.
    ///
    /// The default implementation returns `None`.
    ///
    /// # Returns
    /// * `Option<HttpCacheStats>` - Current read/write statistics if available
    fn stats(&self) -> Option<HttpCacheStats> {
        None
    }

    /// Returns the inactive duration for the cache storage.
    ///
    /// The default implementation returns `None`; in that case no periodic
    /// clear service is created for the storage.
    ///
    /// # Returns
    /// * `Option<Duration>` - The inactive duration for the cache storage
    fn inactive(&self) -> Option<Duration> {
        None
    }
}
186
187async fn do_file_storage_clear(
188    count: u32,
189    cache: Arc<dyn HttpCacheStorage>,
190) -> Result<bool, ServiceError> {
191    // Add 1 every loop
192    let offset = 60;
193    if count % offset != 0 {
194        return Ok(false);
195    }
196
197    let Some(inactive_duration) = cache.inactive() else {
198        return Ok(false);
199    };
200
201    let Some(access_before) = SystemTime::now().checked_sub(inactive_duration)
202    else {
203        return Ok(false);
204    };
205
206    let Ok((success, fail)) = cache.clear(access_before).await else {
207        return Ok(true);
208    };
209    if success < 0 {
210        return Ok(true);
211    }
212    info!(
213        category = LOG_CATEGORY,
214        success, fail, "file cache storage clear"
215    );
216    Ok(true)
217}
218
219pub fn new_storage_clear_service() -> Option<(String, SimpleServiceTaskFuture)>
220{
221    // if cache backend not initialized, do not create storage clear service
222    if !is_cache_backend_init() {
223        return None;
224    }
225    // because the cache backend is initialized once,
226    // so we can use the default option
227    let Ok(backend) = get_cache_backend(None) else {
228        return None;
229    };
230    backend.cache.inactive()?;
231    let task: SimpleServiceTaskFuture = Box::new(move |count: u32| {
232        Box::pin({
233            let value = backend.cache.clone();
234            async move {
235                let value = value.clone();
236                do_file_storage_clear(count, value).await
237            }
238        })
239    });
240    Some(("cache_storage_clear".to_string(), task))
241}
242
/// HTTP cache that implements pingora's `Storage` on top of an
/// `HttpCacheStorage` backend.
pub struct HttpCache {
    /// Optional directory associated with the cache
    /// (presumably the location used by a file based backend -- confirm)
    pub directory: Option<String>,
    /// The storage backend that holds the cached objects
    pub cache: Arc<dyn HttpCacheStorage>,
}
247
impl HttpCache {
    /// Returns the statistics reported by the underlying storage backend,
    /// or `None` if the backend does not provide any.
    #[inline]
    pub fn stats(&self) -> Option<HttpCacheStats> {
        self.cache.stats()
    }
}
254
/// Handles cache hits by managing access to cached content
///
/// The whole body is held in memory; a read returns the slice
/// `range_start..range_end` of it in one piece.
pub struct CompleteHit {
    /// The cached content
    body: Bytes,
    /// Whether the content has been read
    done: bool,
    /// Start position for range requests (inclusive)
    range_start: usize,
    /// End position for range requests (exclusive)
    range_end: usize,
}
266
267impl CompleteHit {
268    fn get(&mut self) -> Option<Bytes> {
269        if self.done {
270            None
271        } else {
272            self.done = true;
273            Some(self.body.slice(self.range_start..self.range_end))
274        }
275    }
276
277    fn seek(&mut self, start: usize, end: Option<usize>) -> Result<()> {
278        if start >= self.body.len() {
279            return Err(Error::Invalid {
280                message: format!(
281                    "seek start out of range {start} >= {}",
282                    self.body.len()
283                ),
284            });
285        }
286        self.range_start = start;
287        if let Some(end) = end {
288            // end over the actual last byte is allowed, we just need to return the actual bytes
289            self.range_end = std::cmp::min(self.body.len(), end);
290        }
291        // seek resets read so that one handler can be used for multiple ranges
292        self.done = false;
293        Ok(())
294    }
295}
296
297#[async_trait]
298impl HandleHit for CompleteHit {
299    async fn read_body(&mut self) -> pingora::Result<Option<Bytes>> {
300        Ok(self.get())
301    }
302    async fn finish(
303        self: Box<Self>, // because self is always used as a trait object
304        _storage: &'static (dyn Storage + Sync),
305        _key: &CacheKey,
306        _trace: &SpanHandle,
307    ) -> pingora::Result<()> {
308        Ok(())
309    }
310
311    fn can_seek(&self) -> bool {
312        true
313    }
314
315    fn seek(
316        &mut self,
317        start: usize,
318        end: Option<usize>,
319    ) -> pingora::Result<()> {
320        self.seek(start, end)?;
321        Ok(())
322    }
323
324    fn as_any(&self) -> &(dyn Any + Send + Sync) {
325        self
326    }
327
328    fn as_any_mut(&mut self) -> &mut (dyn Any + Send + Sync) {
329        self
330    }
331}
332
/// Handles cache misses by collecting and storing new content
///
/// Body chunks are accumulated in memory and written to the storage
/// backend as a single object when the handler is finished.
pub struct ObjectMissHandler {
    /// Metadata to store with the cached content
    meta: BinaryMeta,
    /// Buffer for collecting the body content
    body: BytesMut,
    /// Cache key for storing the final object
    key: String,
    /// Primary key of the cache entry (only used for logging here)
    primary_key: String,
    /// Namespace for storing the final object
    namespace: Vec<u8>,
    /// Reference to the storage backend
    cache: Arc<dyn HttpCacheStorage>,
}
348
349#[async_trait]
350impl HandleMiss for ObjectMissHandler {
351    async fn write_body(
352        &mut self,
353        data: bytes::Bytes,
354        _eof: bool,
355    ) -> pingora::Result<()> {
356        self.body.extend(&data);
357        Ok(())
358    }
359
360    async fn finish(self: Box<Self>) -> pingora::Result<MissFinishType> {
361        let size = self.body.len(); // FIXME: this just body size, also track meta size
362        info!(
363            category = LOG_CATEGORY,
364            key = self.key,
365            primary_key = self.primary_key,
366            namespace = std::str::from_utf8(&self.namespace).ok(),
367            size,
368            "put data to cache"
369        );
370        let _ = self
371            .cache
372            .put(
373                &self.key,
374                &self.namespace,
375                CacheObject {
376                    meta: self.meta,
377                    body: self.body.into(),
378                },
379            )
380            .await?;
381
382        Ok(MissFinishType::Created(size))
383    }
384}
385
386#[async_trait]
387impl Storage for HttpCache {
388    async fn lookup(
389        &'static self,
390        key: &CacheKey,
391        _trace: &SpanHandle,
392    ) -> pingora::Result<Option<(CacheMeta, HitHandler)>> {
393        let namespace = key.namespace();
394        let hash = key.combined();
395        if let Some(obj) = self.cache.get(&hash, namespace).await? {
396            let meta = CacheMeta::deserialize(&obj.meta.0, &obj.meta.1)?;
397            let size = obj.body.len();
398            let hit_handler = CompleteHit {
399                body: obj.body,
400                done: false,
401                range_start: 0,
402                range_end: size,
403            };
404            Ok(Some((meta, Box::new(hit_handler))))
405        } else {
406            Ok(None)
407        }
408    }
409
410    async fn get_miss_handler(
411        &'static self,
412        key: &CacheKey,
413        meta: &CacheMeta,
414        _trace: &SpanHandle,
415    ) -> pingora::Result<MissHandler> {
416        // TODO: support multiple concurrent writes or panic if the is already a writer
417        let capacity = 5 * 1024;
418        let size = if let Some(content_length) =
419            meta.headers().get(http::header::CONTENT_LENGTH)
420        {
421            content_length
422                .to_str()
423                .unwrap_or_default()
424                .parse::<usize>()
425                .unwrap_or(capacity)
426        } else {
427            capacity
428        };
429        let hash = key.combined();
430        let meta = meta.serialize()?;
431        let miss_handler = ObjectMissHandler {
432            meta,
433            key: hash,
434            primary_key: key.primary_key_str().unwrap_or_default().to_string(),
435            namespace: key.namespace().to_vec(),
436            cache: self.cache.clone(),
437            body: BytesMut::with_capacity(size),
438        };
439        Ok(Box::new(miss_handler))
440    }
441
442    async fn purge(
443        &'static self,
444        key: &CompactCacheKey,
445        _type: PurgeType,
446        _trace: &SpanHandle,
447    ) -> pingora::Result<bool> {
448        // This usually purges the primary key because, without a lookup,
449        // the variance key is usually empty
450        let hash = key.combined();
451        // TODO get namespace of cache key
452        let cache_removed =
453            if let Ok(result) = self.cache.remove(&hash, b"").await {
454                result.is_some()
455            } else {
456                false
457            };
458        Ok(cache_removed)
459    }
460
461    async fn update_meta(
462        &'static self,
463        key: &CacheKey,
464        meta: &CacheMeta,
465        _trace: &SpanHandle,
466    ) -> pingora::Result<bool> {
467        let namespace = key.namespace();
468        let hash = key.combined();
469        if let Some(mut obj) = self.cache.get(&hash, namespace).await? {
470            obj.meta = meta.serialize()?;
471            let _ = self.cache.put(&hash, namespace, obj).await?;
472            Ok(true)
473        } else {
474            Err(Error::Invalid {
475                message: "no meta found".to_string(),
476            }
477            .into())
478        }
479    }
480
481    fn support_streaming_partial_write(&self) -> bool {
482        false
483    }
484
485    fn as_any(&self) -> &(dyn Any + Send + Sync) {
486        self
487    }
488}
489
// Unit tests for the hit handler, the miss handler and the weight calculation.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tiny::new_tiny_ufo_cache;
    use bytes::{Bytes, BytesMut};
    use pingora::cache::storage::{HitHandler, MissHandler};
    use pretty_assertions::assert_eq;
    use std::sync::Arc;

    // A hit handler returns the whole body first and the selected range
    // after a seek.
    #[tokio::test]
    async fn test_complete_hit() {
        let body = Bytes::from_static(b"Hello World!");
        let size = body.len();
        let hit = CompleteHit {
            body,
            done: false,
            range_start: 0,
            range_end: size,
        };
        let mut handle: HitHandler = Box::new(hit);
        let body = handle.read_body().await.unwrap();
        assert_eq!(true, body.is_some());
        assert_eq!(b"Hello World!", body.unwrap().as_ref());

        // seek makes the handler readable again, limited to the new range
        handle.seek(1, Some(size - 1)).unwrap();
        let body = handle.read_body().await.unwrap();
        assert_eq!(true, body.is_some());
        assert_eq!(b"ello World", body.unwrap().as_ref());
    }

    // The miss handler stores the written body in the cache on finish.
    #[tokio::test]
    async fn test_object_miss_handler() {
        let key = "key";

        let cache = Arc::new(new_tiny_ufo_cache("", 10, 10));
        let obj = ObjectMissHandler {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body: BytesMut::new(),
            key: key.to_string(),
            primary_key: "".to_string(),
            namespace: b"".to_vec(),
            cache: cache.clone(),
        };
        let mut handle: MissHandler = Box::new(obj);

        handle
            .write_body(Bytes::from_static(b"Hello World!"), true)
            .await
            .unwrap();
        handle.finish().await.unwrap();

        // the object must now be readable through the storage backend
        let data = cache.get(key, b"").await.unwrap().unwrap();
        assert_eq!("Hello World!", std::str::from_utf8(&data.body).unwrap());
    }

    // The weight is the number of whole pages, at least 1 and capped at
    // u16::MAX for oversized objects.
    #[test]
    fn test_cache_object_get_weight() {
        // data less than one page
        let obj = CacheObject {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body: Bytes::from_static(b"Hello World!"),
        };
        assert_eq!(1, obj.get_weight());

        // two pages of body plus a few metadata bytes round down to 2
        let obj = CacheObject {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body: vec![0; PAGE_SIZE * 2].into(),
        };
        assert_eq!(2, obj.get_weight());

        // data larger than max size
        let obj = CacheObject {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body: vec![0; MAX_OBJECT_CACHE_SIZE + 1].into(),
        };
        assert_eq!(u16::MAX, obj.get_weight());
    }
}
567}