use std::fmt;
use std::time::SystemTime;
use anyhow::Result;
use http::HeaderMap;
use http::HeaderValue;
use http::Method;
use http::Response;
use http::StatusCode;
use http::Uri;
use http::Version;
use http::header;
use http::header::CACHE_CONTROL;
use http::uri::Authority;
use http_body::Body;
use http_cache_semantics::AfterResponse;
use http_cache_semantics::BeforeRequest;
use http_cache_semantics::CacheOptions;
use http_cache_semantics::CachePolicy;
use sha2::Digest;
use sha2::Sha256;
use tracing::debug;
use crate::body::CacheBody;
use crate::storage::CacheStorage;
use crate::storage::StoredResponse;
/// Name of the response header that carries the cache lookup status (`HIT` or `MISS`).
pub const X_CACHE_LOOKUP: &str = "x-cache-lookup";
/// Name of the response header that carries the cache status (`HIT` or `MISS`).
pub const X_CACHE: &str = "x-cache";
/// Name of the response header that carries the digest associated with a stored response.
///
/// The header is only set when a digest is supplied alongside the cache status.
pub const X_CACHE_DIGEST: &str = "x-cache-digest";
/// Computes the storage key for a request.
///
/// The key is the hex-encoded SHA-256 digest of the request method, the URI
/// (scheme, authority, path, and query), and the value of the `Range` header
/// when one is present.
///
/// Each optional component is preceded by a delimiter that cannot be part of
/// the component hashed before it, so two different requests never feed the
/// same byte sequence into the hash (for example `/a?b` and `/ab`).
fn storage_key(method: &Method, uri: &Uri, headers: &HeaderMap) -> String {
    let mut hasher = Sha256::new();

    hasher.update(method.as_str());
    hasher.update(":");

    if let Some(scheme) = uri.scheme_str() {
        hasher.update(scheme);
    }
    hasher.update("://");

    if let Some(authority) = uri.authority() {
        hasher.update(authority.as_str());
    }

    hasher.update(uri.path());

    if let Some(query) = uri.query() {
        // A path never contains `?`, so this keeps path and query apart.
        hasher.update("?");
        hasher.update(query);
    }

    if let Some(range) = headers.get(header::RANGE) {
        // A line feed is not valid in a URI, so the range value cannot be
        // mistaken for the tail of the path or query.
        hasher.update("\n");
        hasher.update(range.as_bytes());
    }

    hex::encode(hasher.finalize())
}
/// Status reported in the `x-cache-lookup` response header.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CacheLookupStatus {
    /// Displayed as `HIT`.
    Hit,
    /// Displayed as `MISS`.
    Miss,
}

impl fmt::Display for CacheLookupStatus {
    /// Writes the upper-case header value for the status.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            Self::Hit => "HIT",
            Self::Miss => "MISS",
        };
        f.write_str(label)
    }
}
/// Status reported in the `x-cache` response header.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CacheStatus {
    /// Displayed as `HIT`.
    Hit,
    /// Displayed as `MISS`.
    Miss,
}

impl fmt::Display for CacheStatus {
    /// Writes the upper-case header value for the status.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            Self::Hit => "HIT",
            Self::Miss => "MISS",
        };
        f.write_str(label)
    }
}
/// Private helpers for adjusting and inspecting the headers of an HTTP response.
trait ResponseExt {
/// Sets the `warning` header, built from `code`, the host of `uri`, `message`, and the
/// current time.
fn add_warning(&mut self, uri: &Uri, code: usize, message: &str);
/// Returns whether the response's `cache-control` header mentions `must-revalidate`.
fn must_revalidate(&self) -> bool;
/// Extends the response's headers with the entries of `headers`.
fn extend_headers(&mut self, headers: HeaderMap);
/// Sets the `x-cache-lookup` and `x-cache` headers from `lookup` and `status`, and the
/// `x-cache-digest` header when `digest` is `Some`.
fn set_cache_status(
&mut self,
lookup: CacheLookupStatus,
status: CacheStatus,
digest: Option<&str>,
);
}
impl<B> ResponseExt for Response<B> {
    /// Sets the `warning` header to `<code> <agent> "<message>" "<date>"`.
    ///
    /// The agent is the host of `uri`. When the URI has no host (for example a
    /// relative URI), the `-` placeholder is used instead of panicking, so a
    /// stale response can still be served.
    ///
    /// # Panics
    ///
    /// Panics if the formatted value is not a valid header value. `message` is
    /// expected to be visible ASCII.
    fn add_warning(&mut self, uri: &Uri, code: usize, message: &str) {
        let agent = uri.host().unwrap_or("-");
        let value = format!(
            "{code} {agent} {message:?} \"{date}\"",
            date = httpdate::fmt_http_date(SystemTime::now())
        );

        self.headers_mut().insert(
            "warning",
            HeaderValue::from_str(&value).expect("value should be valid"),
        );
    }

    /// Returns whether any `cache-control` field line mentions `must-revalidate`.
    ///
    /// Every `cache-control` line of the response is inspected, not just the
    /// first one; lines that are not visible ASCII are ignored. The match is
    /// case-insensitive.
    fn must_revalidate(&self) -> bool {
        self.headers()
            .get_all(CACHE_CONTROL)
            .iter()
            .filter_map(|value| value.to_str().ok())
            .any(|value| value.to_ascii_lowercase().contains("must-revalidate"))
    }

    /// Extends the response's headers with the entries of `headers`.
    fn extend_headers(&mut self, headers: HeaderMap) {
        self.headers_mut().extend(headers);
    }

    /// Sets the `x-cache-lookup` and `x-cache` headers, and the
    /// `x-cache-digest` header when `digest` is `Some`.
    ///
    /// An existing `x-cache-digest` header is left untouched when `digest` is
    /// `None`.
    ///
    /// # Panics
    ///
    /// Panics if `digest` is not a valid header value.
    fn set_cache_status(
        &mut self,
        lookup: CacheLookupStatus,
        status: CacheStatus,
        digest: Option<&str>,
    ) {
        let headers = self.headers_mut();
        headers.insert(
            X_CACHE_LOOKUP,
            lookup.to_string().parse().expect("value should parse"),
        );
        headers.insert(
            X_CACHE,
            status.to_string().parse().expect("value should parse"),
        );

        if let Some(digest) = digest {
            headers.insert(X_CACHE_DIGEST, digest.parse().expect("value should parse"));
        }
    }
}
/// A request that the cache can inspect and then send upstream.
///
/// `B` is the body type of the response produced by [`Request::send`].
pub trait Request<B: Body>: Send {
/// Returns the HTTP version of the request.
fn version(&self) -> Version;
/// Returns the request method.
fn method(&self) -> &Method;
/// Returns the request URI.
fn uri(&self) -> &Uri;
/// Returns the request headers.
fn headers(&self) -> &HeaderMap;
/// Sends the request, consuming it, and resolves to the upstream response.
///
/// The cache passes `None` for a plain upstream send. When revalidating a stale stored
/// response it may pass `Some` headers obtained from the cache policy; how those headers
/// are applied to the outgoing request is up to the implementation.
fn send(self, headers: Option<HeaderMap>) -> impl Future<Output = Result<Response<B>>> + Send;
}
/// Owned copy of the method, URI, and headers of a [`Request`].
///
/// It lets the cache keep consulting the request's parts after the request
/// itself has been consumed by `send`.
struct RequestLike {
    /// Method of the original request.
    method: Method,
    /// URI of the original request.
    uri: Uri,
    /// Headers of the original request.
    headers: HeaderMap,
}

impl RequestLike {
    /// Clones the method, URI, and headers out of `request`.
    fn new<R, B>(request: &R) -> Self
    where
        R: Request<B>,
        B: Body,
    {
        let method = request.method().clone();
        let uri = request.uri().clone();
        let headers = request.headers().clone();

        Self {
            method,
            uri,
            headers,
        }
    }
}
impl http_cache_semantics::RequestLike for RequestLike {
fn uri(&self) -> Uri {
self.uri.clone()
}
fn is_same_uri(&self, other: &Uri) -> bool {
self.uri.eq(other)
}
fn method(&self) -> &Method {
&self.method
}
fn headers(&self) -> &HeaderMap {
&self.headers
}
}
/// Signature of the revalidation hook stored in [`Cache`].
///
/// The hook is called with the request and a mutable reference to the headers that are
/// about to be sent for revalidation; an error returned by the hook is propagated to the
/// caller of the cache.
type RevalidationHook = dyn Fn(&dyn http_cache_semantics::RequestLike, &mut HeaderMap) -> Result<()>
+ Send
+ Sync
+ 'static;
/// An HTTP cache that keeps responses in a storage backend of type `S`.
pub struct Cache<S> {
/// Backend used to look up, store, update, and delete responses.
storage: S,
/// Options passed to `CachePolicy::new_options` whenever a policy is built for a response.
options: CacheOptions,
/// Optional hook invoked on the revalidation headers before a stale response is revalidated.
hook: Option<Box<RevalidationHook>>,
}
impl<S> Cache<S>
where
S: CacheStorage,
{
pub fn new(storage: S) -> Self {
Self {
storage,
options: CacheOptions {
shared: false,
..Default::default()
},
hook: None,
}
}
/// Creates a cache over `storage` that uses the given cache `options`.
///
/// The returned cache has no revalidation hook.
pub fn new_with_options(storage: S, options: CacheOptions) -> Self {
    let hook = None;

    Self {
        storage,
        options,
        hook,
    }
}
pub fn with_revalidation_hook(
mut self,
hook: impl Fn(&dyn http_cache_semantics::RequestLike, &mut HeaderMap) -> Result<()>
+ Send
+ Sync
+ 'static,
) -> Self {
self.hook = Some(Box::new(hook));
self
}
pub fn storage(&self) -> &S {
&self.storage
}
/// Sends `request` through the cache.
///
/// For `GET` and `HEAD` requests the storage is consulted first. A stored
/// response is handed to the conditional path, which serves it or
/// revalidates it. A missing entry or a storage error falls back to a plain
/// upstream send reported as a lookup miss. All other methods go straight
/// upstream.
pub async fn send<B: Body + Send>(
    &self,
    request: impl Request<B>,
) -> Result<Response<CacheBody<B>>> {
    let method = request.method();
    let uri = request.uri();
    let key = storage_key(method, uri, request.headers());

    let is_lookup_method = matches!(*method, Method::GET | Method::HEAD);
    let stored = if is_lookup_method {
        match self.storage.get(&key).await {
            Ok(Some(stored)) => {
                debug!(
                    method = method.as_str(),
                    scheme = uri.scheme_str(),
                    authority = uri.authority().map(Authority::as_str),
                    path = uri.path(),
                    key,
                    "cache hit"
                );
                Some(stored)
            }
            Ok(None) => {
                debug!(
                    method = method.as_str(),
                    scheme = uri.scheme_str(),
                    authority = uri.authority().map(Authority::as_str),
                    path = uri.path(),
                    key,
                    "cache miss"
                );
                None
            }
            Err(e) => {
                // A storage failure is not fatal: behave as if nothing was cached.
                debug!(
                    method = method.as_str(),
                    scheme = uri.scheme_str(),
                    authority = uri.authority().map(Authority::as_str),
                    path = uri.path(),
                    key,
                    error = format!("{e:?}"),
                    "failed to get response from storage; treating as not cached"
                );
                None
            }
        }
    } else {
        None
    };

    match stored {
        Some(stored) => self.conditional_send_upstream(key, request, stored).await,
        None => {
            self.send_upstream(key, request, CacheLookupStatus::Miss)
                .await
        }
    }
}
/// Sends `request` upstream without revalidation headers and stores the
/// response when it is cacheable.
///
/// The response is stored under `key` when the method is `GET` or `HEAD`,
/// the status is successful, and the cache policy reports it as storable;
/// a storage failure is returned as an error. Otherwise the upstream
/// response is returned uncached.
///
/// When the request method is not safe, any entries stored for `HEAD` and
/// `GET` of the same URI (keyed with this request's headers) are deleted.
/// Deletion failures are only logged.
///
/// The returned response carries `lookup_status` as its lookup status and
/// `MISS` as its cache status.
async fn send_upstream<B: Body + Send>(
    &self,
    key: String,
    request: impl Request<B>,
    lookup_status: CacheLookupStatus,
) -> Result<Response<CacheBody<B>>> {
    // Snapshot the request parts first: `send` consumes the request.
    let request_like = RequestLike::new(&request);

    let mut response = request.send(None).await?;
    let policy =
        CachePolicy::new_options(&request_like, &response, SystemTime::now(), self.options);

    response.set_cache_status(lookup_status, CacheStatus::Miss, None);

    if matches!(request_like.method, Method::GET | Method::HEAD)
        && response.status().is_success()
        && policy.is_storable()
    {
        let (parts, body) = response.into_parts();
        return self
            .storage
            .store(key.clone(), parts, body, policy)
            .await
            .inspect_err(|e| {
                debug!(
                    method = request_like.method.as_str(),
                    scheme = request_like.uri.scheme_str(),
                    authority = request_like.uri.authority().map(Authority::as_str),
                    path = request_like.uri.path(),
                    key,
                    error = format!("{e:?}"),
                    "failed to store response"
                );
            });
    }

    debug!(
        method = request_like.method.as_str(),
        scheme = request_like.uri.scheme_str(),
        authority = request_like.uri.authority().map(Authority::as_str),
        path = request_like.uri.path(),
        key,
        status = response.status().as_u16(),
        "response is not cacheable"
    );

    if !request_like.method.is_safe() {
        // An unsafe request may have changed the resource, so drop whatever
        // is stored for HEAD/GET of the same URI.
        for method in [Method::HEAD, Method::GET] {
            let key = storage_key(&method, &request_like.uri, &request_like.headers);
            if let Err(e) = self.storage.delete(&key).await {
                debug!(
                    method = method.as_str(),
                    scheme = request_like.uri.scheme_str(),
                    authority = request_like.uri.authority().map(Authority::as_str),
                    path = request_like.uri.path(),
                    key,
                    error = format!("{e:?}"),
                    "failed to delete response from storage"
                );
            }
        }
    }

    Ok(response.map(CacheBody::from_upstream))
}
/// Serves a request for which a stored response was found under `key`.
///
/// If the stored policy reports the response as fresh, the stored response is returned
/// directly. Otherwise the request is sent upstream for revalidation and the outcome
/// decides what is returned:
///
/// - successful status: the new response is stored under `key` and returned;
/// - `304 Not Modified`: the stored response is returned, with its stored metadata updated
///   when the policy agrees that it was not modified;
/// - server error, or a failed send, while the stored response does not require
///   revalidation: the stored response is returned with a `warning` header;
/// - anything else: the upstream response is returned uncached.
///
/// Errors from the revalidation hook and from storage writes are propagated.
// NOTE(review): unlike `send_upstream`, the successful-status path below stores the new
// response without consulting `policy.is_storable()` — confirm this is intended.
async fn conditional_send_upstream<B: Body + Send>(
&self,
key: String,
request: impl Request<B>,
mut stored: StoredResponse<B>,
) -> Result<Response<CacheBody<B>>> {
// Snapshot the request parts: `send` consumes the request further down.
let request_like = RequestLike::new(&request);
// Ask the stored policy whether the response can be used as-is; when it cannot,
// `headers` ends up holding the headers to send upstream, if any.
let mut headers = match stored
.policy
.before_request(&request_like, SystemTime::now())
{
BeforeRequest::Fresh(parts) => {
debug!(
method = request_like.method.as_str(),
scheme = request_like.uri.scheme_str(),
authority = request_like.uri.authority().map(Authority::as_str),
path = request_like.uri.path(),
key,
digest = stored.digest,
"response is still fresh: responding with body from storage"
);
// Merge the headers supplied by the policy into the stored response before returning it.
stored.response.extend_headers(parts.headers);
stored.response.set_cache_status(
CacheLookupStatus::Hit,
CacheStatus::Hit,
Some(&stored.digest),
);
return Ok(stored.response);
}
BeforeRequest::Stale {
request: http::request::Parts { headers, .. },
matches,
} => {
// The policy-provided headers are only used when `matches` is true; otherwise the
// request is sent with `None`. Presumably `matches` tells whether the stored response
// corresponds to this request — confirm against the `http_cache_semantics` docs.
if matches { Some(headers) } else { None }
}
};
debug!(
method = request_like.method.as_str(),
scheme = request_like.uri.scheme_str(),
authority = request_like.uri.authority().map(Authority::as_str),
path = request_like.uri.path(),
key,
"response is stale: sending request upstream for revalidation"
);
// Let the hook adjust the revalidation headers; a hook error aborts the send.
if let Some(headers) = &mut headers
&& let Some(hook) = &self.hook
{
hook(&request_like, headers)?;
}
match request.send(headers).await {
// Upstream returned a successful status: replace the stored entry with the new response.
Ok(response) if response.status().is_success() => {
debug!(
method = request_like.method.as_str(),
scheme = request_like.uri.scheme_str(),
authority = request_like.uri.authority().map(Authority::as_str),
path = request_like.uri.path(),
key,
"server responded with a new response"
);
let policy = CachePolicy::new_options(
&request_like,
&response,
SystemTime::now(),
self.options,
);
let (parts, body) = response.into_parts();
match self.storage.store(key.clone(), parts, body, policy).await {
Ok(mut response) => {
// The lookup found an entry, but this body comes from upstream.
response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Miss, None);
Ok(response)
}
Err(e) => {
debug!(
method = request_like.method.as_str(),
scheme = request_like.uri.scheme_str(),
authority = request_like.uri.authority().map(Authority::as_str),
path = request_like.uri.path(),
key,
error = format!("{e:?}"),
"failed to put response into cache storage"
);
Err(e)
}
}
}
// Upstream confirmed the stored response with a 304.
Ok(response) if response.status() == StatusCode::NOT_MODIFIED => {
debug!(
method = request_like.method.as_str(),
scheme = request_like.uri.scheme_str(),
authority = request_like.uri.authority().map(Authority::as_str),
path = request_like.uri.path(),
key,
"server responded with a not modified status"
);
match stored
.policy
.after_response(&request_like, &response, SystemTime::now())
{
// The policy disagrees with the 304; a 304 carries no replacement body, so the
// stored response is served with a warning instead.
AfterResponse::Modified(..) => {
debug!(
method = request_like.method.as_str(),
scheme = request_like.uri.scheme_str(),
authority = request_like.uri.authority().map(Authority::as_str),
path = request_like.uri.path(),
key,
"cached response was considered modified despite revalidation \
replying with not modified"
);
Self::prepare_stale_response(
&request_like.uri,
&mut stored.response,
&stored.digest,
);
Ok(stored.response)
}
// The policy agrees: merge the headers it supplies and persist the updated
// parts and policy under the same key and digest.
AfterResponse::NotModified(policy, parts) => {
stored.response.extend_headers(parts.headers);
let (parts, body) = stored.response.into_parts();
match self
.storage
.put(&key, &parts, &policy, &stored.digest)
.await
{
Ok(_) => {
debug!(
method = request_like.method.as_str(),
scheme = request_like.uri.scheme_str(),
authority = request_like.uri.authority().map(Authority::as_str),
path = request_like.uri.path(),
key,
digest = stored.digest,
"response updated in cache successfully"
);
// Reassemble the stored response from its updated parts and original body.
let mut cached_response = Response::from_parts(parts, body);
cached_response.set_cache_status(
CacheLookupStatus::Hit,
CacheStatus::Hit,
Some(&stored.digest),
);
Ok(cached_response)
}
Err(e) => {
debug!(
method = request_like.method.as_str(),
scheme = request_like.uri.scheme_str(),
authority = request_like.uri.authority().map(Authority::as_str),
path = request_like.uri.path(),
key,
error = format!("{e:?}"),
"failed to put response into cache storage"
);
Err(e)
}
}
}
}
}
// Upstream failed with a server error and the stored response does not demand
// revalidation: serve the stored response with a warning.
Ok(response)
if response.status().is_server_error() && !stored.response.must_revalidate() =>
{
debug!(
method = request_like.method.as_str(),
scheme = request_like.uri.scheme_str(),
authority = request_like.uri.authority().map(Authority::as_str),
path = request_like.uri.path(),
key,
stored.digest,
"failed to revalidate response: serving potentially stale body from storage \
with a warning"
);
Self::prepare_stale_response(
&request_like.uri,
&mut stored.response,
&stored.digest,
);
Ok(stored.response)
}
// Any other upstream response is passed through without touching storage.
Ok(mut response) => {
debug!(
method = request_like.method.as_str(),
scheme = request_like.uri.scheme_str(),
authority = request_like.uri.authority().map(Authority::as_str),
path = request_like.uri.path(),
key,
"failed to revalidate response: returning response from server uncached"
);
response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Miss, None);
Ok(response.map(CacheBody::from_upstream))
}
// The send itself failed: propagate the error when the stored response demands
// revalidation, otherwise serve the stored response with a warning.
Err(e) => {
if stored.response.must_revalidate() {
Err(e)
} else {
debug!(
method = request_like.method.as_str(),
scheme = request_like.uri.scheme_str(),
authority = request_like.uri.authority().map(Authority::as_str),
path = request_like.uri.path(),
key,
stored.digest,
"failed to revalidate response: serving potentially stale body from \
storage with a warning"
);
Self::prepare_stale_response(
&request_like.uri,
&mut stored.response,
&stored.digest,
);
Ok(stored.response)
}
}
}
}
/// Marks a stored `response` as served without a successful revalidation.
///
/// Adds a `warning` header with code 111 and the text "Revalidation failed",
/// then reports a lookup hit and a cache hit together with `digest`.
fn prepare_stale_response<B>(uri: &Uri, response: &mut Response<CacheBody<B>>, digest: &str) {
    // Warn-code attached to responses served after a failed revalidation.
    const REVALIDATION_FAILED: usize = 111;

    response.add_warning(uri, REVALIDATION_FAILED, "Revalidation failed");

    let digest = Some(digest);
    response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Hit, digest);
}
}