1pub use http::Extensions;
18use log::warn;
19use once_cell::sync::{Lazy, OnceCell};
20use pingora_error::{Error, ErrorType::*, OrErr, Result};
21use pingora_header_serde::HeaderSerde;
22use pingora_http::{HMap, ResponseHeader};
23use serde::{Deserialize, Serialize};
24use std::borrow::Cow;
25use std::time::{Duration, SystemTime};
26
27use crate::key::HashBinary;
28
29pub(crate) type InternalMeta = internal_meta::InternalMetaLatest;
30mod internal_meta {
31 use super::*;
32
33 pub(crate) type InternalMetaLatest = InternalMetaV2;
34
35 #[derive(Debug, Deserialize, Serialize, Clone)]
36 pub(crate) struct InternalMetaV0 {
37 pub(crate) fresh_until: SystemTime,
38 pub(crate) created: SystemTime,
39 pub(crate) stale_while_revalidate_sec: u32,
40 pub(crate) stale_if_error_sec: u32,
41 }
43
44 impl InternalMetaV0 {
45 #[allow(dead_code)]
46 fn serialize(&self) -> Result<Vec<u8>> {
47 rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
48 }
49
50 fn deserialize(buf: &[u8]) -> Result<Self> {
51 rmp_serde::decode::from_slice(buf)
52 .or_err(InternalError, "failed to decode cache meta v0")
53 }
54 }
55
56 #[derive(Debug, Deserialize, Serialize, Clone)]
57 pub(crate) struct InternalMetaV1 {
58 pub(crate) version: u8,
59 pub(crate) fresh_until: SystemTime,
60 pub(crate) created: SystemTime,
61 pub(crate) stale_while_revalidate_sec: u32,
62 pub(crate) stale_if_error_sec: u32,
63 }
65
66 impl InternalMetaV1 {
67 #[allow(dead_code)]
68 pub const VERSION: u8 = 1;
69
70 #[allow(dead_code)]
71 pub fn serialize(&self) -> Result<Vec<u8>> {
72 assert_eq!(self.version, 1);
73 rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
74 }
75
76 fn deserialize(buf: &[u8]) -> Result<Self> {
77 rmp_serde::decode::from_slice(buf)
78 .or_err(InternalError, "failed to decode cache meta v1")
79 }
80 }
81
82 #[derive(Debug, Deserialize, Serialize, Clone)]
83 pub(crate) struct InternalMetaV2 {
84 pub(crate) version: u8,
85 pub(crate) fresh_until: SystemTime,
86 pub(crate) created: SystemTime,
87 pub(crate) updated: SystemTime,
88 pub(crate) stale_while_revalidate_sec: u32,
89 pub(crate) stale_if_error_sec: u32,
90 #[serde(default)]
96 #[serde(skip_serializing_if = "Option::is_none")]
97 pub(crate) variance: Option<HashBinary>,
98 }
99
100 impl Default for InternalMetaV2 {
101 fn default() -> Self {
102 let epoch = SystemTime::UNIX_EPOCH;
103 InternalMetaV2 {
104 version: InternalMetaV2::VERSION,
105 fresh_until: epoch,
106 created: epoch,
107 updated: epoch,
108 stale_while_revalidate_sec: 0,
109 stale_if_error_sec: 0,
110 variance: None,
111 }
112 }
113 }
114
115 impl InternalMetaV2 {
116 pub const VERSION: u8 = 2;
117
118 pub fn serialize(&self) -> Result<Vec<u8>> {
119 assert_eq!(self.version, Self::VERSION);
120 rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
121 }
122
123 fn deserialize(buf: &[u8]) -> Result<Self> {
124 rmp_serde::decode::from_slice(buf)
125 .or_err(InternalError, "failed to decode cache meta v2")
126 }
127 }
128
129 impl From<InternalMetaV0> for InternalMetaV2 {
130 fn from(v0: InternalMetaV0) -> Self {
131 InternalMetaV2 {
132 version: InternalMetaV2::VERSION,
133 fresh_until: v0.fresh_until,
134 created: v0.created,
135 updated: v0.created,
136 stale_while_revalidate_sec: v0.stale_while_revalidate_sec,
137 stale_if_error_sec: v0.stale_if_error_sec,
138 ..Default::default()
139 }
140 }
141 }
142
143 impl From<InternalMetaV1> for InternalMetaV2 {
144 fn from(v1: InternalMetaV1) -> Self {
145 InternalMetaV2 {
146 version: InternalMetaV2::VERSION,
147 fresh_until: v1.fresh_until,
148 created: v1.created,
149 updated: v1.created,
150 stale_while_revalidate_sec: v1.stale_while_revalidate_sec,
151 stale_if_error_sec: v1.stale_if_error_sec,
152 ..Default::default()
153 }
154 }
155 }
156
157 pub(crate) fn deserialize(buf: &[u8]) -> Result<InternalMetaLatest> {
159 const MIN_SIZE: usize = 10; if buf.len() < MIN_SIZE {
161 return Error::e_explain(
162 InternalError,
163 format!("Buf too short ({}) to be InternalMeta", buf.len()),
164 );
165 }
166 let preread_buf = &mut &buf[..MIN_SIZE];
167 match rmp::decode::read_array_len(preread_buf)
169 .or_err(InternalError, "failed to decode cache meta array size")?
170 {
171 4 => Ok(InternalMetaV0::deserialize(buf)?.into()),
173 _ => {
175 let version = rmp::decode::read_pfix(preread_buf)
178 .or_err(InternalError, "failed to decode meta version")?;
179 match version {
180 1 => Ok(InternalMetaV1::deserialize(buf)?.into()),
181 2 => InternalMetaV2::deserialize(buf),
182 _ => Error::e_explain(
183 InternalError,
184 format!("Unknown InternalMeta version {version}"),
185 ),
186 }
187 }
188 }
189 }
190
191 #[cfg(test)]
192 mod tests {
193 use super::*;
194
195 #[test]
196 fn test_internal_meta_serde_v0() {
197 let meta = InternalMetaV0 {
198 fresh_until: SystemTime::now(),
199 created: SystemTime::now(),
200 stale_while_revalidate_sec: 0,
201 stale_if_error_sec: 0,
202 };
203 let binary = meta.serialize().unwrap();
204 let meta2 = InternalMetaV0::deserialize(&binary).unwrap();
205 assert_eq!(meta.fresh_until, meta2.fresh_until);
206 }
207
208 #[test]
209 fn test_internal_meta_serde_v1() {
210 let meta = InternalMetaV1 {
211 version: InternalMetaV1::VERSION,
212 fresh_until: SystemTime::now(),
213 created: SystemTime::now(),
214 stale_while_revalidate_sec: 0,
215 stale_if_error_sec: 0,
216 };
217 let binary = meta.serialize().unwrap();
218 let meta2 = InternalMetaV1::deserialize(&binary).unwrap();
219 assert_eq!(meta.fresh_until, meta2.fresh_until);
220 }
221
222 #[test]
223 fn test_internal_meta_serde_v2() {
224 let meta = InternalMetaV2::default();
225 let binary = meta.serialize().unwrap();
226 let meta2 = InternalMetaV2::deserialize(&binary).unwrap();
227 assert_eq!(meta2.version, 2);
228 assert_eq!(meta.fresh_until, meta2.fresh_until);
229 assert_eq!(meta.created, meta2.created);
230 assert_eq!(meta.updated, meta2.updated);
231 }
232
233 #[test]
234 fn test_internal_meta_serde_across_versions() {
235 let meta = InternalMetaV0 {
236 fresh_until: SystemTime::now(),
237 created: SystemTime::now(),
238 stale_while_revalidate_sec: 0,
239 stale_if_error_sec: 0,
240 };
241 let binary = meta.serialize().unwrap();
242 let meta2 = deserialize(&binary).unwrap();
243 assert_eq!(meta2.version, 2);
244 assert_eq!(meta.fresh_until, meta2.fresh_until);
245
246 let meta = InternalMetaV1 {
247 version: 1,
248 fresh_until: SystemTime::now(),
249 created: SystemTime::now(),
250 stale_while_revalidate_sec: 0,
251 stale_if_error_sec: 0,
252 };
253 let binary = meta.serialize().unwrap();
254 let meta2 = deserialize(&binary).unwrap();
255 assert_eq!(meta2.version, 2);
256 assert_eq!(meta.fresh_until, meta2.fresh_until);
257 assert_eq!(meta2.created, meta2.updated);
259 }
260
261 #[test]
262 fn test_internal_meta_serde_v2_extend_fields() {
263 #[derive(Deserialize, Serialize)]
266 pub(crate) struct InternalMetaV2Base {
267 pub(crate) version: u8,
268 pub(crate) fresh_until: SystemTime,
269 pub(crate) created: SystemTime,
270 pub(crate) updated: SystemTime,
271 pub(crate) stale_while_revalidate_sec: u32,
272 pub(crate) stale_if_error_sec: u32,
273 }
274
275 impl InternalMetaV2Base {
276 pub const VERSION: u8 = 2;
277 pub fn serialize(&self) -> Result<Vec<u8>> {
278 assert!(self.version >= Self::VERSION);
279 rmp_serde::encode::to_vec(self)
280 .or_err(InternalError, "failed to encode cache meta")
281 }
282 fn deserialize(buf: &[u8]) -> Result<Self> {
283 rmp_serde::decode::from_slice(buf)
284 .or_err(InternalError, "failed to decode cache meta v2")
285 }
286 }
287
288 let meta = InternalMetaV2::default();
290 let binary = meta.serialize().unwrap();
291 let meta2 = InternalMetaV2Base::deserialize(&binary).unwrap();
292 assert_eq!(meta2.version, 2);
293 assert_eq!(meta.fresh_until, meta2.fresh_until);
294 assert_eq!(meta.created, meta2.created);
295 assert_eq!(meta.updated, meta2.updated);
296
297 let now = SystemTime::now();
299 let meta = InternalMetaV2Base {
300 version: InternalMetaV2::VERSION,
301 fresh_until: now,
302 created: now,
303 updated: now,
304 stale_while_revalidate_sec: 0,
305 stale_if_error_sec: 0,
306 };
307 let binary = meta.serialize().unwrap();
308 let meta2 = InternalMetaV2::deserialize(&binary).unwrap();
309 assert_eq!(meta2.version, 2);
310 assert_eq!(meta.fresh_until, meta2.fresh_until);
311 assert_eq!(meta.created, meta2.created);
312 assert_eq!(meta.updated, meta2.updated);
313 }
314 }
315}
316
317#[derive(Debug)]
318pub(crate) struct CacheMetaInner {
319 pub(crate) internal: InternalMeta,
321 pub(crate) header: ResponseHeader,
322 pub extensions: Extensions,
325}
326
327#[derive(Debug)]
329pub struct CacheMeta(pub(crate) Box<CacheMetaInner>);
330
331impl CacheMeta {
332 pub fn new(
334 fresh_until: SystemTime,
335 created: SystemTime,
336 stale_while_revalidate_sec: u32,
337 stale_if_error_sec: u32,
338 header: ResponseHeader,
339 ) -> CacheMeta {
340 CacheMeta(Box::new(CacheMetaInner {
341 internal: InternalMeta {
342 version: InternalMeta::VERSION,
343 fresh_until,
344 created,
345 updated: created, stale_while_revalidate_sec,
347 stale_if_error_sec,
348 ..Default::default()
349 },
350 header,
351 extensions: Extensions::new(),
352 }))
353 }
354
355 pub fn created(&self) -> SystemTime {
357 self.0.internal.created
358 }
359
360 pub fn updated(&self) -> SystemTime {
364 self.0.internal.updated
365 }
366
367 pub fn is_fresh(&self, time: SystemTime) -> bool {
369 self.0.internal.fresh_until >= time
371 }
372
373 pub fn fresh_sec(&self) -> u64 {
377 self.0
381 .internal
382 .fresh_until
383 .duration_since(self.0.internal.updated)
384 .map_or(0, |duration| duration.as_secs())
385 }
386
387 pub fn fresh_until(&self) -> SystemTime {
389 self.0.internal.fresh_until
390 }
391
392 pub fn age(&self) -> Duration {
394 SystemTime::now()
395 .duration_since(self.updated())
396 .unwrap_or_default()
397 }
398
399 pub fn stale_while_revalidate_sec(&self) -> u32 {
401 self.0.internal.stale_while_revalidate_sec
402 }
403
404 pub fn stale_if_error_sec(&self) -> u32 {
406 self.0.internal.stale_if_error_sec
407 }
408
409 pub fn serve_stale_while_revalidate(&self, time: SystemTime) -> bool {
414 self.can_serve_stale(self.0.internal.stale_while_revalidate_sec, time)
415 }
416
417 pub fn serve_stale_if_error(&self, time: SystemTime) -> bool {
422 self.can_serve_stale(self.0.internal.stale_if_error_sec, time)
423 }
424
425 pub fn disable_serve_stale(&mut self) {
427 self.0.internal.stale_if_error_sec = 0;
428 self.0.internal.stale_while_revalidate_sec = 0;
429 }
430
431 pub fn variance(&self) -> Option<HashBinary> {
433 self.0.internal.variance
434 }
435
436 pub fn set_variance_key(&mut self, variance_key: HashBinary) {
438 self.0.internal.variance = Some(variance_key);
439 }
440
441 pub fn set_variance(&mut self, variance: HashBinary) {
443 self.0.internal.variance = Some(variance)
444 }
445
446 pub fn remove_variance(&mut self) {
448 self.0.internal.variance = None
449 }
450
451 pub fn response_header(&self) -> &ResponseHeader {
453 &self.0.header
454 }
455
456 pub fn response_header_mut(&mut self) -> &mut ResponseHeader {
458 &mut self.0.header
459 }
460
461 pub fn extensions(&self) -> &Extensions {
463 &self.0.extensions
464 }
465
466 pub fn extensions_mut(&mut self) -> &mut Extensions {
468 &mut self.0.extensions
469 }
470
471 pub fn response_header_copy(&self) -> ResponseHeader {
473 self.0.header.clone()
474 }
475
476 pub fn headers(&self) -> &HMap {
478 &self.0.header.headers
479 }
480
481 fn can_serve_stale(&self, serve_stale_sec: u32, time: SystemTime) -> bool {
482 if serve_stale_sec == 0 {
483 return false;
484 }
485 if let Some(stale_until) = self
486 .0
487 .internal
488 .fresh_until
489 .checked_add(Duration::from_secs(serve_stale_sec.into()))
490 {
491 stale_until >= time
492 } else {
493 true
495 }
496 }
497
498 pub fn serialize(&self) -> Result<(Vec<u8>, Vec<u8>)> {
500 let internal = self.0.internal.serialize()?;
501 let header = header_serialize(&self.0.header)?;
502 Ok((internal, header))
503 }
504
505 pub fn deserialize(internal: &[u8], header: &[u8]) -> Result<Self> {
507 let internal = internal_meta::deserialize(internal)?;
508 let header = header_deserialize(header)?;
509 Ok(CacheMeta(Box::new(CacheMetaInner {
510 internal,
511 header,
512 extensions: Extensions::new(),
513 })))
514 }
515}
516
517use http::StatusCode;
518
519pub type FreshDurationByStatusFn = fn(StatusCode) -> Option<Duration>;
521
522pub struct CacheMetaDefaults {
524 fresh_sec_fn: FreshDurationByStatusFn,
526 stale_while_revalidate_sec: u32,
527 stale_if_error_sec: u32,
529}
530
531impl CacheMetaDefaults {
532 pub const fn new(
534 fresh_sec_fn: FreshDurationByStatusFn,
535 stale_while_revalidate_sec: u32,
536 stale_if_error_sec: u32,
537 ) -> Self {
538 CacheMetaDefaults {
539 fresh_sec_fn,
540 stale_while_revalidate_sec,
541 stale_if_error_sec,
542 }
543 }
544
545 pub fn fresh_sec(&self, resp_status: StatusCode) -> Option<Duration> {
549 if resp_status == StatusCode::NOT_MODIFIED {
551 (self.fresh_sec_fn)(StatusCode::OK)
552 } else {
553 (self.fresh_sec_fn)(resp_status)
554 }
555 }
556
557 pub fn serve_stale_while_revalidate_sec(&self) -> u32 {
559 self.stale_while_revalidate_sec
560 }
561
562 pub fn serve_stale_if_error_sec(&self) -> u32 {
564 self.stale_if_error_sec
565 }
566}
567
568static COMPRESSION_DICT_CONTENT: OnceCell<Cow<'static, [u8]>> = OnceCell::new();
572
573static HEADER_SERDE: Lazy<HeaderSerde> = Lazy::new(|| {
574 let dict_opt = if let Some(dict_content) = COMPRESSION_DICT_CONTENT.get() {
575 Some(dict_content.to_vec())
576 } else {
577 warn!("no header compression dictionary loaded - use set_compression_dict_content() or set_compression_dict_path() to set one");
578 None
579 };
580
581 HeaderSerde::new(dict_opt)
582});
583
584pub(crate) fn header_serialize(header: &ResponseHeader) -> Result<Vec<u8>> {
585 HEADER_SERDE.serialize(header)
586}
587
588pub(crate) fn header_deserialize<T: AsRef<[u8]>>(buf: T) -> Result<ResponseHeader> {
589 HEADER_SERDE.deserialize(buf.as_ref())
590}
591
592pub fn set_compression_dict_path(path: &str) -> bool {
598 match std::fs::read(path) {
599 Ok(dict) => COMPRESSION_DICT_CONTENT.set(dict.into()).is_ok(),
600 Err(e) => {
601 warn!(
602 "failed to read header compress dictionary file at {}, {:?}",
603 path, e
604 );
605 false
606 }
607 }
608}
609
610pub fn set_compression_dict_content(content: Cow<'static, [u8]>) -> bool {
617 COMPRESSION_DICT_CONTENT.set(content).is_ok()
618}