Skip to main content

pingap_cache/
http_cache.rs

1// Copyright 2024-2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::LOG_TARGET;
16use super::{Error, PAGE_SIZE, Result};
17use crate::get_file_backends;
18use async_trait::async_trait;
19use bytes::{Buf, BufMut, Bytes, BytesMut};
20use pingap_core::BackgroundTask;
21use pingap_core::Error as ServiceError;
22use pingora::cache::key::{CacheHashKey, CompactCacheKey};
23use pingora::cache::storage::MissFinishType;
24use pingora::cache::storage::{HandleHit, HandleMiss};
25use pingora::cache::trace::SpanHandle;
26use pingora::cache::{
27    CacheKey, CacheMeta, HitHandler, MissHandler, PurgeType, Storage,
28};
29use std::any::Any;
30use std::sync::Arc;
31use std::time::Duration;
32use std::time::SystemTime;
33use tracing::info;
34
35type BinaryMeta = (Vec<u8>, Vec<u8>);
36
37/// Represents a cached object containing metadata and body content
38#[derive(Debug, Clone, Default, PartialEq)]
39pub struct CacheObject {
40    /// Tuple containing two metadata byte vectors (meta0, meta1)
41    pub meta: BinaryMeta,
42    /// The actual cached content
43    pub body: Bytes,
44}
45
46// Maximum size for a single cached object (40MB)
47static MAX_OBJECT_CACHE_SIZE: usize = 10 * 1024 * PAGE_SIZE;
48
49impl CacheObject {
50    pub fn get_weight(&self) -> u16 {
51        let size = self.body.len() + self.meta.0.len() + self.meta.1.len();
52        if size <= PAGE_SIZE {
53            return 1;
54        }
55        if size >= MAX_OBJECT_CACHE_SIZE {
56            return u16::MAX;
57        }
58        (size / PAGE_SIZE) as u16
59    }
60}
61
62const META_SIZE_LENGTH: usize = 8;
63
64/// Creates a CacheObject from bytes with the following format:
65/// - First 4 bytes: meta0 size (u32)
66/// - Next 4 bytes: meta1 size (u32)
67/// - Next meta0_size bytes: meta0 data
68/// - Next meta1_size bytes: meta1 data
69/// - Remaining bytes: body data
70impl From<Bytes> for CacheObject {
71    fn from(value: Bytes) -> Self {
72        // 8 bytes
73        if value.len() < META_SIZE_LENGTH {
74            return Self::default();
75        }
76        let mut data = value;
77
78        let meta0_size = data.get_u32() as usize;
79        let meta1_size = data.get_u32() as usize;
80
81        let meta0 = data.split_to(meta0_size).to_vec();
82        let meta1 = data.split_to(meta1_size).to_vec();
83
84        Self {
85            meta: (meta0, meta1),
86            body: data,
87        }
88    }
89}
90/// Converts a CacheObject into bytes with the following format:
91/// - First 4 bytes: meta0 size (u32)
92/// - Next 4 bytes: meta1 size (u32)
93/// - Next meta0_size bytes: meta0 data
94/// - Next meta1_size bytes: meta1 data
95/// - Remaining bytes: body data
96impl From<CacheObject> for Bytes {
97    fn from(value: CacheObject) -> Self {
98        let meta_size =
99            value.meta.0.len() + value.meta.1.len() + META_SIZE_LENGTH;
100        let mut buf = BytesMut::with_capacity(value.body.len() + meta_size);
101        let meta0_size = value.meta.0.len() as u32;
102        let meta1_size = value.meta.1.len() as u32;
103        buf.put_u32(meta0_size);
104        buf.put_u32(meta1_size);
105        buf.extend(value.meta.0);
106        buf.extend(value.meta.1);
107        buf.extend(value.body.iter());
108
109        buf.into()
110    }
111}
112
/// Read/write counters as reported by `HttpCacheStorage::stats`.
#[derive(Debug)]
pub struct HttpCacheStats {
    /// Read counter reported by the backend (presumably reads in flight;
    /// confirm per backend).
    pub reading: u32,
    /// Write counter reported by the backend (presumably writes in flight;
    /// confirm per backend).
    pub writing: u32,
}

/// Outcome summary returned by `HttpCacheStorage::clear`.
///
/// The trait's default `clear` reports `-1` for both counters with an empty
/// description, presumably meaning "clearing not supported".
#[derive(Debug, Clone)]
pub struct HttpCacheClearStats {
    /// Number of entries cleared successfully.
    pub success: i32,
    /// Number of entries that failed to clear.
    pub fail: i32,
    /// Free-form details supplied by the backend.
    pub description: String,
}
125
126/// Storage interface for HTTP caching operations
127///
128/// This trait defines the core operations needed to implement a storage backend
129/// for HTTP caching. Implementations must be both `Send` and `Sync` to support
130/// concurrent access.
131#[async_trait]
132pub trait HttpCacheStorage: Sync + Send {
133    /// Retrieves a cached object from storage
134    /// Returns None if not found or Some(CacheObject) if present
135    async fn get(
136        &self,
137        key: &str,
138        namespace: &[u8],
139    ) -> Result<Option<CacheObject>>;
140
141    /// Stores a cache object with the given key and namespace
142    async fn put(
143        &self,
144        key: &str,
145        namespace: &[u8],
146        data: CacheObject,
147    ) -> Result<()>;
148
149    /// Removes a cached object from storage.
150    ///
151    /// # Arguments
152    /// * `key` - The unique identifier for the cached object
153    /// * `namespace` - The namespace to scope the cache key
154    ///
155    /// # Returns
156    /// * `Result<Option<CacheObject>>` - The removed object if it existed
157    async fn remove(
158        &self,
159        _key: &str,
160        _namespace: &[u8],
161    ) -> Result<Option<CacheObject>> {
162        Ok(None)
163    }
164
165    /// Clears cached objects accessed before the specified time.
166    ///
167    /// # Arguments
168    /// * `access_before` - Remove items last accessed before this timestamp
169    ///
170    /// # Returns
171    /// * `Result<HttpCacheClearStats>` - Clear stats
172    async fn clear(
173        &self,
174        _access_before: std::time::SystemTime,
175    ) -> Result<HttpCacheClearStats> {
176        Ok(HttpCacheClearStats {
177            success: -1,
178            fail: -1,
179            description: "".to_string(),
180        })
181    }
182
183    /// Returns current storage statistics.
184    ///
185    /// # Returns
186    /// * `Option<HttpCacheStats>` - Current read/write statistics if available
187    fn stats(&self) -> Option<HttpCacheStats> {
188        None
189    }
190
191    /// Returns the inactive duration for the cache storage.
192    ///
193    /// # Returns
194    /// * `Option<Duration>` - The inactive duration for the cache storage
195    fn inactive(&self) -> Option<Duration> {
196        None
197    }
198}
199
200async fn do_file_storage_clear(count: u32) -> Result<bool, ServiceError> {
201    // Add 1 every loop
202    let offset = 60;
203    if count % offset != 0 {
204        return Ok(false);
205    }
206
207    let backends = get_file_backends();
208    for backend in backends {
209        let cache = &backend.cache;
210        let Some(inactive_duration) = cache.inactive() else {
211            continue;
212        };
213
214        let Some(access_before) =
215            SystemTime::now().checked_sub(inactive_duration)
216        else {
217            return Ok(false);
218        };
219
220        let Ok(stats) = cache.clear(access_before).await else {
221            return Ok(true);
222        };
223        info!(
224            target: LOG_TARGET,
225            success = stats.success,
226            fail = stats.fail,
227            description = stats.description,
228        );
229    }
230    Ok(true)
231}
232
233struct StorageClearTask {}
234
235#[async_trait]
236impl BackgroundTask for StorageClearTask {
237    async fn execute(&self, count: u32) -> Result<bool, ServiceError> {
238        do_file_storage_clear(count).await?;
239        Ok(true)
240    }
241}
242
243pub fn new_storage_clear_service() -> Option<Box<dyn BackgroundTask>> {
244    Some(Box::new(StorageClearTask {}))
245}
246
247pub struct HttpCache {
248    pub directory: Option<String>,
249    pub cache: Arc<dyn HttpCacheStorage>,
250    pub max_size: u64,
251}
252
253impl HttpCache {
254    #[inline]
255    pub fn stats(&self) -> Option<HttpCacheStats> {
256        self.cache.stats()
257    }
258    pub fn max_size(&self) -> u64 {
259        self.max_size
260    }
261}
262
263/// Handles cache hits by managing access to cached content
264pub struct CompleteHit {
265    /// The cached content
266    body: Bytes,
267    /// Whether the content has been read
268    done: bool,
269    /// Start position for range requests
270    range_start: usize,
271    /// End position for range requests
272    range_end: usize,
273}
274
275impl CompleteHit {
276    fn get(&mut self) -> Option<Bytes> {
277        if self.done {
278            None
279        } else {
280            self.done = true;
281            Some(self.body.slice(self.range_start..self.range_end))
282        }
283    }
284
285    fn seek(&mut self, start: usize, end: Option<usize>) -> Result<()> {
286        if start >= self.body.len() {
287            return Err(Error::Invalid {
288                message: format!(
289                    "seek start out of range {start} >= {}",
290                    self.body.len()
291                ),
292            });
293        }
294        self.range_start = start;
295        if let Some(end) = end {
296            // end over the actual last byte is allowed, we just need to return the actual bytes
297            self.range_end = std::cmp::min(self.body.len(), end);
298        }
299        // seek resets read so that one handler can be used for multiple ranges
300        self.done = false;
301        Ok(())
302    }
303}
304
305#[async_trait]
306impl HandleHit for CompleteHit {
307    async fn read_body(&mut self) -> pingora::Result<Option<Bytes>> {
308        Ok(self.get())
309    }
310    async fn finish(
311        self: Box<Self>, // because self is always used as a trait object
312        _storage: &'static (dyn Storage + Sync),
313        _key: &CacheKey,
314        _trace: &SpanHandle,
315    ) -> pingora::Result<()> {
316        Ok(())
317    }
318
319    fn can_seek(&self) -> bool {
320        true
321    }
322
323    fn seek(
324        &mut self,
325        start: usize,
326        end: Option<usize>,
327    ) -> pingora::Result<()> {
328        self.seek(start, end)?;
329        Ok(())
330    }
331
332    fn as_any(&self) -> &(dyn Any + Send + Sync) {
333        self
334    }
335
336    fn as_any_mut(&mut self) -> &mut (dyn Any + Send + Sync) {
337        self
338    }
339}
340
341/// Handles cache misses by collecting and storing new content
342pub struct ObjectMissHandler {
343    /// Metadata to store with the cached content
344    meta: BinaryMeta,
345    /// Buffer for collecting the body content
346    body: BytesMut,
347    /// Cache key for storing the final object
348    key: String,
349    /// Primary key for storing the final object
350    primary_key: String,
351    /// Namespace for storing the final object
352    namespace: Vec<u8>,
353    /// Reference to the storage backend
354    cache: Arc<dyn HttpCacheStorage>,
355}
356
357#[async_trait]
358impl HandleMiss for ObjectMissHandler {
359    async fn write_body(
360        &mut self,
361        data: bytes::Bytes,
362        _eof: bool,
363    ) -> pingora::Result<()> {
364        self.body.extend(&data);
365        Ok(())
366    }
367
368    async fn finish(self: Box<Self>) -> pingora::Result<MissFinishType> {
369        let size = self.body.len(); // FIXME: this just body size, also track meta size
370        info!(
371            target: LOG_TARGET,
372            key = self.key,
373            primary_key = self.primary_key,
374            namespace = std::str::from_utf8(&self.namespace).ok(),
375            size,
376            "put data to cache"
377        );
378        let _ = self
379            .cache
380            .put(
381                &self.key,
382                &self.namespace,
383                CacheObject {
384                    meta: self.meta,
385                    body: self.body.into(),
386                },
387            )
388            .await?;
389
390        Ok(MissFinishType::Created(size))
391    }
392}
393
394#[async_trait]
395impl Storage for HttpCache {
396    async fn lookup(
397        &'static self,
398        key: &CacheKey,
399        _trace: &SpanHandle,
400    ) -> pingora::Result<Option<(CacheMeta, HitHandler)>> {
401        let namespace = key.namespace();
402        let hash = key.combined();
403        if let Some(obj) = self.cache.get(&hash, namespace).await? {
404            let meta = CacheMeta::deserialize(&obj.meta.0, &obj.meta.1)?;
405            let size = obj.body.len();
406            let hit_handler = CompleteHit {
407                body: obj.body,
408                done: false,
409                range_start: 0,
410                range_end: size,
411            };
412            Ok(Some((meta, Box::new(hit_handler))))
413        } else {
414            Ok(None)
415        }
416    }
417
418    async fn get_miss_handler(
419        &'static self,
420        key: &CacheKey,
421        meta: &CacheMeta,
422        _trace: &SpanHandle,
423    ) -> pingora::Result<MissHandler> {
424        // TODO: support multiple concurrent writes or panic if the is already a writer
425        let capacity = 5 * 1024;
426        let size = if let Some(content_length) =
427            meta.headers().get(http::header::CONTENT_LENGTH)
428        {
429            content_length
430                .to_str()
431                .unwrap_or_default()
432                .parse::<usize>()
433                .unwrap_or(capacity)
434        } else {
435            capacity
436        };
437        let hash = key.combined();
438        let meta = meta.serialize()?;
439        let miss_handler = ObjectMissHandler {
440            meta,
441            key: hash,
442            primary_key: key.primary_key_str().unwrap_or_default().to_string(),
443            namespace: key.namespace().to_vec(),
444            cache: self.cache.clone(),
445            body: BytesMut::with_capacity(size),
446        };
447        Ok(Box::new(miss_handler))
448    }
449
450    async fn purge(
451        &'static self,
452        key: &CompactCacheKey,
453        _type: PurgeType,
454        _trace: &SpanHandle,
455    ) -> pingora::Result<bool> {
456        // This usually purges the primary key because, without a lookup,
457        // the variance key is usually empty
458        let hash = key.combined();
459        // TODO get namespace of cache key
460        let cache_removed =
461            if let Ok(result) = self.cache.remove(&hash, b"").await {
462                result.is_some()
463            } else {
464                false
465            };
466        Ok(cache_removed)
467    }
468
469    async fn update_meta(
470        &'static self,
471        key: &CacheKey,
472        meta: &CacheMeta,
473        _trace: &SpanHandle,
474    ) -> pingora::Result<bool> {
475        let namespace = key.namespace();
476        let hash = key.combined();
477        if let Some(mut obj) = self.cache.get(&hash, namespace).await? {
478            obj.meta = meta.serialize()?;
479            let _ = self.cache.put(&hash, namespace, obj).await?;
480            Ok(true)
481        } else {
482            Err(Error::Invalid {
483                message: "no meta found".to_string(),
484            }
485            .into())
486        }
487    }
488
489    fn support_streaming_partial_write(&self) -> bool {
490        false
491    }
492
493    fn as_any(&self) -> &(dyn Any + Send + Sync) {
494        self
495    }
496}
497
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tiny::{CacheMode, TinyUfoCache};
    use bytes::{Bytes, BytesMut};
    use pingora::cache::storage::{HitHandler, MissHandler};
    use pretty_assertions::assert_eq;
    use std::sync::Arc;

    /// A plain read yields the whole body; after a seek the next read yields
    /// only the requested range.
    #[tokio::test]
    async fn test_complete_hit() {
        const CONTENT: &[u8] = b"Hello World!";
        let total = CONTENT.len();
        let mut handler: HitHandler = Box::new(CompleteHit {
            body: Bytes::from_static(CONTENT),
            done: false,
            range_start: 0,
            range_end: total,
        });

        let first = handler.read_body().await.unwrap();
        assert!(first.is_some());
        assert_eq!(b"Hello World!", first.unwrap().as_ref());

        handler.seek(1, Some(total - 1)).unwrap();
        let second = handler.read_body().await.unwrap();
        assert!(second.is_some());
        assert_eq!(b"ello World", second.unwrap().as_ref());
    }

    /// Data written through the miss handler is readable from the backend
    /// once the handler finishes.
    #[tokio::test]
    async fn test_object_miss_handler() {
        let cache_key = "key";

        let storage = Arc::new(TinyUfoCache::new(CacheMode::Normal, 10, 10));
        let mut handler: MissHandler = Box::new(ObjectMissHandler {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body: BytesMut::new(),
            key: cache_key.to_string(),
            primary_key: "".to_string(),
            namespace: b"".to_vec(),
            cache: storage.clone(),
        });

        handler
            .write_body(Bytes::from_static(b"Hello World!"), true)
            .await
            .unwrap();
        handler.finish().await.unwrap();

        let stored = storage.get(cache_key, b"").await.unwrap().unwrap();
        assert_eq!("Hello World!", std::str::from_utf8(&stored.body).unwrap());
    }

    /// Weight is 1 for sub-page objects, whole pages in between, and
    /// `u16::MAX` above the size cap.
    #[test]
    fn test_cache_object_get_weight() {
        let with_body = |body: Bytes| CacheObject {
            meta: (b"Hello".to_vec(), b"World".to_vec()),
            body,
        };

        // data less than one page
        assert_eq!(1, with_body(Bytes::from_static(b"Hello World!")).get_weight());

        assert_eq!(2, with_body(vec![0u8; PAGE_SIZE * 2].into()).get_weight());

        // data larger than max size
        assert_eq!(
            u16::MAX,
            with_body(vec![0u8; MAX_OBJECT_CACHE_SIZE + 1].into()).get_weight()
        );
    }
}