pingora_cache/
meta.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Metadata for caching
16
17pub use http::Extensions;
18use pingora_error::{Error, ErrorType::*, OrErr, Result};
19use pingora_http::{HMap, ResponseHeader};
20use serde::{Deserialize, Serialize};
21use std::time::{Duration, SystemTime};
22
23use crate::key::HashBinary;
24
25pub(crate) type InternalMeta = internal_meta::InternalMetaLatest;
26mod internal_meta {
27    use super::*;
28
29    pub(crate) type InternalMetaLatest = InternalMetaV2;
30
    /// The original (version 0) layout of the internal cache meta.
    ///
    /// It has no `version` field; the cross-version `deserialize` recognizes it by its
    /// 4-element msgpack array and upgrades it to the latest version.
    #[derive(Debug, Deserialize, Serialize, Clone)]
    pub(crate) struct InternalMetaV0 {
        pub(crate) fresh_until: SystemTime,
        pub(crate) created: SystemTime,
        pub(crate) stale_while_revalidate_sec: u32,
        pub(crate) stale_if_error_sec: u32,
        // Do not add more fields: the field count and order are the on-disk format, and the
        // field count (4) is how v0 is told apart from later versions.
    }
39
40    impl InternalMetaV0 {
41        #[allow(dead_code)]
42        fn serialize(&self) -> Result<Vec<u8>> {
43            rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
44        }
45
46        fn deserialize(buf: &[u8]) -> Result<Self> {
47            rmp_serde::decode::from_slice(buf)
48                .or_err(InternalError, "failed to decode cache meta v0")
49        }
50    }
51
    /// Version 1 of the internal cache meta: v0 plus a leading `version` field.
    ///
    /// The `version` value comes first in the encoded array so that the cross-version
    /// `deserialize` can peek at it before decoding the whole struct.
    #[derive(Debug, Deserialize, Serialize, Clone)]
    pub(crate) struct InternalMetaV1 {
        pub(crate) version: u8,
        pub(crate) fresh_until: SystemTime,
        pub(crate) created: SystemTime,
        pub(crate) stale_while_revalidate_sec: u32,
        pub(crate) stale_if_error_sec: u32,
        // Do not add more fields: the field order is the on-disk format for v1.
    }
61
62    impl InternalMetaV1 {
63        #[allow(dead_code)]
64        pub const VERSION: u8 = 1;
65
66        #[allow(dead_code)]
67        pub fn serialize(&self) -> Result<Vec<u8>> {
68            assert_eq!(self.version, 1);
69            rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
70        }
71
72        fn deserialize(buf: &[u8]) -> Result<Self> {
73            rmp_serde::decode::from_slice(buf)
74                .or_err(InternalError, "failed to decode cache meta v1")
75        }
76    }
77
    /// Version 2 of the internal cache meta (the latest).
    ///
    /// Adds `updated`, the last revalidation time, after `created`. New optional fields may
    /// only be appended at the end, following the rules in the comment below.
    #[derive(Debug, Deserialize, Serialize, Clone)]
    pub(crate) struct InternalMetaV2 {
        pub(crate) version: u8,
        pub(crate) fresh_until: SystemTime,
        pub(crate) created: SystemTime,
        pub(crate) updated: SystemTime,
        pub(crate) stale_while_revalidate_sec: u32,
        pub(crate) stale_if_error_sec: u32,
        // Only extended fields may be added below, one field at a time. Each one needs:
        // 1. serde default, so that an older schema without the field can still be decoded
        // 2. serde skip_serializing_if, so that software that only knows an older version of
        //    this schema can still decode it
        // After full releases, remove `skip_serializing_if` so that we can add the next extended field.
        // Variance hash of the asset; `None` (and omitted from the encoding) when unset.
        #[serde(default)]
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) variance: Option<HashBinary>,
    }
95
96    impl Default for InternalMetaV2 {
97        fn default() -> Self {
98            let epoch = SystemTime::UNIX_EPOCH;
99            InternalMetaV2 {
100                version: InternalMetaV2::VERSION,
101                fresh_until: epoch,
102                created: epoch,
103                updated: epoch,
104                stale_while_revalidate_sec: 0,
105                stale_if_error_sec: 0,
106                variance: None,
107            }
108        }
109    }
110
111    impl InternalMetaV2 {
112        pub const VERSION: u8 = 2;
113
114        pub fn serialize(&self) -> Result<Vec<u8>> {
115            assert_eq!(self.version, Self::VERSION);
116            rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
117        }
118
119        fn deserialize(buf: &[u8]) -> Result<Self> {
120            rmp_serde::decode::from_slice(buf)
121                .or_err(InternalError, "failed to decode cache meta v2")
122        }
123    }
124
125    impl From<InternalMetaV0> for InternalMetaV2 {
126        fn from(v0: InternalMetaV0) -> Self {
127            InternalMetaV2 {
128                version: InternalMetaV2::VERSION,
129                fresh_until: v0.fresh_until,
130                created: v0.created,
131                updated: v0.created,
132                stale_while_revalidate_sec: v0.stale_while_revalidate_sec,
133                stale_if_error_sec: v0.stale_if_error_sec,
134                ..Default::default()
135            }
136        }
137    }
138
139    impl From<InternalMetaV1> for InternalMetaV2 {
140        fn from(v1: InternalMetaV1) -> Self {
141            InternalMetaV2 {
142                version: InternalMetaV2::VERSION,
143                fresh_until: v1.fresh_until,
144                created: v1.created,
145                updated: v1.created,
146                stale_while_revalidate_sec: v1.stale_while_revalidate_sec,
147                stale_if_error_sec: v1.stale_if_error_sec,
148                ..Default::default()
149            }
150        }
151    }
152
153    // cross version decode
154    pub(crate) fn deserialize(buf: &[u8]) -> Result<InternalMetaLatest> {
155        const MIN_SIZE: usize = 10; // a small number to read the first few bytes
156        if buf.len() < MIN_SIZE {
157            return Error::e_explain(
158                InternalError,
159                format!("Buf too short ({}) to be InternalMeta", buf.len()),
160            );
161        }
162        let preread_buf = &mut &buf[..MIN_SIZE];
163        // the struct is always packed as a fixed size array
164        match rmp::decode::read_array_len(preread_buf)
165            .or_err(InternalError, "failed to decode cache meta array size")?
166        {
167            // v0 has 4 items and no version number
168            4 => Ok(InternalMetaV0::deserialize(buf)?.into()),
169            // other V should have version number encoded
170            _ => {
171                // rmp will encode `version` < 128 into a fixint (one byte),
172                // so we use read_pfix
173                let version = rmp::decode::read_pfix(preread_buf)
174                    .or_err(InternalError, "failed to decode meta version")?;
175                match version {
176                    1 => Ok(InternalMetaV1::deserialize(buf)?.into()),
177                    2 => InternalMetaV2::deserialize(buf),
178                    _ => Error::e_explain(
179                        InternalError,
180                        format!("Unknown InternalMeta version {version}"),
181                    ),
182                }
183            }
184        }
185    }
186
187    #[cfg(test)]
188    mod tests {
189        use super::*;
190
191        #[test]
192        fn test_internal_meta_serde_v0() {
193            let meta = InternalMetaV0 {
194                fresh_until: SystemTime::now(),
195                created: SystemTime::now(),
196                stale_while_revalidate_sec: 0,
197                stale_if_error_sec: 0,
198            };
199            let binary = meta.serialize().unwrap();
200            let meta2 = InternalMetaV0::deserialize(&binary).unwrap();
201            assert_eq!(meta.fresh_until, meta2.fresh_until);
202        }
203
204        #[test]
205        fn test_internal_meta_serde_v1() {
206            let meta = InternalMetaV1 {
207                version: InternalMetaV1::VERSION,
208                fresh_until: SystemTime::now(),
209                created: SystemTime::now(),
210                stale_while_revalidate_sec: 0,
211                stale_if_error_sec: 0,
212            };
213            let binary = meta.serialize().unwrap();
214            let meta2 = InternalMetaV1::deserialize(&binary).unwrap();
215            assert_eq!(meta.fresh_until, meta2.fresh_until);
216        }
217
218        #[test]
219        fn test_internal_meta_serde_v2() {
220            let meta = InternalMetaV2::default();
221            let binary = meta.serialize().unwrap();
222            let meta2 = InternalMetaV2::deserialize(&binary).unwrap();
223            assert_eq!(meta2.version, 2);
224            assert_eq!(meta.fresh_until, meta2.fresh_until);
225            assert_eq!(meta.created, meta2.created);
226            assert_eq!(meta.updated, meta2.updated);
227        }
228
229        #[test]
230        fn test_internal_meta_serde_across_versions() {
231            let meta = InternalMetaV0 {
232                fresh_until: SystemTime::now(),
233                created: SystemTime::now(),
234                stale_while_revalidate_sec: 0,
235                stale_if_error_sec: 0,
236            };
237            let binary = meta.serialize().unwrap();
238            let meta2 = deserialize(&binary).unwrap();
239            assert_eq!(meta2.version, 2);
240            assert_eq!(meta.fresh_until, meta2.fresh_until);
241
242            let meta = InternalMetaV1 {
243                version: 1,
244                fresh_until: SystemTime::now(),
245                created: SystemTime::now(),
246                stale_while_revalidate_sec: 0,
247                stale_if_error_sec: 0,
248            };
249            let binary = meta.serialize().unwrap();
250            let meta2 = deserialize(&binary).unwrap();
251            assert_eq!(meta2.version, 2);
252            assert_eq!(meta.fresh_until, meta2.fresh_until);
253            // `updated` == `created` when upgrading to v2
254            assert_eq!(meta2.created, meta2.updated);
255        }
256
257        #[test]
258        fn test_internal_meta_serde_v2_extend_fields() {
259            // make sure that v2 format is backward compatible
260            // this is the base version of v2 without any extended fields
261            #[derive(Deserialize, Serialize)]
262            pub(crate) struct InternalMetaV2Base {
263                pub(crate) version: u8,
264                pub(crate) fresh_until: SystemTime,
265                pub(crate) created: SystemTime,
266                pub(crate) updated: SystemTime,
267                pub(crate) stale_while_revalidate_sec: u32,
268                pub(crate) stale_if_error_sec: u32,
269            }
270
271            impl InternalMetaV2Base {
272                pub const VERSION: u8 = 2;
273                pub fn serialize(&self) -> Result<Vec<u8>> {
274                    assert!(self.version >= Self::VERSION);
275                    rmp_serde::encode::to_vec(self)
276                        .or_err(InternalError, "failed to encode cache meta")
277                }
278                fn deserialize(buf: &[u8]) -> Result<Self> {
279                    rmp_serde::decode::from_slice(buf)
280                        .or_err(InternalError, "failed to decode cache meta v2")
281                }
282            }
283
284            // ext V2 to base v2
285            let meta = InternalMetaV2::default();
286            let binary = meta.serialize().unwrap();
287            let meta2 = InternalMetaV2Base::deserialize(&binary).unwrap();
288            assert_eq!(meta2.version, 2);
289            assert_eq!(meta.fresh_until, meta2.fresh_until);
290            assert_eq!(meta.created, meta2.created);
291            assert_eq!(meta.updated, meta2.updated);
292
293            // base V2 to ext v2
294            let now = SystemTime::now();
295            let meta = InternalMetaV2Base {
296                version: InternalMetaV2::VERSION,
297                fresh_until: now,
298                created: now,
299                updated: now,
300                stale_while_revalidate_sec: 0,
301                stale_if_error_sec: 0,
302            };
303            let binary = meta.serialize().unwrap();
304            let meta2 = InternalMetaV2::deserialize(&binary).unwrap();
305            assert_eq!(meta2.version, 2);
306            assert_eq!(meta.fresh_until, meta2.fresh_until);
307            assert_eq!(meta.created, meta2.created);
308            assert_eq!(meta.updated, meta2.updated);
309        }
310    }
311}
312
/// The boxed payload of [CacheMeta].
#[derive(Debug)]
pub(crate) struct CacheMetaInner {
    // http header and Internal meta have different ways of serialization, so keep them separated
    // The versioned, msgpack-encoded internal metadata (timestamps, serve-stale, variance).
    pub(crate) internal: InternalMeta,
    // The cached response header, encoded separately by the header serde.
    pub(crate) header: ResponseHeader,
    /// An opaque type map to hold extra information for communication between cache backends
    /// and users. This field is **not** guaranteed to be persistently stored in the cache backend.
    pub extensions: Extensions,
}
322
/// The cacheable response header and cache metadata
///
/// The contents are boxed, so moving a `CacheMeta` around only moves a pointer.
#[derive(Debug)]
pub struct CacheMeta(pub(crate) Box<CacheMetaInner>);
326
327impl CacheMeta {
328    /// Create a [CacheMeta] from the given metadata and the response header
329    pub fn new(
330        fresh_until: SystemTime,
331        created: SystemTime,
332        stale_while_revalidate_sec: u32,
333        stale_if_error_sec: u32,
334        header: ResponseHeader,
335    ) -> CacheMeta {
336        CacheMeta(Box::new(CacheMetaInner {
337            internal: InternalMeta {
338                version: InternalMeta::VERSION,
339                fresh_until,
340                created,
341                updated: created, // created == updated for new meta
342                stale_while_revalidate_sec,
343                stale_if_error_sec,
344                ..Default::default()
345            },
346            header,
347            extensions: Extensions::new(),
348        }))
349    }
350
351    /// When the asset was created/admitted to cache
352    pub fn created(&self) -> SystemTime {
353        self.0.internal.created
354    }
355
356    /// The last time the asset was revalidated
357    ///
358    /// This value will be the same as [Self::created()] if no revalidation ever happens
359    pub fn updated(&self) -> SystemTime {
360        self.0.internal.updated
361    }
362
363    /// Is the asset still valid
364    pub fn is_fresh(&self, time: SystemTime) -> bool {
365        // NOTE: HTTP cache time resolution is second
366        self.0.internal.fresh_until >= time
367    }
368
369    /// How long (in seconds) the asset should be fresh since its admission/revalidation
370    ///
371    /// This is essentially the max-age value (or its equivalence)
372    pub fn fresh_sec(&self) -> u64 {
373        // swallow `duration_since` error, assets that are always stale have earlier `fresh_until` than `created`
374        // practically speaking we can always treat these as 0 ttl
375        // XXX: return Error if `fresh_until` is much earlier than expected?
376        self.0
377            .internal
378            .fresh_until
379            .duration_since(self.0.internal.updated)
380            .map_or(0, |duration| duration.as_secs())
381    }
382
383    /// Until when the asset is considered fresh
384    pub fn fresh_until(&self) -> SystemTime {
385        self.0.internal.fresh_until
386    }
387
388    /// How old the asset is since its admission/revalidation
389    pub fn age(&self) -> Duration {
390        SystemTime::now()
391            .duration_since(self.updated())
392            .unwrap_or_default()
393    }
394
395    /// The stale-while-revalidate limit in seconds
396    pub fn stale_while_revalidate_sec(&self) -> u32 {
397        self.0.internal.stale_while_revalidate_sec
398    }
399
400    /// The stale-if-error limit in seconds
401    pub fn stale_if_error_sec(&self) -> u32 {
402        self.0.internal.stale_if_error_sec
403    }
404
405    /// Can the asset be used to serve stale during revalidation at the given time.
406    ///
407    /// NOTE: the serve stale functions do not check !is_fresh(time),
408    /// i.e. the object is already assumed to be stale.
409    pub fn serve_stale_while_revalidate(&self, time: SystemTime) -> bool {
410        self.can_serve_stale(self.0.internal.stale_while_revalidate_sec, time)
411    }
412
413    /// Can the asset be used to serve stale after error at the given time.
414    ///
415    /// NOTE: the serve stale functions do not check !is_fresh(time),
416    /// i.e. the object is already assumed to be stale.
417    pub fn serve_stale_if_error(&self, time: SystemTime) -> bool {
418        self.can_serve_stale(self.0.internal.stale_if_error_sec, time)
419    }
420
421    /// Disable serve stale for this asset
422    pub fn disable_serve_stale(&mut self) {
423        self.0.internal.stale_if_error_sec = 0;
424        self.0.internal.stale_while_revalidate_sec = 0;
425    }
426
427    /// Get the variance hash of this asset
428    pub fn variance(&self) -> Option<HashBinary> {
429        self.0.internal.variance
430    }
431
432    /// Set the variance key of this asset
433    pub fn set_variance_key(&mut self, variance_key: HashBinary) {
434        self.0.internal.variance = Some(variance_key);
435    }
436
437    /// Set the variance (hash) of this asset
438    pub fn set_variance(&mut self, variance: HashBinary) {
439        self.0.internal.variance = Some(variance)
440    }
441
442    /// Removes the variance (hash) of this asset
443    pub fn remove_variance(&mut self) {
444        self.0.internal.variance = None
445    }
446
447    /// Get the response header in this asset
448    pub fn response_header(&self) -> &ResponseHeader {
449        &self.0.header
450    }
451
452    /// Modify the header in this asset
453    pub fn response_header_mut(&mut self) -> &mut ResponseHeader {
454        &mut self.0.header
455    }
456
457    /// Expose the extensions to read
458    pub fn extensions(&self) -> &Extensions {
459        &self.0.extensions
460    }
461
462    /// Expose the extensions to modify
463    pub fn extensions_mut(&mut self) -> &mut Extensions {
464        &mut self.0.extensions
465    }
466
467    /// Get a copy of the response header
468    pub fn response_header_copy(&self) -> ResponseHeader {
469        self.0.header.clone()
470    }
471
472    /// get all the headers of this asset
473    pub fn headers(&self) -> &HMap {
474        &self.0.header.headers
475    }
476
477    fn can_serve_stale(&self, serve_stale_sec: u32, time: SystemTime) -> bool {
478        if serve_stale_sec == 0 {
479            return false;
480        }
481        if let Some(stale_until) = self
482            .0
483            .internal
484            .fresh_until
485            .checked_add(Duration::from_secs(serve_stale_sec.into()))
486        {
487            stale_until >= time
488        } else {
489            // overflowed: treat as infinite ttl
490            true
491        }
492    }
493
494    /// Serialize this object
495    pub fn serialize(&self) -> Result<(Vec<u8>, Vec<u8>)> {
496        let internal = self.0.internal.serialize()?;
497        let header = header_serialize(&self.0.header)?;
498        Ok((internal, header))
499    }
500
501    /// Deserialize from the binary format
502    pub fn deserialize(internal: &[u8], header: &[u8]) -> Result<Self> {
503        let internal = internal_meta::deserialize(internal)?;
504        let header = header_deserialize(header)?;
505        Ok(CacheMeta(Box::new(CacheMetaInner {
506            internal,
507            header,
508            extensions: Extensions::new(),
509        })))
510    }
511}
512
513use http::StatusCode;
514
/// The function to generate TTL from the given [StatusCode].
///
/// It returns the default freshness lifetime in seconds, or `None` when responses with that
/// status should not be cached by default.
pub type FreshSecByStatusFn = fn(StatusCode) -> Option<u32>;
517
/// The default settings to generate [CacheMeta]
pub struct CacheMetaDefaults {
    // Maps a status code to its default TTL in seconds. A status code for which this returns
    // `None` is not considered cacheable by default.
    fresh_sec_fn: FreshSecByStatusFn,
    // Default stale-while-revalidate window, in seconds.
    stale_while_revalidate_sec: u32,
    // Default stale-if-error window, in seconds.
    // TODO: allow "error" condition to be configurable?
    stale_if_error_sec: u32,
}
526
527impl CacheMetaDefaults {
528    /// Create a new [CacheMetaDefaults]
529    pub const fn new(
530        fresh_sec_fn: FreshSecByStatusFn,
531        stale_while_revalidate_sec: u32,
532        stale_if_error_sec: u32,
533    ) -> Self {
534        CacheMetaDefaults {
535            fresh_sec_fn,
536            stale_while_revalidate_sec,
537            stale_if_error_sec,
538        }
539    }
540
541    /// Return the default TTL for the given [StatusCode]
542    ///
543    /// `None`: do no cache this code.
544    pub fn fresh_sec(&self, resp_status: StatusCode) -> Option<u32> {
545        // safe guard to make sure 304 response to share the same default ttl of 200
546        if resp_status == StatusCode::NOT_MODIFIED {
547            (self.fresh_sec_fn)(StatusCode::OK)
548        } else {
549            (self.fresh_sec_fn)(resp_status)
550        }
551    }
552
553    /// The default SWR seconds
554    pub fn serve_stale_while_revalidate_sec(&self) -> u32 {
555        self.stale_while_revalidate_sec
556    }
557
558    /// The default SIE seconds
559    pub fn serve_stale_if_error_sec(&self) -> u32 {
560        self.stale_if_error_sec
561    }
562}
563
564use log::warn;
565use once_cell::sync::{Lazy, OnceCell};
566use pingora_header_serde::HeaderSerde;
567use std::fs::File;
568use std::io::Read;
569
/* load header compression engine and its dictionary globally */
/// Path of the header compression dictionary file.
///
/// It is read once, when `HEADER_SERDE` is first initialized, so it has to be set before
/// the first header (de)serialization for the dictionary to be used. If it is unset (or the
/// file cannot be read), `HeaderSerde::new` receives `None`.
pub(crate) static COMPRESSION_DICT_PATH: OnceCell<String> = OnceCell::new();
572
573fn load_file(path: &String) -> Option<Vec<u8>> {
574    let mut file = File::open(path)
575        .map_err(|e| {
576            warn!(
577                "failed to open header compress dictionary file at {}, {:?}",
578                path, e
579            );
580            e
581        })
582        .ok()?;
583    let mut dict = Vec::new();
584    file.read_to_end(&mut dict)
585        .map_err(|e| {
586            warn!(
587                "failed to read header compress dictionary file at {}, {:?}",
588                path, e
589            );
590            e
591        })
592        .ok()?;
593
594    Some(dict)
595}
596
597static HEADER_SERDE: Lazy<HeaderSerde> = Lazy::new(|| {
598    let dict_path_opt = COMPRESSION_DICT_PATH.get();
599
600    if dict_path_opt.is_none() {
601        warn!("COMPRESSION_DICT_PATH is not set");
602    }
603
604    let result = dict_path_opt.and_then(load_file);
605
606    if result.is_none() {
607        warn!("HeaderSerde not loaded from file");
608    }
609
610    HeaderSerde::new(result)
611});
612
613pub(crate) fn header_serialize(header: &ResponseHeader) -> Result<Vec<u8>> {
614    HEADER_SERDE.serialize(header)
615}
616
617pub(crate) fn header_deserialize<T: AsRef<[u8]>>(buf: T) -> Result<ResponseHeader> {
618    HEADER_SERDE.deserialize(buf.as_ref())
619}