1use std::fmt;
4use std::time::SystemTime;
5
6use anyhow::Result;
7use http::HeaderMap;
8use http::HeaderValue;
9use http::Method;
10use http::Response;
11use http::StatusCode;
12use http::Uri;
13use http::Version;
14use http::header;
15use http::header::CACHE_CONTROL;
16use http::uri::Authority;
17use http_body::Body;
18use http_cache_semantics::AfterResponse;
19use http_cache_semantics::BeforeRequest;
20use http_cache_semantics::CacheOptions;
21use http_cache_semantics::CachePolicy;
22use sha2::Digest;
23use sha2::Sha256;
24use tracing::debug;
25
26use crate::body::CacheBody;
27use crate::storage::CacheStorage;
28use crate::storage::StoredResponse;
29
/// Name of the response header reporting the outcome of the cache lookup.
///
/// The value is the `Display` form of [`CacheLookupStatus`] (`HIT` or `MISS`).
pub const X_CACHE_LOOKUP: &str = "x-cache-lookup";

/// Name of the response header reporting whether the response was served from the cache.
///
/// The value is the `Display` form of [`CacheStatus`] (`HIT` or `MISS`).
pub const X_CACHE: &str = "x-cache";

/// Name of the response header carrying the digest of the stored response.
///
/// The header is only set when a digest is supplied alongside the cache status.
pub const X_CACHE_DIGEST: &str = "x-cache-digest";
51
52fn storage_key(method: &Method, uri: &Uri, headers: &HeaderMap) -> String {
54 let mut hasher = Sha256::new();
55 hasher.update(method.as_str());
56 hasher.update(":");
57
58 if let Some(scheme) = uri.scheme_str() {
59 hasher.update(scheme);
60 }
61
62 hasher.update("://");
63 if let Some(authority) = uri.authority() {
64 hasher.update(authority.as_str());
65 }
66
67 hasher.update(uri.path());
68
69 if let Some(query) = uri.query() {
70 hasher.update(query);
71 }
72
73 if let Some(value) = headers.get(header::RANGE) {
74 hasher.update(value.as_bytes());
75 }
76
77 let bytes = hasher.finalize();
78 hex::encode(bytes)
79}
80
/// Outcome of looking up a request in cache storage.
///
/// Formats as `HIT` or `MISS`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CacheLookupStatus {
    /// A stored response was found for the request.
    Hit,
    /// No stored response was available for the request.
    Miss,
}

impl fmt::Display for CacheLookupStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Hit => "HIT",
            Self::Miss => "MISS",
        };
        f.write_str(label)
    }
}
100
/// Whether a response was served from the cache.
///
/// Formats as `HIT` or `MISS`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CacheStatus {
    /// The response was served from cache storage.
    Hit,
    /// The response was served from the upstream server.
    Miss,
}

impl fmt::Display for CacheStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Hit => "HIT",
            Self::Miss => "MISS",
        };
        f.write_str(label)
    }
}
120
/// Cache-related helpers for HTTP responses.
trait ResponseExt {
    /// Sets a `warning` header built from the given warn code, the host of `uri`, the warning
    /// message, and the current date.
    fn add_warning(&mut self, uri: &Uri, code: usize, message: &str);

    /// Returns `true` if the response's `Cache-Control` header contains `must-revalidate`.
    fn must_revalidate(&self) -> bool;

    /// Merges the given headers into the response's headers.
    fn extend_headers(&mut self, headers: HeaderMap);

    /// Sets the cache status headers on the response.
    ///
    /// `lookup` is written to [`X_CACHE_LOOKUP`], `status` to [`X_CACHE`], and `digest`, when
    /// present, to [`X_CACHE_DIGEST`].
    fn set_cache_status(
        &mut self,
        lookup: CacheLookupStatus,
        status: CacheStatus,
        digest: Option<&str>,
    );
}
143
144impl<B> ResponseExt for Response<B> {
145 fn add_warning(&mut self, url: &Uri, code: usize, message: &str) {
146 self.headers_mut().insert(
156 "warning",
157 HeaderValue::from_str(&format!(
158 "{} {} {:?} \"{}\"",
159 code,
160 url.host().expect("URL should be valid"),
161 message,
162 httpdate::fmt_http_date(SystemTime::now())
163 ))
164 .expect("value should be valid"),
165 );
166 }
167
168 fn must_revalidate(&self) -> bool {
169 self.headers()
170 .get(CACHE_CONTROL.as_str())
171 .is_some_and(|val| {
172 val.to_str()
173 .unwrap_or("")
174 .to_lowercase()
175 .contains("must-revalidate")
176 })
177 }
178
179 fn extend_headers(&mut self, headers: HeaderMap) {
180 self.headers_mut().extend(headers);
181 }
182
183 fn set_cache_status(
184 &mut self,
185 lookup: CacheLookupStatus,
186 status: CacheStatus,
187 digest: Option<&str>,
188 ) {
189 let headers = self.headers_mut();
190 headers.insert(
191 X_CACHE_LOOKUP,
192 lookup.to_string().parse().expect("value should parse"),
193 );
194 headers.insert(
195 X_CACHE,
196 status.to_string().parse().expect("value should parse"),
197 );
198 if let Some(digest) = digest {
199 headers.insert(X_CACHE_DIGEST, digest.parse().expect("value should parse"));
200 }
201 }
202}
203
/// An HTTP request that the cache can inspect and send upstream.
pub trait Request<B: Body>: Send {
    /// Returns the HTTP version of the request.
    fn version(&self) -> Version;

    /// Returns the method of the request.
    fn method(&self) -> &Method;

    /// Returns the URI of the request.
    fn uri(&self) -> &Uri;

    /// Returns the headers of the request.
    fn headers(&self) -> &HeaderMap;

    /// Consumes the request, sends it upstream, and returns the response.
    ///
    /// The cache passes `None` for an ordinary upstream request. When it revalidates a stale
    /// stored response that matches the request, it passes `Some` with the request headers
    /// produced by the cache policy.
    // NOTE(review): presumably implementations send the supplied headers in place of the
    // request's own — confirm against the implementors.
    fn send(self, headers: Option<HeaderMap>) -> impl Future<Output = Result<Response<B>>> + Send;
}
227
228struct RequestLike {
230 method: Method,
232 uri: Uri,
234 headers: HeaderMap,
236}
237
238impl RequestLike {
239 fn new<R: Request<B>, B: Body>(request: &R) -> Self {
241 Self {
243 method: request.method().clone(),
244 uri: request.uri().clone(),
245 headers: request.headers().clone(),
246 }
247 }
248}
249
250impl http_cache_semantics::RequestLike for RequestLike {
251 fn uri(&self) -> Uri {
252 self.uri.clone()
254 }
255
256 fn is_same_uri(&self, other: &Uri) -> bool {
257 self.uri.eq(other)
258 }
259
260 fn method(&self) -> &Method {
261 &self.method
262 }
263
264 fn headers(&self) -> &HeaderMap {
265 &self.headers
266 }
267}
268
/// Callback invoked before a revalidation request is sent upstream.
///
/// It receives the request being revalidated and the mutable headers that will be sent with
/// the revalidation request. Returning an error aborts the send.
type RevalidationHook = dyn Fn(&dyn http_cache_semantics::RequestLike, &mut HeaderMap) -> Result<()>
    + Send
    + Sync
    + 'static;
283
/// An HTTP cache that stores responses in a storage backend of type `S`.
pub struct Cache<S> {
    /// The storage backend used to look up, store, and delete responses.
    storage: S,
    /// The options used when computing a cache policy for a response.
    options: CacheOptions,
    /// Optional hook invoked before a revalidation request is sent upstream.
    hook: Option<Box<RevalidationHook>>,
}
295
296impl<S> Cache<S>
297where
298 S: CacheStorage,
299{
300 pub fn new(storage: S) -> Self {
304 Self {
305 storage,
306 options: CacheOptions {
308 shared: false,
309 ..Default::default()
310 },
311 hook: None,
312 }
313 }
314
315 pub fn new_with_options(storage: S, options: CacheOptions) -> Self {
317 Self {
318 storage,
319 options,
320 hook: None,
321 }
322 }
323
324 pub fn with_revalidation_hook(
335 mut self,
336 hook: impl Fn(&dyn http_cache_semantics::RequestLike, &mut HeaderMap) -> Result<()>
337 + Send
338 + Sync
339 + 'static,
340 ) -> Self {
341 self.hook = Some(Box::new(hook));
342 self
343 }
344
345 pub fn storage(&self) -> &S {
347 &self.storage
348 }
349
350 pub async fn send<B: Body + Send>(
361 &self,
362 request: impl Request<B>,
363 ) -> Result<Response<CacheBody<B>>> {
364 let method = request.method();
365 let uri = request.uri();
366
367 let key = storage_key(method, uri, request.headers());
368 if matches!(*method, Method::GET | Method::HEAD) {
369 match self.storage.get(&key).await {
370 Ok(Some(stored)) => {
371 debug!(
372 method = method.as_str(),
373 scheme = uri.scheme_str(),
374 authority = uri.authority().map(Authority::as_str),
375 path = uri.path(),
376 key,
377 "cache hit"
378 );
379 return self.conditional_send_upstream(key, request, stored).await;
380 }
381 Ok(None) => {
382 debug!(
383 method = method.as_str(),
384 scheme = uri.scheme_str(),
385 authority = uri.authority().map(Authority::as_str),
386 path = uri.path(),
387 key,
388 "cache miss"
389 );
390 }
391 Err(e) => {
392 debug!(
393 method = method.as_str(),
394 scheme = uri.scheme_str(),
395 authority = uri.authority().map(Authority::as_str),
396 path = uri.path(),
397 key,
398 error = format!("{e:?}"),
399 "failed to get response from storage; treating as not cached"
400 );
401
402 }
404 }
405 }
406
407 self.send_upstream(key, request, CacheLookupStatus::Miss)
408 .await
409 }
410
411 async fn send_upstream<B: Body + Send>(
415 &self,
416 key: String,
417 request: impl Request<B>,
418 lookup_status: CacheLookupStatus,
419 ) -> Result<Response<CacheBody<B>>> {
420 let request_like: RequestLike = RequestLike::new(&request);
421
422 let mut response = request.send(None).await?;
423 let policy =
424 CachePolicy::new_options(&request_like, &response, SystemTime::now(), self.options);
425
426 response.set_cache_status(lookup_status, CacheStatus::Miss, None);
427
428 if matches!(request_like.method, Method::GET | Method::HEAD)
429 && response.status().is_success()
430 && policy.is_storable()
431 {
432 let (parts, body) = response.into_parts();
433 return match self.storage.store(key.clone(), parts, body, policy).await {
434 Ok(response) => Ok(response),
435 Err(e) => {
436 debug!(
437 method = request_like.method.as_str(),
438 scheme = request_like.uri.scheme_str(),
439 authority = request_like.uri.authority().map(Authority::as_str),
440 path = request_like.uri.path(),
441 key,
442 error = format!("{e:?}"),
443 "failed to store response"
444 );
445 Err(e)
446 }
447 };
448 }
449
450 debug!(
451 method = request_like.method.as_str(),
452 scheme = request_like.uri.scheme_str(),
453 authority = request_like.uri.authority().map(Authority::as_str),
454 path = request_like.uri.path(),
455 key,
456 status = response.status().as_u16(),
457 "response is not cacheable"
458 );
459
460 if !request_like.method.is_safe() {
461 for method in [Method::HEAD, Method::GET] {
464 let key = storage_key(&method, &request_like.uri, &request_like.headers);
465 if let Err(e) = self.storage.delete(&key).await {
466 debug!(
467 method = method.as_str(),
468 scheme = request_like.uri.scheme_str(),
469 authority = request_like.uri.authority().map(Authority::as_str),
470 path = request_like.uri.path(),
471 key,
472 error = format!("{e:?}"),
473 "failed to put response into storage"
474 );
475 }
476 }
477 }
478
479 Ok(response.map(CacheBody::from_upstream))
480 }
481
482 async fn conditional_send_upstream<B: Body + Send>(
488 &self,
489 key: String,
490 request: impl Request<B>,
491 mut stored: StoredResponse<B>,
492 ) -> Result<Response<CacheBody<B>>> {
493 let request_like = RequestLike::new(&request);
494
495 let mut headers = match stored
496 .policy
497 .before_request(&request_like, SystemTime::now())
498 {
499 BeforeRequest::Fresh(parts) => {
500 debug!(
502 method = request_like.method.as_str(),
503 scheme = request_like.uri.scheme_str(),
504 authority = request_like.uri.authority().map(Authority::as_str),
505 path = request_like.uri.path(),
506 key,
507 digest = stored.digest,
508 "response is still fresh: responding with body from storage"
509 );
510
511 stored.response.extend_headers(parts.headers);
512 stored.response.set_cache_status(
513 CacheLookupStatus::Hit,
514 CacheStatus::Hit,
515 Some(&stored.digest),
516 );
517 return Ok(stored.response);
518 }
519 BeforeRequest::Stale {
520 request: http::request::Parts { headers, .. },
521 matches,
522 } => {
523 if matches { Some(headers) } else { None }
525 }
526 };
527
528 debug!(
529 method = request_like.method.as_str(),
530 scheme = request_like.uri.scheme_str(),
531 authority = request_like.uri.authority().map(Authority::as_str),
532 path = request_like.uri.path(),
533 key,
534 "response is stale: sending request upstream for revalidation"
535 );
536
537 if let Some(headers) = &mut headers
539 && let Some(hook) = &self.hook
540 {
541 hook(&request_like, headers)?;
542 }
543
544 match request.send(headers).await {
546 Ok(response) if response.status().is_success() => {
547 debug!(
548 method = request_like.method.as_str(),
549 scheme = request_like.uri.scheme_str(),
550 authority = request_like.uri.authority().map(Authority::as_str),
551 path = request_like.uri.path(),
552 key,
553 "server responded with a new response"
554 );
555
556 let policy = CachePolicy::new_options(
558 &request_like,
559 &response,
560 SystemTime::now(),
561 self.options,
562 );
563
564 let (parts, body) = response.into_parts();
565 match self.storage.store(key.clone(), parts, body, policy).await {
566 Ok(mut response) => {
567 response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Miss, None);
568 Ok(response)
569 }
570 Err(e) => {
571 debug!(
572 method = request_like.method.as_str(),
573 scheme = request_like.uri.scheme_str(),
574 authority = request_like.uri.authority().map(Authority::as_str),
575 path = request_like.uri.path(),
576 key,
577 error = format!("{e:?}"),
578 "failed to put response into cache storage"
579 );
580 Err(e)
581 }
582 }
583 }
584 Ok(response) if response.status() == StatusCode::NOT_MODIFIED => {
585 debug!(
586 method = request_like.method.as_str(),
587 scheme = request_like.uri.scheme_str(),
588 authority = request_like.uri.authority().map(Authority::as_str),
589 path = request_like.uri.path(),
590 key,
591 "server responded with a not modified status"
592 );
593
594 match stored
597 .policy
598 .after_response(&request_like, &response, SystemTime::now())
599 {
600 AfterResponse::Modified(..) => {
601 debug!(
610 method = request_like.method.as_str(),
611 scheme = request_like.uri.scheme_str(),
612 authority = request_like.uri.authority().map(Authority::as_str),
613 path = request_like.uri.path(),
614 key,
615 "cached response was considered modified despite revalidation \
616 replying with not modified"
617 );
618
619 Self::prepare_stale_response(
620 &request_like.uri,
621 &mut stored.response,
622 &stored.digest,
623 );
624 Ok(stored.response)
625 }
626 AfterResponse::NotModified(policy, parts) => {
627 stored.response.extend_headers(parts.headers);
628
629 let (parts, body) = stored.response.into_parts();
630 match self
631 .storage
632 .put(&key, &parts, &policy, &stored.digest)
633 .await
634 {
635 Ok(_) => {
636 debug!(
637 method = request_like.method.as_str(),
638 scheme = request_like.uri.scheme_str(),
639 authority = request_like.uri.authority().map(Authority::as_str),
640 path = request_like.uri.path(),
641 key,
642 digest = stored.digest,
643 "response updated in cache successfully"
644 );
645
646 let mut cached_response = Response::from_parts(parts, body);
648 cached_response.set_cache_status(
649 CacheLookupStatus::Hit,
650 CacheStatus::Hit,
651 Some(&stored.digest),
652 );
653 Ok(cached_response)
654 }
655 Err(e) => {
656 debug!(
657 method = request_like.method.as_str(),
658 scheme = request_like.uri.scheme_str(),
659 authority = request_like.uri.authority().map(Authority::as_str),
660 path = request_like.uri.path(),
661 key,
662 error = format!("{e:?}"),
663 "failed to put response into cache storage"
664 );
665 Err(e)
666 }
667 }
668 }
669 }
670 }
671 Ok(response)
672 if response.status().is_server_error() && !stored.response.must_revalidate() =>
673 {
674 debug!(
675 method = request_like.method.as_str(),
676 scheme = request_like.uri.scheme_str(),
677 authority = request_like.uri.authority().map(Authority::as_str),
678 path = request_like.uri.path(),
679 key,
680 stored.digest,
681 "failed to revalidate response: serving potentially stale body from storage \
682 with a warning"
683 );
684
685 Self::prepare_stale_response(
686 &request_like.uri,
687 &mut stored.response,
688 &stored.digest,
689 );
690 Ok(stored.response)
691 }
692 Ok(mut response) => {
693 debug!(
694 method = request_like.method.as_str(),
695 scheme = request_like.uri.scheme_str(),
696 authority = request_like.uri.authority().map(Authority::as_str),
697 path = request_like.uri.path(),
698 key,
699 "failed to revalidate response: returning response from server uncached"
700 );
701
702 response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Miss, None);
704 Ok(response.map(CacheBody::from_upstream))
705 }
706 Err(e) => {
707 if stored.response.must_revalidate() {
708 Err(e)
709 } else {
710 debug!(
711 method = request_like.method.as_str(),
712 scheme = request_like.uri.scheme_str(),
713 authority = request_like.uri.authority().map(Authority::as_str),
714 path = request_like.uri.path(),
715 key,
716 stored.digest,
717 "failed to revalidate response: serving potentially stale body from \
718 storage with a warning"
719 );
720
721 Self::prepare_stale_response(
722 &request_like.uri,
723 &mut stored.response,
724 &stored.digest,
725 );
726 Ok(stored.response)
727 }
728 }
729 }
730 }
731
732 fn prepare_stale_response<B>(uri: &Uri, response: &mut Response<CacheBody<B>>, digest: &str) {
734 response.add_warning(uri, 111, "Revalidation failed");
742 response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Hit, Some(digest));
743 }
744}