1use std::fmt;
4use std::io;
5use std::time::SystemTime;
6
7use anyhow::Result;
8use anyhow::bail;
9use bytes::Bytes;
10use http::HeaderMap;
11use http::HeaderValue;
12use http::Method;
13use http::Response;
14use http::StatusCode;
15use http::Uri;
16use http::Version;
17use http::header::CACHE_CONTROL;
18use http::uri::Authority;
19use http_cache_semantics::AfterResponse;
20use http_cache_semantics::BeforeRequest;
21use http_cache_semantics::CacheOptions;
22use http_cache_semantics::CachePolicy;
23use sha2::Digest;
24use sha2::Sha256;
25use tracing::debug;
26
27use crate::body::Body;
28use crate::storage::CacheStorage;
29use crate::storage::StoredResponse;
30
31pub const X_CACHE_LOOKUP: &str = "x-cache-lookup";
35
36pub const X_CACHE: &str = "x-cache";
40
41pub const X_CACHE_DIGEST: &str = "x-cache-digest";
50
51fn storage_key(method: &Method, uri: &Uri) -> String {
53 let mut hasher = Sha256::new();
54 hasher.update(method.as_str());
55 hasher.update(":");
56
57 if let Some(scheme) = uri.scheme_str() {
58 hasher.update(scheme);
59 }
60
61 hasher.update("://");
62 if let Some(authority) = uri.authority() {
63 hasher.update(authority.as_str());
64 }
65
66 hasher.update(uri.path());
67
68 if let Some(query) = uri.query() {
69 hasher.update(query);
70 }
71
72 let bytes = hasher.finalize();
73 hex::encode(bytes)
74}
75
76#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
80pub enum CacheLookupStatus {
81 Hit,
83 Miss,
85}
86
87impl fmt::Display for CacheLookupStatus {
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 match self {
90 Self::Hit => write!(f, "HIT"),
91 Self::Miss => write!(f, "MISS"),
92 }
93 }
94}
95
96#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
100pub enum CacheStatus {
101 Hit,
103 Miss,
105}
106
107impl fmt::Display for CacheStatus {
108 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109 match self {
110 Self::Hit => write!(f, "HIT"),
111 Self::Miss => write!(f, "MISS"),
112 }
113 }
114}
115
116trait ResponseExt {
118 fn add_warning(&mut self, uri: &Uri, code: usize, message: &str);
120
121 fn must_revalidate(&self) -> bool;
124
125 fn extend_headers(&mut self, headers: HeaderMap);
129
130 fn set_cache_status(&mut self, lookup: CacheLookupStatus, status: CacheStatus);
132
133 fn set_cache_digest(&mut self, digest: &str);
135}
136
137impl<B> ResponseExt for Response<B> {
138 fn add_warning(&mut self, url: &Uri, code: usize, message: &str) {
139 self.headers_mut().insert(
149 "warning",
150 HeaderValue::from_str(&format!(
151 "{} {} {:?} \"{}\"",
152 code,
153 url.host().expect("URL should be valid"),
154 message,
155 httpdate::fmt_http_date(SystemTime::now())
156 ))
157 .expect("value should be valid"),
158 );
159 }
160
161 fn must_revalidate(&self) -> bool {
162 self.headers()
163 .get(CACHE_CONTROL.as_str())
164 .is_some_and(|val| {
165 val.to_str()
166 .unwrap_or("")
167 .to_lowercase()
168 .contains("must-revalidate")
169 })
170 }
171
172 fn extend_headers(&mut self, headers: HeaderMap) {
173 self.headers_mut().extend(headers);
174 }
175
176 fn set_cache_status(&mut self, lookup: CacheLookupStatus, status: CacheStatus) {
177 self.headers_mut().insert(
178 X_CACHE_LOOKUP,
179 lookup.to_string().parse().expect("value should parse"),
180 );
181 self.headers_mut().insert(
182 X_CACHE,
183 status.to_string().parse().expect("value should parse"),
184 );
185 }
186
187 fn set_cache_digest(&mut self, digest: &str) {
188 self.headers_mut()
189 .insert(X_CACHE_DIGEST, digest.parse().expect("value should parse"));
190 }
191}
192
193pub trait HttpBody: http_body::Body<Data = Bytes, Error = io::Error> + Send {}
195
196pub trait Request<B: HttpBody>: Send {
201 fn version(&self) -> Version;
203
204 fn method(&self) -> &Method;
206
207 fn uri(&self) -> &Uri;
209
210 fn headers(&self) -> &HeaderMap;
212
213 fn send(self, headers: Option<HeaderMap>) -> impl Future<Output = Result<Response<B>>> + Send;
218}
219
220struct RequestLike {
222 method: Method,
224 uri: Uri,
226 headers: HeaderMap,
228}
229
230impl RequestLike {
231 fn new<R: Request<B>, B: HttpBody>(request: &R) -> Self {
233 Self {
235 method: request.method().clone(),
236 uri: request.uri().clone(),
237 headers: request.headers().clone(),
238 }
239 }
240}
241
242impl http_cache_semantics::RequestLike for RequestLike {
243 fn uri(&self) -> Uri {
244 self.uri.clone()
246 }
247
248 fn is_same_uri(&self, other: &Uri) -> bool {
249 self.uri.eq(other)
250 }
251
252 fn method(&self) -> &Method {
253 &self.method
254 }
255
256 fn headers(&self) -> &HeaderMap {
257 &self.headers
258 }
259}
260
261pub struct Cache<S> {
263 storage: S,
265 options: CacheOptions,
267}
268
269impl<S> Cache<S>
270where
271 S: CacheStorage,
272{
273 pub fn new(storage: S) -> Self {
277 Self {
278 storage,
279 options: CacheOptions {
281 shared: false,
282 ..Default::default()
283 },
284 }
285 }
286
287 pub fn new_with_options(storage: S, options: CacheOptions) -> Self {
289 Self { storage, options }
290 }
291
292 pub fn storage(&self) -> &S {
294 &self.storage
295 }
296
297 pub async fn send<B: HttpBody>(&self, request: impl Request<B>) -> Result<Response<Body<B>>> {
308 let method = request.method();
309 let uri = request.uri();
310
311 let key = storage_key(method, uri);
312 if matches!(*method, Method::GET | Method::HEAD) {
313 match self.storage.get(&key).await {
314 Ok(Some(stored)) => {
315 debug!(
316 method = method.as_str(),
317 scheme = uri.scheme_str(),
318 authority = uri.authority().map(Authority::as_str),
319 path = uri.path(),
320 key,
321 "cache hit"
322 );
323 return self.conditional_send_upstream(key, request, stored).await;
324 }
325 Ok(None) => {
326 debug!(
327 method = method.as_str(),
328 scheme = uri.scheme_str(),
329 authority = uri.authority().map(Authority::as_str),
330 path = uri.path(),
331 key,
332 "cache miss"
333 );
334 }
335 Err(e) => {
336 debug!(
337 method = method.as_str(),
338 scheme = uri.scheme_str(),
339 authority = uri.authority().map(Authority::as_str),
340 path = uri.path(),
341 key,
342 error = format!("{e:?}"),
343 "failed to get response from storage; treating as not cached"
344 );
345
346 }
348 }
349 }
350
351 self.send_upstream(key, request, CacheLookupStatus::Miss)
352 .await
353 }
354
355 async fn send_upstream<B: HttpBody>(
359 &self,
360 key: String,
361 request: impl Request<B>,
362 lookup_status: CacheLookupStatus,
363 ) -> Result<Response<Body<B>>> {
364 let request_like: RequestLike = RequestLike::new(&request);
365
366 let mut response = request.send(None).await?;
367 let policy =
368 CachePolicy::new_options(&request_like, &response, SystemTime::now(), self.options);
369
370 response.set_cache_status(lookup_status, CacheStatus::Miss);
371
372 if matches!(request_like.method, Method::GET | Method::HEAD)
373 && response.status() == StatusCode::OK
374 && policy.is_storable()
375 {
376 let (parts, body) = response.into_parts();
377 return match self
378 .storage
379 .put_with_body(&key, &parts, &policy, body)
380 .await
381 {
382 Ok((body, digest)) => {
383 debug!(
384 method = request_like.method.as_str(),
385 scheme = request_like.uri.scheme_str(),
386 authority = request_like.uri.authority().map(Authority::as_str),
387 path = request_like.uri.path(),
388 key,
389 digest,
390 "cache storage updated successfully"
391 );
392
393 let mut response = Response::from_parts(parts, body);
394 response.set_cache_digest(&digest);
395 Ok(response)
396 }
397 Err(e) => {
398 debug!(
399 method = request_like.method.as_str(),
400 scheme = request_like.uri.scheme_str(),
401 authority = request_like.uri.authority().map(Authority::as_str),
402 path = request_like.uri.path(),
403 key,
404 error = format!("{e:?}"),
405 "failed to put response into storage"
406 );
407 Err(e)
408 }
409 };
410 }
411
412 debug!(
413 method = request_like.method.as_str(),
414 scheme = request_like.uri.scheme_str(),
415 authority = request_like.uri.authority().map(Authority::as_str),
416 path = request_like.uri.path(),
417 key,
418 status = response.status().as_u16(),
419 "response is not cacheable"
420 );
421
422 if !request_like.method.is_safe() {
423 for method in [Method::HEAD, Method::GET] {
426 let key = storage_key(&method, &request_like.uri);
427 if let Err(e) = self.storage.delete(&key).await {
428 debug!(
429 method = method.as_str(),
430 scheme = request_like.uri.scheme_str(),
431 authority = request_like.uri.authority().map(Authority::as_str),
432 path = request_like.uri.path(),
433 key,
434 error = format!("{e:?}"),
435 "failed to put response into storage"
436 );
437 }
438 }
439 }
440
441 Ok(response.map(|body| Body::from_upstream(body)))
442 }
443
444 async fn conditional_send_upstream<B: HttpBody>(
450 &self,
451 key: String,
452 request: impl Request<B>,
453 mut stored: StoredResponse<B>,
454 ) -> Result<Response<Body<B>>> {
455 let request_like = RequestLike::new(&request);
456
457 let headers = match stored
458 .policy
459 .before_request(&request_like, SystemTime::now())
460 {
461 BeforeRequest::Fresh(parts) => {
462 debug!(
464 method = request_like.method.as_str(),
465 scheme = request_like.uri.scheme_str(),
466 authority = request_like.uri.authority().map(Authority::as_str),
467 path = request_like.uri.path(),
468 key,
469 digest = stored.digest,
470 "response is still fresh: responding with body from storage"
471 );
472
473 stored.response.extend_headers(parts.headers);
474 stored
475 .response
476 .set_cache_status(CacheLookupStatus::Hit, CacheStatus::Hit);
477 stored.response.set_cache_digest(&stored.digest);
478 return Ok(stored.response);
479 }
480 BeforeRequest::Stale {
481 request: http::request::Parts { headers, .. },
482 matches,
483 } => {
484 if matches { Some(headers) } else { None }
486 }
487 };
488
489 debug!(
490 method = request_like.method.as_str(),
491 scheme = request_like.uri.scheme_str(),
492 authority = request_like.uri.authority().map(Authority::as_str),
493 path = request_like.uri.path(),
494 key,
495 "response is stale: sending request upstream for validation"
496 );
497
498 match request.send(headers).await {
500 Ok(response) if response.status() == StatusCode::OK => {
501 debug!(
502 method = request_like.method.as_str(),
503 scheme = request_like.uri.scheme_str(),
504 authority = request_like.uri.authority().map(Authority::as_str),
505 path = request_like.uri.path(),
506 key,
507 "server responded with a new response"
508 );
509
510 let policy = CachePolicy::new_options(
512 &request_like,
513 &response,
514 SystemTime::now(),
515 self.options,
516 );
517
518 let (parts, body) = response.into_parts();
519 match self
520 .storage
521 .put_with_body(&key, &parts, &policy, body)
522 .await
523 {
524 Ok((body, digest)) => {
525 debug!(
526 method = request_like.method.as_str(),
527 scheme = request_like.uri.scheme_str(),
528 authority = request_like.uri.authority().map(Authority::as_str),
529 path = request_like.uri.path(),
530 key,
531 digest,
532 "cache storage updated successfully"
533 );
534
535 let mut response = Response::from_parts(parts, body);
537 response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Miss);
538 response.set_cache_digest(&digest);
539 Ok(response)
540 }
541 Err(e) => {
542 debug!(
543 method = request_like.method.as_str(),
544 scheme = request_like.uri.scheme_str(),
545 authority = request_like.uri.authority().map(Authority::as_str),
546 path = request_like.uri.path(),
547 key,
548 error = format!("{e:?}"),
549 "failed to put response into cache storage"
550 );
551 Err(e)
552 }
553 }
554 }
555 Ok(response) if response.status() == StatusCode::NOT_MODIFIED => {
556 debug!(
557 method = request_like.method.as_str(),
558 scheme = request_like.uri.scheme_str(),
559 authority = request_like.uri.authority().map(Authority::as_str),
560 path = request_like.uri.path(),
561 key,
562 "server responded with a not modified status"
563 );
564
565 match stored
568 .policy
569 .after_response(&request_like, &response, SystemTime::now())
570 {
571 AfterResponse::Modified(..) => {
572 debug!(
573 method = request_like.method.as_str(),
574 scheme = request_like.uri.scheme_str(),
575 authority = request_like.uri.authority().map(Authority::as_str),
576 path = request_like.uri.path(),
577 key,
578 "cached response was considered modified despite revalidation"
579 );
580
581 bail!("cached response was considered modified despite revalidation");
584 }
585 AfterResponse::NotModified(policy, parts) => {
586 stored.response.extend_headers(parts.headers);
587
588 let (parts, body) = stored.response.into_parts();
589 match self
590 .storage
591 .put(&key, &parts, &policy, &stored.digest)
592 .await
593 {
594 Ok(_) => {
595 debug!(
596 method = request_like.method.as_str(),
597 scheme = request_like.uri.scheme_str(),
598 authority = request_like.uri.authority().map(Authority::as_str),
599 path = request_like.uri.path(),
600 key,
601 digest = stored.digest,
602 "cache storage updated successfully"
603 );
604
605 let mut cached_response = Response::from_parts(parts, body);
607 cached_response
608 .set_cache_status(CacheLookupStatus::Hit, CacheStatus::Hit);
609 cached_response.set_cache_digest(&stored.digest);
610 Ok(cached_response)
611 }
612 Err(e) => {
613 debug!(
614 method = request_like.method.as_str(),
615 scheme = request_like.uri.scheme_str(),
616 authority = request_like.uri.authority().map(Authority::as_str),
617 path = request_like.uri.path(),
618 key,
619 error = format!("{e:?}"),
620 "failed to put response into cache storage"
621 );
622 Err(e)
623 }
624 }
625 }
626 }
627 }
628 Ok(response)
629 if response.status().is_server_error() && !stored.response.must_revalidate() =>
630 {
631 Self::prepare_stale_response(
632 &request_like.method,
633 &request_like.uri,
634 &key,
635 &stored.digest,
636 &mut stored.response,
637 );
638 Ok(stored.response)
639 }
640 Ok(mut response) => {
641 debug!(
642 method = request_like.method.as_str(),
643 scheme = request_like.uri.scheme_str(),
644 authority = request_like.uri.authority().map(Authority::as_str),
645 path = request_like.uri.path(),
646 key,
647 "failed to revalidate response: returning response from server uncached"
648 );
649
650 response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Miss);
652 Ok(response.map(|b| Body::from_upstream(b)))
653 }
654 Err(e) => {
655 if stored.response.must_revalidate() {
656 Err(e)
657 } else {
658 Self::prepare_stale_response(
659 &request_like.method,
660 &request_like.uri,
661 &key,
662 &stored.digest,
663 &mut stored.response,
664 );
665 Ok(stored.response)
666 }
667 }
668 }
669 }
670
671 fn prepare_stale_response<B>(
673 method: &Method,
674 uri: &Uri,
675 key: &str,
676 digest: &str,
677 response: &mut Response<Body<B>>,
678 ) {
679 debug!(
680 method = method.as_str(),
681 scheme = uri.scheme_str(),
682 authority = uri.authority().map(Authority::as_str),
683 path = uri.path(),
684 key,
685 digest,
686 "failed to revalidate response: serving potentially stale body from storage with a \
687 warning"
688 );
689
690 response.add_warning(uri, 111, "Revalidation failed");
698 response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Hit);
699 response.set_cache_digest(digest);
700 }
701}