pub use http::Extensions;
use log::warn;
use once_cell::sync::{Lazy, OnceCell};
use pingora_error::{Error, ErrorType::*, OrErr, Result};
use pingora_header_serde::HeaderSerde;
use pingora_http::{HMap, ResponseHeader};
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::time::{Duration, SystemTime};
use crate::key::HashBinary;
pub(crate) type InternalMeta = internal_meta::InternalMetaLatest;
mod internal_meta {
use super::*;
pub(crate) type InternalMetaLatest = InternalMetaV2;
#[derive(Debug, Deserialize, Serialize, Clone)]
pub(crate) struct InternalMetaV0 {
pub(crate) fresh_until: SystemTime,
pub(crate) created: SystemTime,
pub(crate) stale_while_revalidate_sec: u32,
pub(crate) stale_if_error_sec: u32,
}
impl InternalMetaV0 {
#[allow(dead_code)]
fn serialize(&self) -> Result<Vec<u8>> {
rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
}
fn deserialize(buf: &[u8]) -> Result<Self> {
rmp_serde::decode::from_slice(buf)
.or_err(InternalError, "failed to decode cache meta v0")
}
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub(crate) struct InternalMetaV1 {
pub(crate) version: u8,
pub(crate) fresh_until: SystemTime,
pub(crate) created: SystemTime,
pub(crate) stale_while_revalidate_sec: u32,
pub(crate) stale_if_error_sec: u32,
}
impl InternalMetaV1 {
#[allow(dead_code)]
pub const VERSION: u8 = 1;
#[allow(dead_code)]
pub fn serialize(&self) -> Result<Vec<u8>> {
assert_eq!(self.version, 1);
rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
}
fn deserialize(buf: &[u8]) -> Result<Self> {
rmp_serde::decode::from_slice(buf)
.or_err(InternalError, "failed to decode cache meta v1")
}
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub(crate) struct InternalMetaV2 {
pub(crate) version: u8,
pub(crate) fresh_until: SystemTime,
pub(crate) created: SystemTime,
pub(crate) updated: SystemTime,
pub(crate) stale_while_revalidate_sec: u32,
pub(crate) stale_if_error_sec: u32,
#[serde(default)]
pub(crate) variance: Option<HashBinary>,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) epoch_override: Option<SystemTime>,
}
impl Default for InternalMetaV2 {
fn default() -> Self {
let epoch = SystemTime::UNIX_EPOCH;
InternalMetaV2 {
version: InternalMetaV2::VERSION,
fresh_until: epoch,
created: epoch,
updated: epoch,
stale_while_revalidate_sec: 0,
stale_if_error_sec: 0,
variance: None,
epoch_override: None,
}
}
}
impl InternalMetaV2 {
pub const VERSION: u8 = 2;
pub fn serialize(&self) -> Result<Vec<u8>> {
assert_eq!(self.version, Self::VERSION);
rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
}
fn deserialize(buf: &[u8]) -> Result<Self> {
rmp_serde::decode::from_slice(buf)
.or_err(InternalError, "failed to decode cache meta v2")
}
}
impl From<InternalMetaV0> for InternalMetaV2 {
fn from(v0: InternalMetaV0) -> Self {
InternalMetaV2 {
version: InternalMetaV2::VERSION,
fresh_until: v0.fresh_until,
created: v0.created,
updated: v0.created,
stale_while_revalidate_sec: v0.stale_while_revalidate_sec,
stale_if_error_sec: v0.stale_if_error_sec,
..Default::default()
}
}
}
impl From<InternalMetaV1> for InternalMetaV2 {
fn from(v1: InternalMetaV1) -> Self {
InternalMetaV2 {
version: InternalMetaV2::VERSION,
fresh_until: v1.fresh_until,
created: v1.created,
updated: v1.created,
stale_while_revalidate_sec: v1.stale_while_revalidate_sec,
stale_if_error_sec: v1.stale_if_error_sec,
..Default::default()
}
}
}
pub(crate) fn deserialize(buf: &[u8]) -> Result<InternalMetaLatest> {
const MIN_SIZE: usize = 10; if buf.len() < MIN_SIZE {
return Error::e_explain(
InternalError,
format!("Buf too short ({}) to be InternalMeta", buf.len()),
);
}
let preread_buf = &mut &buf[..MIN_SIZE];
match rmp::decode::read_array_len(preread_buf)
.or_err(InternalError, "failed to decode cache meta array size")?
{
4 => Ok(InternalMetaV0::deserialize(buf)?.into()),
_ => {
let version = rmp::decode::read_pfix(preread_buf)
.or_err(InternalError, "failed to decode meta version")?;
match version {
1 => Ok(InternalMetaV1::deserialize(buf)?.into()),
2 => InternalMetaV2::deserialize(buf),
_ => Error::e_explain(
InternalError,
format!("Unknown InternalMeta version {version}"),
),
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_internal_meta_serde_v0() {
let meta = InternalMetaV0 {
fresh_until: SystemTime::now(),
created: SystemTime::now(),
stale_while_revalidate_sec: 0,
stale_if_error_sec: 0,
};
let binary = meta.serialize().unwrap();
let meta2 = InternalMetaV0::deserialize(&binary).unwrap();
assert_eq!(meta.fresh_until, meta2.fresh_until);
}
#[test]
fn test_internal_meta_serde_v1() {
let meta = InternalMetaV1 {
version: InternalMetaV1::VERSION,
fresh_until: SystemTime::now(),
created: SystemTime::now(),
stale_while_revalidate_sec: 0,
stale_if_error_sec: 0,
};
let binary = meta.serialize().unwrap();
let meta2 = InternalMetaV1::deserialize(&binary).unwrap();
assert_eq!(meta.fresh_until, meta2.fresh_until);
}
#[test]
fn test_internal_meta_serde_v2() {
let meta = InternalMetaV2::default();
let binary = meta.serialize().unwrap();
let meta2 = InternalMetaV2::deserialize(&binary).unwrap();
assert_eq!(meta2.version, 2);
assert_eq!(meta.fresh_until, meta2.fresh_until);
assert_eq!(meta.created, meta2.created);
assert_eq!(meta.updated, meta2.updated);
}
#[test]
fn test_internal_meta_serde_across_versions() {
let meta = InternalMetaV0 {
fresh_until: SystemTime::now(),
created: SystemTime::now(),
stale_while_revalidate_sec: 0,
stale_if_error_sec: 0,
};
let binary = meta.serialize().unwrap();
let meta2 = deserialize(&binary).unwrap();
assert_eq!(meta2.version, 2);
assert_eq!(meta.fresh_until, meta2.fresh_until);
let meta = InternalMetaV1 {
version: 1,
fresh_until: SystemTime::now(),
created: SystemTime::now(),
stale_while_revalidate_sec: 0,
stale_if_error_sec: 0,
};
let binary = meta.serialize().unwrap();
let meta2 = deserialize(&binary).unwrap();
assert_eq!(meta2.version, 2);
assert_eq!(meta.fresh_until, meta2.fresh_until);
assert_eq!(meta2.created, meta2.updated);
}
#[derive(Deserialize, Serialize)]
struct InternalMetaV2Base {
version: u8,
fresh_until: SystemTime,
created: SystemTime,
updated: SystemTime,
stale_while_revalidate_sec: u32,
stale_if_error_sec: u32,
}
impl InternalMetaV2Base {
pub const VERSION: u8 = 2;
pub fn serialize(&self) -> Result<Vec<u8>> {
assert!(self.version >= Self::VERSION);
rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
}
fn deserialize(buf: &[u8]) -> Result<Self> {
rmp_serde::decode::from_slice(buf)
.or_err(InternalError, "failed to decode cache meta v2")
}
}
#[derive(Deserialize, Serialize)]
struct InternalMetaV2BaseWithVariance {
version: u8,
fresh_until: SystemTime,
created: SystemTime,
updated: SystemTime,
stale_while_revalidate_sec: u32,
stale_if_error_sec: u32,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
variance: Option<HashBinary>,
}
impl Default for InternalMetaV2BaseWithVariance {
fn default() -> Self {
let epoch = SystemTime::UNIX_EPOCH;
InternalMetaV2BaseWithVariance {
version: InternalMetaV2::VERSION,
fresh_until: epoch,
created: epoch,
updated: epoch,
stale_while_revalidate_sec: 0,
stale_if_error_sec: 0,
variance: None,
}
}
}
impl InternalMetaV2BaseWithVariance {
pub const VERSION: u8 = 2;
pub fn serialize(&self) -> Result<Vec<u8>> {
assert!(self.version >= Self::VERSION);
rmp_serde::encode::to_vec(self).or_err(InternalError, "failed to encode cache meta")
}
fn deserialize(buf: &[u8]) -> Result<Self> {
rmp_serde::decode::from_slice(buf)
.or_err(InternalError, "failed to decode cache meta v2")
}
}
#[test]
fn test_internal_meta_serde_v2_extend_fields_variance() {
let meta = InternalMetaV2BaseWithVariance::default();
let binary = meta.serialize().unwrap();
let meta2 = InternalMetaV2Base::deserialize(&binary).unwrap();
assert_eq!(meta2.version, 2);
assert_eq!(meta.fresh_until, meta2.fresh_until);
assert_eq!(meta.created, meta2.created);
assert_eq!(meta.updated, meta2.updated);
let now = SystemTime::now();
let meta = InternalMetaV2Base {
version: InternalMetaV2::VERSION,
fresh_until: now,
created: now,
updated: now,
stale_while_revalidate_sec: 0,
stale_if_error_sec: 0,
};
let binary = meta.serialize().unwrap();
let meta2 = InternalMetaV2BaseWithVariance::deserialize(&binary).unwrap();
assert_eq!(meta2.version, 2);
assert_eq!(meta.fresh_until, meta2.fresh_until);
assert_eq!(meta.created, meta2.created);
assert_eq!(meta.updated, meta2.updated);
}
#[test]
fn test_internal_meta_serde_v2_extend_fields_epoch_override() {
let now = SystemTime::now();
let meta = InternalMetaV2 {
fresh_until: now,
created: now,
updated: now,
epoch_override: None, ..Default::default()
};
let binary = meta.serialize().unwrap();
let meta2 = InternalMetaV2BaseWithVariance::deserialize(&binary).unwrap();
assert_eq!(meta2.version, 2);
assert_eq!(meta.fresh_until, meta2.fresh_until);
assert_eq!(meta.created, meta2.created);
assert_eq!(meta.updated, meta2.updated);
assert!(meta2.variance.is_none());
let mut meta = InternalMetaV2BaseWithVariance {
version: InternalMetaV2::VERSION,
fresh_until: now,
created: now,
updated: now,
stale_while_revalidate_sec: 0,
stale_if_error_sec: 0,
variance: None,
};
let binary = meta.serialize().unwrap();
let meta2 = InternalMetaV2::deserialize(&binary).unwrap();
assert_eq!(meta2.version, 2);
assert_eq!(meta.fresh_until, meta2.fresh_until);
assert_eq!(meta.created, meta2.created);
assert_eq!(meta.updated, meta2.updated);
assert!(meta2.variance.is_none());
assert!(meta2.epoch_override.is_none());
meta.variance = Some(*b"variance_testing");
let binary = meta.serialize().unwrap();
let meta2 = InternalMetaV2::deserialize(&binary).unwrap();
assert_eq!(meta2.version, 2);
assert_eq!(meta.fresh_until, meta2.fresh_until);
assert_eq!(meta.created, meta2.created);
assert_eq!(meta.updated, meta2.updated);
assert_eq!(meta.variance, meta2.variance);
assert!(meta2.epoch_override.is_none());
}
}
}
#[derive(Debug)]
pub(crate) struct CacheMetaInner {
pub(crate) internal: InternalMeta,
pub(crate) header: ResponseHeader,
pub extensions: Extensions,
}
#[derive(Debug)]
pub struct CacheMeta(pub(crate) Box<CacheMetaInner>);
impl CacheMeta {
pub fn new(
fresh_until: SystemTime,
created: SystemTime,
stale_while_revalidate_sec: u32,
stale_if_error_sec: u32,
header: ResponseHeader,
) -> CacheMeta {
CacheMeta(Box::new(CacheMetaInner {
internal: InternalMeta {
version: InternalMeta::VERSION,
fresh_until,
created,
updated: created, stale_while_revalidate_sec,
stale_if_error_sec,
..Default::default()
},
header,
extensions: Extensions::new(),
}))
}
pub fn created(&self) -> SystemTime {
self.0.internal.created
}
pub fn updated(&self) -> SystemTime {
self.0.internal.updated
}
pub fn epoch(&self) -> SystemTime {
self.0.internal.epoch_override.unwrap_or(self.updated())
}
pub fn epoch_override(&self) -> Option<SystemTime> {
self.0.internal.epoch_override
}
pub fn set_epoch_override(&mut self, epoch: SystemTime) {
self.0.internal.epoch_override = Some(epoch);
}
pub fn remove_epoch_override(&mut self) {
self.0.internal.epoch_override = None;
}
pub fn is_fresh(&self, time: SystemTime) -> bool {
self.0.internal.fresh_until >= time
}
pub fn fresh_sec(&self) -> u64 {
let reference = self.epoch();
self.0
.internal
.fresh_until
.duration_since(reference)
.map_or(0, |duration| duration.as_secs())
}
pub fn fresh_until(&self) -> SystemTime {
self.0.internal.fresh_until
}
pub fn age(&self) -> Duration {
let reference = self.epoch();
SystemTime::now()
.duration_since(reference)
.unwrap_or_default()
}
pub fn stale_while_revalidate_sec(&self) -> u32 {
self.0.internal.stale_while_revalidate_sec
}
pub fn stale_if_error_sec(&self) -> u32 {
self.0.internal.stale_if_error_sec
}
pub fn serve_stale_while_revalidate(&self, time: SystemTime) -> bool {
self.can_serve_stale(self.0.internal.stale_while_revalidate_sec, time)
}
pub fn serve_stale_if_error(&self, time: SystemTime) -> bool {
self.can_serve_stale(self.0.internal.stale_if_error_sec, time)
}
pub fn disable_serve_stale(&mut self) {
self.0.internal.stale_if_error_sec = 0;
self.0.internal.stale_while_revalidate_sec = 0;
}
pub fn variance(&self) -> Option<HashBinary> {
self.0.internal.variance
}
pub fn set_variance_key(&mut self, variance_key: HashBinary) {
self.0.internal.variance = Some(variance_key);
}
pub fn set_variance(&mut self, variance: HashBinary) {
self.0.internal.variance = Some(variance)
}
pub fn remove_variance(&mut self) {
self.0.internal.variance = None
}
pub fn response_header(&self) -> &ResponseHeader {
&self.0.header
}
pub fn response_header_mut(&mut self) -> &mut ResponseHeader {
&mut self.0.header
}
pub fn extensions(&self) -> &Extensions {
&self.0.extensions
}
pub fn extensions_mut(&mut self) -> &mut Extensions {
&mut self.0.extensions
}
pub fn response_header_copy(&self) -> ResponseHeader {
self.0.header.clone()
}
pub fn headers(&self) -> &HMap {
&self.0.header.headers
}
fn can_serve_stale(&self, serve_stale_sec: u32, time: SystemTime) -> bool {
if serve_stale_sec == 0 {
return false;
}
if let Some(stale_until) = self
.0
.internal
.fresh_until
.checked_add(Duration::from_secs(serve_stale_sec.into()))
{
stale_until >= time
} else {
true
}
}
pub fn serialize(&self) -> Result<(Vec<u8>, Vec<u8>)> {
let internal = self.0.internal.serialize()?;
let header = header_serialize(&self.0.header)?;
log::debug!("header to serialize: {:?}", &self.0.header);
Ok((internal, header))
}
pub fn deserialize(internal: &[u8], header: &[u8]) -> Result<Self> {
let internal = internal_meta::deserialize(internal)?;
let header = header_deserialize(header)?;
Ok(CacheMeta(Box::new(CacheMetaInner {
internal,
header,
extensions: Extensions::new(),
})))
}
}
use http::StatusCode;
pub type FreshDurationByStatusFn = fn(StatusCode) -> Option<Duration>;
pub struct CacheMetaDefaults {
fresh_sec_fn: FreshDurationByStatusFn,
stale_while_revalidate_sec: u32,
stale_if_error_sec: u32,
}
impl CacheMetaDefaults {
pub const fn new(
fresh_sec_fn: FreshDurationByStatusFn,
stale_while_revalidate_sec: u32,
stale_if_error_sec: u32,
) -> Self {
CacheMetaDefaults {
fresh_sec_fn,
stale_while_revalidate_sec,
stale_if_error_sec,
}
}
pub fn fresh_sec(&self, resp_status: StatusCode) -> Option<Duration> {
if resp_status == StatusCode::NOT_MODIFIED {
(self.fresh_sec_fn)(StatusCode::OK)
} else {
(self.fresh_sec_fn)(resp_status)
}
}
pub fn serve_stale_while_revalidate_sec(&self) -> u32 {
self.stale_while_revalidate_sec
}
pub fn serve_stale_if_error_sec(&self) -> u32 {
self.stale_if_error_sec
}
}
static COMPRESSION_DICT_CONTENT: OnceCell<Cow<'static, [u8]>> = OnceCell::new();
static HEADER_SERDE: Lazy<HeaderSerde> = Lazy::new(|| {
let dict_opt = if let Some(dict_content) = COMPRESSION_DICT_CONTENT.get() {
Some(dict_content.to_vec())
} else {
warn!("no header compression dictionary loaded - use set_compression_dict_content() or set_compression_dict_path() to set one");
None
};
HeaderSerde::new(dict_opt)
});
pub(crate) fn header_serialize(header: &ResponseHeader) -> Result<Vec<u8>> {
HEADER_SERDE.serialize(header)
}
pub(crate) fn header_deserialize<T: AsRef<[u8]>>(buf: T) -> Result<ResponseHeader> {
HEADER_SERDE.deserialize(buf.as_ref())
}
pub fn set_compression_dict_path(path: &str) -> bool {
match std::fs::read(path) {
Ok(dict) => COMPRESSION_DICT_CONTENT.set(dict.into()).is_ok(),
Err(e) => {
warn!(
"failed to read header compress dictionary file at {}, {:?}",
path, e
);
false
}
}
}
pub fn set_compression_dict_content(content: Cow<'static, [u8]>) -> bool {
COMPRESSION_DICT_CONTENT.set(content).is_ok()
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
#[test]
fn test_cache_meta_age_without_override() {
let now = SystemTime::now();
let header = ResponseHeader::build_no_case(200, None).unwrap();
let meta = CacheMeta::new(now + Duration::from_secs(300), now, 0, 0, header);
std::thread::sleep(Duration::from_millis(100));
let age = meta.age();
assert!(age.as_secs() < 1, "age should be close to 0");
assert_eq!(meta.epoch(), meta.updated());
}
#[test]
fn test_cache_meta_age_with_epoch_override_past() {
let now = SystemTime::now();
let header = ResponseHeader::build(200, None).unwrap();
let mut meta = CacheMeta::new(now + Duration::from_secs(300), now, 0, 0, header);
let epoch_override = now - Duration::from_secs(10);
meta.set_epoch_override(epoch_override);
let age = meta.age();
assert!(age.as_secs() >= 10);
assert!(age.as_secs() < 12);
assert_eq!(meta.epoch(), epoch_override);
assert_eq!(meta.epoch_override(), Some(epoch_override));
}
#[test]
fn test_cache_meta_age_with_epoch_override_future() {
let now = SystemTime::now();
let header = ResponseHeader::build(200, None).unwrap();
let mut meta = CacheMeta::new(now + Duration::from_secs(100), now, 0, 0, header);
let future_epoch = now + Duration::from_secs(10);
meta.set_epoch_override(future_epoch);
let age_with_epoch = meta.age();
assert_eq!(age_with_epoch, Duration::ZERO);
}
#[test]
fn test_cache_meta_fresh_sec() {
let header = ResponseHeader::build(StatusCode::OK, None).unwrap();
let mut meta = CacheMeta::new(
SystemTime::now() + Duration::from_secs(100),
SystemTime::now() - Duration::from_secs(100),
0,
0,
header,
);
meta.0.internal.updated = SystemTime::UNIX_EPOCH + Duration::from_secs(1000);
meta.0.internal.fresh_until = SystemTime::UNIX_EPOCH + Duration::from_secs(1100);
let fresh_sec_without_override = meta.fresh_sec();
assert_eq!(fresh_sec_without_override, 100);
let epoch_override = SystemTime::UNIX_EPOCH + Duration::from_secs(1050);
meta.set_epoch_override(epoch_override);
assert_eq!(meta.epoch_override(), Some(epoch_override));
assert_eq!(meta.epoch(), epoch_override);
let fresh_sec_with_override = meta.fresh_sec();
assert_eq!(fresh_sec_with_override, 50);
meta.remove_epoch_override();
assert_eq!(meta.epoch_override(), None);
assert_eq!(meta.epoch(), meta.updated());
assert_eq!(meta.fresh_sec(), 100); }
}