1pub use http::Extensions;
18use pingora_error::{Error, ErrorType::*, OrErr, Result};
19use pingora_http::{HMap, ResponseHeader};
20use serde::{Deserialize, Serialize};
21use std::time::{Duration, SystemTime};
22
23use crate::key::HashBinary;
24
25pub(crate) type InternalMeta = internal_meta::InternalMetaLatest;
26mod internal_meta {
27 use super::*;
28
29 pub(crate) type InternalMetaLatest = InternalMetaV2;
30
31 #[derive(Debug, Deserialize, Serialize, Clone)]
32 pub(crate) struct InternalMetaV0 {
33 pub(crate) fresh_until: SystemTime,
34 pub(crate) created: SystemTime,
35 pub(crate) stale_while_revalidate_sec: u32,
36 pub(crate) stale_if_error_sec: u32,
37 }
39
40 impl InternalMetaV0 {
41 #[allow(dead_code)]
42 fn serialize(&self) -> Result<Vec<u8>> {
43 rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
44 }
45
46 fn deserialize(buf: &[u8]) -> Result<Self> {
47 rmp_serde::decode::from_slice(buf)
48 .or_err(InternalError, "failed to decode cache meta v0")
49 }
50 }
51
52 #[derive(Debug, Deserialize, Serialize, Clone)]
53 pub(crate) struct InternalMetaV1 {
54 pub(crate) version: u8,
55 pub(crate) fresh_until: SystemTime,
56 pub(crate) created: SystemTime,
57 pub(crate) stale_while_revalidate_sec: u32,
58 pub(crate) stale_if_error_sec: u32,
59 }
61
62 impl InternalMetaV1 {
63 #[allow(dead_code)]
64 pub const VERSION: u8 = 1;
65
66 #[allow(dead_code)]
67 pub fn serialize(&self) -> Result<Vec<u8>> {
68 assert_eq!(self.version, 1);
69 rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
70 }
71
72 fn deserialize(buf: &[u8]) -> Result<Self> {
73 rmp_serde::decode::from_slice(buf)
74 .or_err(InternalError, "failed to decode cache meta v1")
75 }
76 }
77
78 #[derive(Debug, Deserialize, Serialize, Clone)]
79 pub(crate) struct InternalMetaV2 {
80 pub(crate) version: u8,
81 pub(crate) fresh_until: SystemTime,
82 pub(crate) created: SystemTime,
83 pub(crate) updated: SystemTime,
84 pub(crate) stale_while_revalidate_sec: u32,
85 pub(crate) stale_if_error_sec: u32,
86 #[serde(default)]
92 #[serde(skip_serializing_if = "Option::is_none")]
93 pub(crate) variance: Option<HashBinary>,
94 }
95
96 impl Default for InternalMetaV2 {
97 fn default() -> Self {
98 let epoch = SystemTime::UNIX_EPOCH;
99 InternalMetaV2 {
100 version: InternalMetaV2::VERSION,
101 fresh_until: epoch,
102 created: epoch,
103 updated: epoch,
104 stale_while_revalidate_sec: 0,
105 stale_if_error_sec: 0,
106 variance: None,
107 }
108 }
109 }
110
111 impl InternalMetaV2 {
112 pub const VERSION: u8 = 2;
113
114 pub fn serialize(&self) -> Result<Vec<u8>> {
115 assert_eq!(self.version, Self::VERSION);
116 rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
117 }
118
119 fn deserialize(buf: &[u8]) -> Result<Self> {
120 rmp_serde::decode::from_slice(buf)
121 .or_err(InternalError, "failed to decode cache meta v2")
122 }
123 }
124
125 impl From<InternalMetaV0> for InternalMetaV2 {
126 fn from(v0: InternalMetaV0) -> Self {
127 InternalMetaV2 {
128 version: InternalMetaV2::VERSION,
129 fresh_until: v0.fresh_until,
130 created: v0.created,
131 updated: v0.created,
132 stale_while_revalidate_sec: v0.stale_while_revalidate_sec,
133 stale_if_error_sec: v0.stale_if_error_sec,
134 ..Default::default()
135 }
136 }
137 }
138
139 impl From<InternalMetaV1> for InternalMetaV2 {
140 fn from(v1: InternalMetaV1) -> Self {
141 InternalMetaV2 {
142 version: InternalMetaV2::VERSION,
143 fresh_until: v1.fresh_until,
144 created: v1.created,
145 updated: v1.created,
146 stale_while_revalidate_sec: v1.stale_while_revalidate_sec,
147 stale_if_error_sec: v1.stale_if_error_sec,
148 ..Default::default()
149 }
150 }
151 }
152
153 pub(crate) fn deserialize(buf: &[u8]) -> Result<InternalMetaLatest> {
155 const MIN_SIZE: usize = 10; if buf.len() < MIN_SIZE {
157 return Error::e_explain(
158 InternalError,
159 format!("Buf too short ({}) to be InternalMeta", buf.len()),
160 );
161 }
162 let preread_buf = &mut &buf[..MIN_SIZE];
163 match rmp::decode::read_array_len(preread_buf)
165 .or_err(InternalError, "failed to decode cache meta array size")?
166 {
167 4 => Ok(InternalMetaV0::deserialize(buf)?.into()),
169 _ => {
171 let version = rmp::decode::read_pfix(preread_buf)
174 .or_err(InternalError, "failed to decode meta version")?;
175 match version {
176 1 => Ok(InternalMetaV1::deserialize(buf)?.into()),
177 2 => InternalMetaV2::deserialize(buf),
178 _ => Error::e_explain(
179 InternalError,
180 format!("Unknown InternalMeta version {version}"),
181 ),
182 }
183 }
184 }
185 }
186
187 #[cfg(test)]
188 mod tests {
189 use super::*;
190
191 #[test]
192 fn test_internal_meta_serde_v0() {
193 let meta = InternalMetaV0 {
194 fresh_until: SystemTime::now(),
195 created: SystemTime::now(),
196 stale_while_revalidate_sec: 0,
197 stale_if_error_sec: 0,
198 };
199 let binary = meta.serialize().unwrap();
200 let meta2 = InternalMetaV0::deserialize(&binary).unwrap();
201 assert_eq!(meta.fresh_until, meta2.fresh_until);
202 }
203
204 #[test]
205 fn test_internal_meta_serde_v1() {
206 let meta = InternalMetaV1 {
207 version: InternalMetaV1::VERSION,
208 fresh_until: SystemTime::now(),
209 created: SystemTime::now(),
210 stale_while_revalidate_sec: 0,
211 stale_if_error_sec: 0,
212 };
213 let binary = meta.serialize().unwrap();
214 let meta2 = InternalMetaV1::deserialize(&binary).unwrap();
215 assert_eq!(meta.fresh_until, meta2.fresh_until);
216 }
217
218 #[test]
219 fn test_internal_meta_serde_v2() {
220 let meta = InternalMetaV2::default();
221 let binary = meta.serialize().unwrap();
222 let meta2 = InternalMetaV2::deserialize(&binary).unwrap();
223 assert_eq!(meta2.version, 2);
224 assert_eq!(meta.fresh_until, meta2.fresh_until);
225 assert_eq!(meta.created, meta2.created);
226 assert_eq!(meta.updated, meta2.updated);
227 }
228
229 #[test]
230 fn test_internal_meta_serde_across_versions() {
231 let meta = InternalMetaV0 {
232 fresh_until: SystemTime::now(),
233 created: SystemTime::now(),
234 stale_while_revalidate_sec: 0,
235 stale_if_error_sec: 0,
236 };
237 let binary = meta.serialize().unwrap();
238 let meta2 = deserialize(&binary).unwrap();
239 assert_eq!(meta2.version, 2);
240 assert_eq!(meta.fresh_until, meta2.fresh_until);
241
242 let meta = InternalMetaV1 {
243 version: 1,
244 fresh_until: SystemTime::now(),
245 created: SystemTime::now(),
246 stale_while_revalidate_sec: 0,
247 stale_if_error_sec: 0,
248 };
249 let binary = meta.serialize().unwrap();
250 let meta2 = deserialize(&binary).unwrap();
251 assert_eq!(meta2.version, 2);
252 assert_eq!(meta.fresh_until, meta2.fresh_until);
253 assert_eq!(meta2.created, meta2.updated);
255 }
256
257 #[test]
258 fn test_internal_meta_serde_v2_extend_fields() {
259 #[derive(Deserialize, Serialize)]
262 pub(crate) struct InternalMetaV2Base {
263 pub(crate) version: u8,
264 pub(crate) fresh_until: SystemTime,
265 pub(crate) created: SystemTime,
266 pub(crate) updated: SystemTime,
267 pub(crate) stale_while_revalidate_sec: u32,
268 pub(crate) stale_if_error_sec: u32,
269 }
270
271 impl InternalMetaV2Base {
272 pub const VERSION: u8 = 2;
273 pub fn serialize(&self) -> Result<Vec<u8>> {
274 assert!(self.version >= Self::VERSION);
275 rmp_serde::encode::to_vec(self)
276 .or_err(InternalError, "failed to encode cache meta")
277 }
278 fn deserialize(buf: &[u8]) -> Result<Self> {
279 rmp_serde::decode::from_slice(buf)
280 .or_err(InternalError, "failed to decode cache meta v2")
281 }
282 }
283
284 let meta = InternalMetaV2::default();
286 let binary = meta.serialize().unwrap();
287 let meta2 = InternalMetaV2Base::deserialize(&binary).unwrap();
288 assert_eq!(meta2.version, 2);
289 assert_eq!(meta.fresh_until, meta2.fresh_until);
290 assert_eq!(meta.created, meta2.created);
291 assert_eq!(meta.updated, meta2.updated);
292
293 let now = SystemTime::now();
295 let meta = InternalMetaV2Base {
296 version: InternalMetaV2::VERSION,
297 fresh_until: now,
298 created: now,
299 updated: now,
300 stale_while_revalidate_sec: 0,
301 stale_if_error_sec: 0,
302 };
303 let binary = meta.serialize().unwrap();
304 let meta2 = InternalMetaV2::deserialize(&binary).unwrap();
305 assert_eq!(meta2.version, 2);
306 assert_eq!(meta.fresh_until, meta2.fresh_until);
307 assert_eq!(meta.created, meta2.created);
308 assert_eq!(meta.updated, meta2.updated);
309 }
310 }
311}
312
313#[derive(Debug)]
314pub(crate) struct CacheMetaInner {
315 pub(crate) internal: InternalMeta,
317 pub(crate) header: ResponseHeader,
318 pub extensions: Extensions,
321}
322
323#[derive(Debug)]
325pub struct CacheMeta(pub(crate) Box<CacheMetaInner>);
326
327impl CacheMeta {
328 pub fn new(
330 fresh_until: SystemTime,
331 created: SystemTime,
332 stale_while_revalidate_sec: u32,
333 stale_if_error_sec: u32,
334 header: ResponseHeader,
335 ) -> CacheMeta {
336 CacheMeta(Box::new(CacheMetaInner {
337 internal: InternalMeta {
338 version: InternalMeta::VERSION,
339 fresh_until,
340 created,
341 updated: created, stale_while_revalidate_sec,
343 stale_if_error_sec,
344 ..Default::default()
345 },
346 header,
347 extensions: Extensions::new(),
348 }))
349 }
350
351 pub fn created(&self) -> SystemTime {
353 self.0.internal.created
354 }
355
356 pub fn updated(&self) -> SystemTime {
360 self.0.internal.updated
361 }
362
363 pub fn is_fresh(&self, time: SystemTime) -> bool {
365 self.0.internal.fresh_until >= time
367 }
368
369 pub fn fresh_sec(&self) -> u64 {
373 self.0
377 .internal
378 .fresh_until
379 .duration_since(self.0.internal.updated)
380 .map_or(0, |duration| duration.as_secs())
381 }
382
383 pub fn fresh_until(&self) -> SystemTime {
385 self.0.internal.fresh_until
386 }
387
388 pub fn age(&self) -> Duration {
390 SystemTime::now()
391 .duration_since(self.updated())
392 .unwrap_or_default()
393 }
394
395 pub fn stale_while_revalidate_sec(&self) -> u32 {
397 self.0.internal.stale_while_revalidate_sec
398 }
399
400 pub fn stale_if_error_sec(&self) -> u32 {
402 self.0.internal.stale_if_error_sec
403 }
404
405 pub fn serve_stale_while_revalidate(&self, time: SystemTime) -> bool {
410 self.can_serve_stale(self.0.internal.stale_while_revalidate_sec, time)
411 }
412
413 pub fn serve_stale_if_error(&self, time: SystemTime) -> bool {
418 self.can_serve_stale(self.0.internal.stale_if_error_sec, time)
419 }
420
421 pub fn disable_serve_stale(&mut self) {
423 self.0.internal.stale_if_error_sec = 0;
424 self.0.internal.stale_while_revalidate_sec = 0;
425 }
426
427 pub fn variance(&self) -> Option<HashBinary> {
429 self.0.internal.variance
430 }
431
432 pub fn set_variance_key(&mut self, variance_key: HashBinary) {
434 self.0.internal.variance = Some(variance_key);
435 }
436
437 pub fn set_variance(&mut self, variance: HashBinary) {
439 self.0.internal.variance = Some(variance)
440 }
441
442 pub fn remove_variance(&mut self) {
444 self.0.internal.variance = None
445 }
446
447 pub fn response_header(&self) -> &ResponseHeader {
449 &self.0.header
450 }
451
452 pub fn response_header_mut(&mut self) -> &mut ResponseHeader {
454 &mut self.0.header
455 }
456
457 pub fn extensions(&self) -> &Extensions {
459 &self.0.extensions
460 }
461
462 pub fn extensions_mut(&mut self) -> &mut Extensions {
464 &mut self.0.extensions
465 }
466
467 pub fn response_header_copy(&self) -> ResponseHeader {
469 self.0.header.clone()
470 }
471
472 pub fn headers(&self) -> &HMap {
474 &self.0.header.headers
475 }
476
477 fn can_serve_stale(&self, serve_stale_sec: u32, time: SystemTime) -> bool {
478 if serve_stale_sec == 0 {
479 return false;
480 }
481 if let Some(stale_until) = self
482 .0
483 .internal
484 .fresh_until
485 .checked_add(Duration::from_secs(serve_stale_sec.into()))
486 {
487 stale_until >= time
488 } else {
489 true
491 }
492 }
493
494 pub fn serialize(&self) -> Result<(Vec<u8>, Vec<u8>)> {
496 let internal = self.0.internal.serialize()?;
497 let header = header_serialize(&self.0.header)?;
498 Ok((internal, header))
499 }
500
501 pub fn deserialize(internal: &[u8], header: &[u8]) -> Result<Self> {
503 let internal = internal_meta::deserialize(internal)?;
504 let header = header_deserialize(header)?;
505 Ok(CacheMeta(Box::new(CacheMetaInner {
506 internal,
507 header,
508 extensions: Extensions::new(),
509 })))
510 }
511}
512
513use http::StatusCode;
514
515pub type FreshSecByStatusFn = fn(StatusCode) -> Option<u32>;
517
518pub struct CacheMetaDefaults {
520 fresh_sec_fn: FreshSecByStatusFn,
522 stale_while_revalidate_sec: u32,
523 stale_if_error_sec: u32,
525}
526
527impl CacheMetaDefaults {
528 pub const fn new(
530 fresh_sec_fn: FreshSecByStatusFn,
531 stale_while_revalidate_sec: u32,
532 stale_if_error_sec: u32,
533 ) -> Self {
534 CacheMetaDefaults {
535 fresh_sec_fn,
536 stale_while_revalidate_sec,
537 stale_if_error_sec,
538 }
539 }
540
541 pub fn fresh_sec(&self, resp_status: StatusCode) -> Option<u32> {
545 if resp_status == StatusCode::NOT_MODIFIED {
547 (self.fresh_sec_fn)(StatusCode::OK)
548 } else {
549 (self.fresh_sec_fn)(resp_status)
550 }
551 }
552
553 pub fn serve_stale_while_revalidate_sec(&self) -> u32 {
555 self.stale_while_revalidate_sec
556 }
557
558 pub fn serve_stale_if_error_sec(&self) -> u32 {
560 self.stale_if_error_sec
561 }
562}
563
564use log::warn;
565use once_cell::sync::{Lazy, OnceCell};
566use pingora_header_serde::HeaderSerde;
567use std::fs::File;
568use std::io::Read;
569
570pub(crate) static COMPRESSION_DICT_PATH: OnceCell<String> = OnceCell::new();
572
573fn load_file(path: &String) -> Option<Vec<u8>> {
574 let mut file = File::open(path)
575 .map_err(|e| {
576 warn!(
577 "failed to open header compress dictionary file at {}, {:?}",
578 path, e
579 );
580 e
581 })
582 .ok()?;
583 let mut dict = Vec::new();
584 file.read_to_end(&mut dict)
585 .map_err(|e| {
586 warn!(
587 "failed to read header compress dictionary file at {}, {:?}",
588 path, e
589 );
590 e
591 })
592 .ok()?;
593
594 Some(dict)
595}
596
597static HEADER_SERDE: Lazy<HeaderSerde> = Lazy::new(|| {
598 let dict_path_opt = COMPRESSION_DICT_PATH.get();
599
600 if dict_path_opt.is_none() {
601 warn!("COMPRESSION_DICT_PATH is not set");
602 }
603
604 let result = dict_path_opt.and_then(load_file);
605
606 if result.is_none() {
607 warn!("HeaderSerde not loaded from file");
608 }
609
610 HeaderSerde::new(result)
611});
612
613pub(crate) fn header_serialize(header: &ResponseHeader) -> Result<Vec<u8>> {
614 HEADER_SERDE.serialize(header)
615}
616
617pub(crate) fn header_deserialize<T: AsRef<[u8]>>(buf: T) -> Result<ResponseHeader> {
618 HEADER_SERDE.deserialize(buf.as_ref())
619}