pingora_cache/
meta.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Metadata for caching
16
17pub use http::Extensions;
18use log::warn;
19use once_cell::sync::{Lazy, OnceCell};
20use pingora_error::{Error, ErrorType::*, OrErr, Result};
21use pingora_header_serde::HeaderSerde;
22use pingora_http::{HMap, ResponseHeader};
23use serde::{Deserialize, Serialize};
24use std::borrow::Cow;
25use std::time::{Duration, SystemTime};
26
27use crate::key::HashBinary;
28
29pub(crate) type InternalMeta = internal_meta::InternalMetaLatest;
30mod internal_meta {
31    use super::*;
32
33    pub(crate) type InternalMetaLatest = InternalMetaV2;
34
35    #[derive(Debug, Deserialize, Serialize, Clone)]
36    pub(crate) struct InternalMetaV0 {
37        pub(crate) fresh_until: SystemTime,
38        pub(crate) created: SystemTime,
39        pub(crate) stale_while_revalidate_sec: u32,
40        pub(crate) stale_if_error_sec: u32,
41        // Do not add more field
42    }
43
44    impl InternalMetaV0 {
45        #[allow(dead_code)]
46        fn serialize(&self) -> Result<Vec<u8>> {
47            rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
48        }
49
50        fn deserialize(buf: &[u8]) -> Result<Self> {
51            rmp_serde::decode::from_slice(buf)
52                .or_err(InternalError, "failed to decode cache meta v0")
53        }
54    }
55
56    #[derive(Debug, Deserialize, Serialize, Clone)]
57    pub(crate) struct InternalMetaV1 {
58        pub(crate) version: u8,
59        pub(crate) fresh_until: SystemTime,
60        pub(crate) created: SystemTime,
61        pub(crate) stale_while_revalidate_sec: u32,
62        pub(crate) stale_if_error_sec: u32,
63        // Do not add more field
64    }
65
66    impl InternalMetaV1 {
67        #[allow(dead_code)]
68        pub const VERSION: u8 = 1;
69
70        #[allow(dead_code)]
71        pub fn serialize(&self) -> Result<Vec<u8>> {
72            assert_eq!(self.version, 1);
73            rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
74        }
75
76        fn deserialize(buf: &[u8]) -> Result<Self> {
77            rmp_serde::decode::from_slice(buf)
78                .or_err(InternalError, "failed to decode cache meta v1")
79        }
80    }
81
82    #[derive(Debug, Deserialize, Serialize, Clone)]
83    pub(crate) struct InternalMetaV2 {
84        pub(crate) version: u8,
85        pub(crate) fresh_until: SystemTime,
86        pub(crate) created: SystemTime,
87        pub(crate) updated: SystemTime,
88        pub(crate) stale_while_revalidate_sec: u32,
89        pub(crate) stale_if_error_sec: u32,
90        // Only the extended field to be added below. One field at a time.
91        // 1. serde default in order to accept an older version schema without the field existing
92        // 2. serde skip_serializing_if in order for software with only an older version of this
93        //    schema to decode it
94        // After full releases, remove `skip_serializing_if` so that we can add the next extended field.
95        #[serde(default)]
96        #[serde(skip_serializing_if = "Option::is_none")]
97        pub(crate) variance: Option<HashBinary>,
98    }
99
100    impl Default for InternalMetaV2 {
101        fn default() -> Self {
102            let epoch = SystemTime::UNIX_EPOCH;
103            InternalMetaV2 {
104                version: InternalMetaV2::VERSION,
105                fresh_until: epoch,
106                created: epoch,
107                updated: epoch,
108                stale_while_revalidate_sec: 0,
109                stale_if_error_sec: 0,
110                variance: None,
111            }
112        }
113    }
114
115    impl InternalMetaV2 {
116        pub const VERSION: u8 = 2;
117
118        pub fn serialize(&self) -> Result<Vec<u8>> {
119            assert_eq!(self.version, Self::VERSION);
120            rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
121        }
122
123        fn deserialize(buf: &[u8]) -> Result<Self> {
124            rmp_serde::decode::from_slice(buf)
125                .or_err(InternalError, "failed to decode cache meta v2")
126        }
127    }
128
129    impl From<InternalMetaV0> for InternalMetaV2 {
130        fn from(v0: InternalMetaV0) -> Self {
131            InternalMetaV2 {
132                version: InternalMetaV2::VERSION,
133                fresh_until: v0.fresh_until,
134                created: v0.created,
135                updated: v0.created,
136                stale_while_revalidate_sec: v0.stale_while_revalidate_sec,
137                stale_if_error_sec: v0.stale_if_error_sec,
138                ..Default::default()
139            }
140        }
141    }
142
143    impl From<InternalMetaV1> for InternalMetaV2 {
144        fn from(v1: InternalMetaV1) -> Self {
145            InternalMetaV2 {
146                version: InternalMetaV2::VERSION,
147                fresh_until: v1.fresh_until,
148                created: v1.created,
149                updated: v1.created,
150                stale_while_revalidate_sec: v1.stale_while_revalidate_sec,
151                stale_if_error_sec: v1.stale_if_error_sec,
152                ..Default::default()
153            }
154        }
155    }
156
157    // cross version decode
158    pub(crate) fn deserialize(buf: &[u8]) -> Result<InternalMetaLatest> {
159        const MIN_SIZE: usize = 10; // a small number to read the first few bytes
160        if buf.len() < MIN_SIZE {
161            return Error::e_explain(
162                InternalError,
163                format!("Buf too short ({}) to be InternalMeta", buf.len()),
164            );
165        }
166        let preread_buf = &mut &buf[..MIN_SIZE];
167        // the struct is always packed as a fixed size array
168        match rmp::decode::read_array_len(preread_buf)
169            .or_err(InternalError, "failed to decode cache meta array size")?
170        {
171            // v0 has 4 items and no version number
172            4 => Ok(InternalMetaV0::deserialize(buf)?.into()),
173            // other V should have version number encoded
174            _ => {
175                // rmp will encode `version` < 128 into a fixint (one byte),
176                // so we use read_pfix
177                let version = rmp::decode::read_pfix(preread_buf)
178                    .or_err(InternalError, "failed to decode meta version")?;
179                match version {
180                    1 => Ok(InternalMetaV1::deserialize(buf)?.into()),
181                    2 => InternalMetaV2::deserialize(buf),
182                    _ => Error::e_explain(
183                        InternalError,
184                        format!("Unknown InternalMeta version {version}"),
185                    ),
186                }
187            }
188        }
189    }
190
191    #[cfg(test)]
192    mod tests {
193        use super::*;
194
195        #[test]
196        fn test_internal_meta_serde_v0() {
197            let meta = InternalMetaV0 {
198                fresh_until: SystemTime::now(),
199                created: SystemTime::now(),
200                stale_while_revalidate_sec: 0,
201                stale_if_error_sec: 0,
202            };
203            let binary = meta.serialize().unwrap();
204            let meta2 = InternalMetaV0::deserialize(&binary).unwrap();
205            assert_eq!(meta.fresh_until, meta2.fresh_until);
206        }
207
208        #[test]
209        fn test_internal_meta_serde_v1() {
210            let meta = InternalMetaV1 {
211                version: InternalMetaV1::VERSION,
212                fresh_until: SystemTime::now(),
213                created: SystemTime::now(),
214                stale_while_revalidate_sec: 0,
215                stale_if_error_sec: 0,
216            };
217            let binary = meta.serialize().unwrap();
218            let meta2 = InternalMetaV1::deserialize(&binary).unwrap();
219            assert_eq!(meta.fresh_until, meta2.fresh_until);
220        }
221
222        #[test]
223        fn test_internal_meta_serde_v2() {
224            let meta = InternalMetaV2::default();
225            let binary = meta.serialize().unwrap();
226            let meta2 = InternalMetaV2::deserialize(&binary).unwrap();
227            assert_eq!(meta2.version, 2);
228            assert_eq!(meta.fresh_until, meta2.fresh_until);
229            assert_eq!(meta.created, meta2.created);
230            assert_eq!(meta.updated, meta2.updated);
231        }
232
233        #[test]
234        fn test_internal_meta_serde_across_versions() {
235            let meta = InternalMetaV0 {
236                fresh_until: SystemTime::now(),
237                created: SystemTime::now(),
238                stale_while_revalidate_sec: 0,
239                stale_if_error_sec: 0,
240            };
241            let binary = meta.serialize().unwrap();
242            let meta2 = deserialize(&binary).unwrap();
243            assert_eq!(meta2.version, 2);
244            assert_eq!(meta.fresh_until, meta2.fresh_until);
245
246            let meta = InternalMetaV1 {
247                version: 1,
248                fresh_until: SystemTime::now(),
249                created: SystemTime::now(),
250                stale_while_revalidate_sec: 0,
251                stale_if_error_sec: 0,
252            };
253            let binary = meta.serialize().unwrap();
254            let meta2 = deserialize(&binary).unwrap();
255            assert_eq!(meta2.version, 2);
256            assert_eq!(meta.fresh_until, meta2.fresh_until);
257            // `updated` == `created` when upgrading to v2
258            assert_eq!(meta2.created, meta2.updated);
259        }
260
261        #[test]
262        fn test_internal_meta_serde_v2_extend_fields() {
263            // make sure that v2 format is backward compatible
264            // this is the base version of v2 without any extended fields
265            #[derive(Deserialize, Serialize)]
266            pub(crate) struct InternalMetaV2Base {
267                pub(crate) version: u8,
268                pub(crate) fresh_until: SystemTime,
269                pub(crate) created: SystemTime,
270                pub(crate) updated: SystemTime,
271                pub(crate) stale_while_revalidate_sec: u32,
272                pub(crate) stale_if_error_sec: u32,
273            }
274
275            impl InternalMetaV2Base {
276                pub const VERSION: u8 = 2;
277                pub fn serialize(&self) -> Result<Vec<u8>> {
278                    assert!(self.version >= Self::VERSION);
279                    rmp_serde::encode::to_vec(self)
280                        .or_err(InternalError, "failed to encode cache meta")
281                }
282                fn deserialize(buf: &[u8]) -> Result<Self> {
283                    rmp_serde::decode::from_slice(buf)
284                        .or_err(InternalError, "failed to decode cache meta v2")
285                }
286            }
287
288            // ext V2 to base v2
289            let meta = InternalMetaV2::default();
290            let binary = meta.serialize().unwrap();
291            let meta2 = InternalMetaV2Base::deserialize(&binary).unwrap();
292            assert_eq!(meta2.version, 2);
293            assert_eq!(meta.fresh_until, meta2.fresh_until);
294            assert_eq!(meta.created, meta2.created);
295            assert_eq!(meta.updated, meta2.updated);
296
297            // base V2 to ext v2
298            let now = SystemTime::now();
299            let meta = InternalMetaV2Base {
300                version: InternalMetaV2::VERSION,
301                fresh_until: now,
302                created: now,
303                updated: now,
304                stale_while_revalidate_sec: 0,
305                stale_if_error_sec: 0,
306            };
307            let binary = meta.serialize().unwrap();
308            let meta2 = InternalMetaV2::deserialize(&binary).unwrap();
309            assert_eq!(meta2.version, 2);
310            assert_eq!(meta.fresh_until, meta2.fresh_until);
311            assert_eq!(meta.created, meta2.created);
312            assert_eq!(meta.updated, meta2.updated);
313        }
314    }
315}
316
317#[derive(Debug)]
318pub(crate) struct CacheMetaInner {
319    // http header and Internal meta have different ways of serialization, so keep them separated
320    pub(crate) internal: InternalMeta,
321    pub(crate) header: ResponseHeader,
322    /// An opaque type map to hold extra information for communication between cache backends
323    /// and users. This field is **not** guaranteed be persistently stored in the cache backend.
324    pub extensions: Extensions,
325}
326
327/// The cacheable response header and cache metadata
328#[derive(Debug)]
329pub struct CacheMeta(pub(crate) Box<CacheMetaInner>);
330
331impl CacheMeta {
332    /// Create a [CacheMeta] from the given metadata and the response header
333    pub fn new(
334        fresh_until: SystemTime,
335        created: SystemTime,
336        stale_while_revalidate_sec: u32,
337        stale_if_error_sec: u32,
338        header: ResponseHeader,
339    ) -> CacheMeta {
340        CacheMeta(Box::new(CacheMetaInner {
341            internal: InternalMeta {
342                version: InternalMeta::VERSION,
343                fresh_until,
344                created,
345                updated: created, // created == updated for new meta
346                stale_while_revalidate_sec,
347                stale_if_error_sec,
348                ..Default::default()
349            },
350            header,
351            extensions: Extensions::new(),
352        }))
353    }
354
355    /// When the asset was created/admitted to cache
356    pub fn created(&self) -> SystemTime {
357        self.0.internal.created
358    }
359
360    /// The last time the asset was revalidated
361    ///
362    /// This value will be the same as [Self::created()] if no revalidation ever happens
363    pub fn updated(&self) -> SystemTime {
364        self.0.internal.updated
365    }
366
367    /// Is the asset still valid
368    pub fn is_fresh(&self, time: SystemTime) -> bool {
369        // NOTE: HTTP cache time resolution is second
370        self.0.internal.fresh_until >= time
371    }
372
373    /// How long (in seconds) the asset should be fresh since its admission/revalidation
374    ///
375    /// This is essentially the max-age value (or its equivalence)
376    pub fn fresh_sec(&self) -> u64 {
377        // swallow `duration_since` error, assets that are always stale have earlier `fresh_until` than `created`
378        // practically speaking we can always treat these as 0 ttl
379        // XXX: return Error if `fresh_until` is much earlier than expected?
380        self.0
381            .internal
382            .fresh_until
383            .duration_since(self.0.internal.updated)
384            .map_or(0, |duration| duration.as_secs())
385    }
386
387    /// Until when the asset is considered fresh
388    pub fn fresh_until(&self) -> SystemTime {
389        self.0.internal.fresh_until
390    }
391
392    /// How old the asset is since its admission/revalidation
393    pub fn age(&self) -> Duration {
394        SystemTime::now()
395            .duration_since(self.updated())
396            .unwrap_or_default()
397    }
398
399    /// The stale-while-revalidate limit in seconds
400    pub fn stale_while_revalidate_sec(&self) -> u32 {
401        self.0.internal.stale_while_revalidate_sec
402    }
403
404    /// The stale-if-error limit in seconds
405    pub fn stale_if_error_sec(&self) -> u32 {
406        self.0.internal.stale_if_error_sec
407    }
408
409    /// Can the asset be used to serve stale during revalidation at the given time.
410    ///
411    /// NOTE: the serve stale functions do not check !is_fresh(time),
412    /// i.e. the object is already assumed to be stale.
413    pub fn serve_stale_while_revalidate(&self, time: SystemTime) -> bool {
414        self.can_serve_stale(self.0.internal.stale_while_revalidate_sec, time)
415    }
416
417    /// Can the asset be used to serve stale after error at the given time.
418    ///
419    /// NOTE: the serve stale functions do not check !is_fresh(time),
420    /// i.e. the object is already assumed to be stale.
421    pub fn serve_stale_if_error(&self, time: SystemTime) -> bool {
422        self.can_serve_stale(self.0.internal.stale_if_error_sec, time)
423    }
424
425    /// Disable serve stale for this asset
426    pub fn disable_serve_stale(&mut self) {
427        self.0.internal.stale_if_error_sec = 0;
428        self.0.internal.stale_while_revalidate_sec = 0;
429    }
430
431    /// Get the variance hash of this asset
432    pub fn variance(&self) -> Option<HashBinary> {
433        self.0.internal.variance
434    }
435
436    /// Set the variance key of this asset
437    pub fn set_variance_key(&mut self, variance_key: HashBinary) {
438        self.0.internal.variance = Some(variance_key);
439    }
440
441    /// Set the variance (hash) of this asset
442    pub fn set_variance(&mut self, variance: HashBinary) {
443        self.0.internal.variance = Some(variance)
444    }
445
446    /// Removes the variance (hash) of this asset
447    pub fn remove_variance(&mut self) {
448        self.0.internal.variance = None
449    }
450
451    /// Get the response header in this asset
452    pub fn response_header(&self) -> &ResponseHeader {
453        &self.0.header
454    }
455
456    /// Modify the header in this asset
457    pub fn response_header_mut(&mut self) -> &mut ResponseHeader {
458        &mut self.0.header
459    }
460
461    /// Expose the extensions to read
462    pub fn extensions(&self) -> &Extensions {
463        &self.0.extensions
464    }
465
466    /// Expose the extensions to modify
467    pub fn extensions_mut(&mut self) -> &mut Extensions {
468        &mut self.0.extensions
469    }
470
471    /// Get a copy of the response header
472    pub fn response_header_copy(&self) -> ResponseHeader {
473        self.0.header.clone()
474    }
475
476    /// get all the headers of this asset
477    pub fn headers(&self) -> &HMap {
478        &self.0.header.headers
479    }
480
481    fn can_serve_stale(&self, serve_stale_sec: u32, time: SystemTime) -> bool {
482        if serve_stale_sec == 0 {
483            return false;
484        }
485        if let Some(stale_until) = self
486            .0
487            .internal
488            .fresh_until
489            .checked_add(Duration::from_secs(serve_stale_sec.into()))
490        {
491            stale_until >= time
492        } else {
493            // overflowed: treat as infinite ttl
494            true
495        }
496    }
497
498    /// Serialize this object
499    pub fn serialize(&self) -> Result<(Vec<u8>, Vec<u8>)> {
500        let internal = self.0.internal.serialize()?;
501        let header = header_serialize(&self.0.header)?;
502        Ok((internal, header))
503    }
504
505    /// Deserialize from the binary format
506    pub fn deserialize(internal: &[u8], header: &[u8]) -> Result<Self> {
507        let internal = internal_meta::deserialize(internal)?;
508        let header = header_deserialize(header)?;
509        Ok(CacheMeta(Box::new(CacheMetaInner {
510            internal,
511            header,
512            extensions: Extensions::new(),
513        })))
514    }
515}
516
517use http::StatusCode;
518
519/// The function to generate TTL from the given [StatusCode].
520pub type FreshDurationByStatusFn = fn(StatusCode) -> Option<Duration>;
521
522/// The default settings to generate [CacheMeta]
523pub struct CacheMetaDefaults {
524    // if a status code is not included in fresh_sec, it's not considered cacheable by default.
525    fresh_sec_fn: FreshDurationByStatusFn,
526    stale_while_revalidate_sec: u32,
527    // TODO: allow "error" condition to be configurable?
528    stale_if_error_sec: u32,
529}
530
531impl CacheMetaDefaults {
532    /// Create a new [CacheMetaDefaults]
533    pub const fn new(
534        fresh_sec_fn: FreshDurationByStatusFn,
535        stale_while_revalidate_sec: u32,
536        stale_if_error_sec: u32,
537    ) -> Self {
538        CacheMetaDefaults {
539            fresh_sec_fn,
540            stale_while_revalidate_sec,
541            stale_if_error_sec,
542        }
543    }
544
545    /// Return the default TTL for the given [StatusCode]
546    ///
547    /// `None`: do no cache this code.
548    pub fn fresh_sec(&self, resp_status: StatusCode) -> Option<Duration> {
549        // safe guard to make sure 304 response to share the same default ttl of 200
550        if resp_status == StatusCode::NOT_MODIFIED {
551            (self.fresh_sec_fn)(StatusCode::OK)
552        } else {
553            (self.fresh_sec_fn)(resp_status)
554        }
555    }
556
557    /// The default SWR seconds
558    pub fn serve_stale_while_revalidate_sec(&self) -> u32 {
559        self.stale_while_revalidate_sec
560    }
561
562    /// The default SIE seconds
563    pub fn serve_stale_if_error_sec(&self) -> u32 {
564        self.stale_if_error_sec
565    }
566}
567
568/// The dictionary content for header compression.
569///
570/// Used during initialization of [`HEADER_SERDE`].
571static COMPRESSION_DICT_CONTENT: OnceCell<Cow<'static, [u8]>> = OnceCell::new();
572
573static HEADER_SERDE: Lazy<HeaderSerde> = Lazy::new(|| {
574    let dict_opt = if let Some(dict_content) = COMPRESSION_DICT_CONTENT.get() {
575        Some(dict_content.to_vec())
576    } else {
577        warn!("no header compression dictionary loaded - use set_compression_dict_content() or set_compression_dict_path() to set one");
578        None
579    };
580
581    HeaderSerde::new(dict_opt)
582});
583
584pub(crate) fn header_serialize(header: &ResponseHeader) -> Result<Vec<u8>> {
585    HEADER_SERDE.serialize(header)
586}
587
588pub(crate) fn header_deserialize<T: AsRef<[u8]>>(buf: T) -> Result<ResponseHeader> {
589    HEADER_SERDE.deserialize(buf.as_ref())
590}
591
592/// Load the header compression dictionary from a file, which helps serialize http header.
593///
594/// Returns false if it is already set or if the file could not be read.
595///
596/// Use [`set_compression_dict_content`] to set the dictionary from memory instead.
597pub fn set_compression_dict_path(path: &str) -> bool {
598    match std::fs::read(path) {
599        Ok(dict) => COMPRESSION_DICT_CONTENT.set(dict.into()).is_ok(),
600        Err(e) => {
601            warn!(
602                "failed to read header compress dictionary file at {}, {:?}",
603                path, e
604            );
605            false
606        }
607    }
608}
609
610/// Set the header compression dictionary content, which helps serialize http header.
611///
612/// Returns false if it is already set.
613///
614/// This is an alernative to [`set_compression_dict_path`], allowing use of
615/// a dictionary without an external file.
616pub fn set_compression_dict_content(content: Cow<'static, [u8]>) -> bool {
617    COMPRESSION_DICT_CONTENT.set(content).is_ok()
618}