1use std::fmt;
4use std::io;
5use std::pin::Pin;
6use std::task::Context;
7use std::task::Poll;
8use std::task::ready;
9use std::time::SystemTime;
10
11use anyhow::Result;
12use bytes::Bytes;
13use http::HeaderMap;
14use http::HeaderValue;
15use http::Method;
16use http::Response;
17use http::StatusCode;
18use http::Uri;
19use http::Version;
20use http::header;
21use http::header::CACHE_CONTROL;
22use http::uri::Authority;
23use http_cache_semantics::AfterResponse;
24use http_cache_semantics::BeforeRequest;
25use http_cache_semantics::CacheOptions;
26use http_cache_semantics::CachePolicy;
27use sha2::Digest;
28use sha2::Sha256;
29use tracing::debug;
30
31use crate::body::Body;
32use crate::storage::CacheStorage;
33use crate::storage::StoredResponse;
34
35pub const X_CACHE_LOOKUP: &str = "x-cache-lookup";
39
40pub const X_CACHE: &str = "x-cache";
44
45pub const X_CACHE_DIGEST: &str = "x-cache-digest";
56
57fn storage_key(method: &Method, uri: &Uri, headers: &HeaderMap) -> String {
59 let mut hasher = Sha256::new();
60 hasher.update(method.as_str());
61 hasher.update(":");
62
63 if let Some(scheme) = uri.scheme_str() {
64 hasher.update(scheme);
65 }
66
67 hasher.update("://");
68 if let Some(authority) = uri.authority() {
69 hasher.update(authority.as_str());
70 }
71
72 hasher.update(uri.path());
73
74 if let Some(query) = uri.query() {
75 hasher.update(query);
76 }
77
78 if let Some(value) = headers.get(header::RANGE) {
79 hasher.update(value.as_bytes());
80 }
81
82 let bytes = hasher.finalize();
83 hex::encode(bytes)
84}
85
86#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
90pub enum CacheLookupStatus {
91 Hit,
93 Miss,
95}
96
97impl fmt::Display for CacheLookupStatus {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 match self {
100 Self::Hit => write!(f, "HIT"),
101 Self::Miss => write!(f, "MISS"),
102 }
103 }
104}
105
106#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
110pub enum CacheStatus {
111 Hit,
113 Miss,
115}
116
117impl fmt::Display for CacheStatus {
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 match self {
120 Self::Hit => write!(f, "HIT"),
121 Self::Miss => write!(f, "MISS"),
122 }
123 }
124}
125
126trait ResponseExt {
128 fn add_warning(&mut self, uri: &Uri, code: usize, message: &str);
130
131 fn must_revalidate(&self) -> bool;
134
135 fn extend_headers(&mut self, headers: HeaderMap);
139
140 fn set_cache_status(
142 &mut self,
143 lookup: CacheLookupStatus,
144 status: CacheStatus,
145 digest: Option<&str>,
146 );
147}
148
149impl<B> ResponseExt for Response<B> {
150 fn add_warning(&mut self, url: &Uri, code: usize, message: &str) {
151 self.headers_mut().insert(
161 "warning",
162 HeaderValue::from_str(&format!(
163 "{} {} {:?} \"{}\"",
164 code,
165 url.host().expect("URL should be valid"),
166 message,
167 httpdate::fmt_http_date(SystemTime::now())
168 ))
169 .expect("value should be valid"),
170 );
171 }
172
173 fn must_revalidate(&self) -> bool {
174 self.headers()
175 .get(CACHE_CONTROL.as_str())
176 .is_some_and(|val| {
177 val.to_str()
178 .unwrap_or("")
179 .to_lowercase()
180 .contains("must-revalidate")
181 })
182 }
183
184 fn extend_headers(&mut self, headers: HeaderMap) {
185 self.headers_mut().extend(headers);
186 }
187
188 fn set_cache_status(
189 &mut self,
190 lookup: CacheLookupStatus,
191 status: CacheStatus,
192 digest: Option<&str>,
193 ) {
194 let headers = self.headers_mut();
195 headers.insert(
196 X_CACHE_LOOKUP,
197 lookup.to_string().parse().expect("value should parse"),
198 );
199 headers.insert(
200 X_CACHE,
201 status.to_string().parse().expect("value should parse"),
202 );
203 if let Some(digest) = digest {
204 headers.insert(X_CACHE_DIGEST, digest.parse().expect("value should parse"));
205 }
206 }
207}
208
209pub trait HttpBody: http_body::Body<Data = Bytes, Error = io::Error> + Send {
211 fn poll_next_data(
215 self: Pin<&mut Self>,
216 cx: &mut Context<'_>,
217 ) -> Poll<Option<io::Result<Bytes>>> {
218 match ready!(self.poll_frame(cx)) {
219 Some(Ok(frame)) => match frame.into_data().ok() {
220 Some(data) => Poll::Ready(Some(Ok(data))),
221 None => Poll::Ready(None),
222 },
223 Some(Err(e)) => Poll::Ready(Some(Err(e))),
224 None => Poll::Ready(None),
225 }
226 }
227}
228
229pub trait Request<B: HttpBody>: Send {
234 fn version(&self) -> Version;
236
237 fn method(&self) -> &Method;
239
240 fn uri(&self) -> &Uri;
242
243 fn headers(&self) -> &HeaderMap;
245
246 fn send(self, headers: Option<HeaderMap>) -> impl Future<Output = Result<Response<B>>> + Send;
251}
252
253struct RequestLike {
255 method: Method,
257 uri: Uri,
259 headers: HeaderMap,
261}
262
263impl RequestLike {
264 fn new<R: Request<B>, B: HttpBody>(request: &R) -> Self {
266 Self {
268 method: request.method().clone(),
269 uri: request.uri().clone(),
270 headers: request.headers().clone(),
271 }
272 }
273}
274
275impl http_cache_semantics::RequestLike for RequestLike {
276 fn uri(&self) -> Uri {
277 self.uri.clone()
279 }
280
281 fn is_same_uri(&self, other: &Uri) -> bool {
282 self.uri.eq(other)
283 }
284
285 fn method(&self) -> &Method {
286 &self.method
287 }
288
289 fn headers(&self) -> &HeaderMap {
290 &self.headers
291 }
292}
293
294pub struct Cache<S> {
296 storage: S,
298 options: CacheOptions,
300}
301
302impl<S> Cache<S>
303where
304 S: CacheStorage,
305{
306 pub fn new(storage: S) -> Self {
310 Self {
311 storage,
312 options: CacheOptions {
314 shared: false,
315 ..Default::default()
316 },
317 }
318 }
319
320 pub fn new_with_options(storage: S, options: CacheOptions) -> Self {
322 Self { storage, options }
323 }
324
325 pub fn storage(&self) -> &S {
327 &self.storage
328 }
329
330 pub async fn send<B: HttpBody>(&self, request: impl Request<B>) -> Result<Response<Body<B>>> {
341 let method = request.method();
342 let uri = request.uri();
343
344 let key = storage_key(method, uri, request.headers());
345 if matches!(*method, Method::GET | Method::HEAD) {
346 match self.storage.get(&key).await {
347 Ok(Some(stored)) => {
348 debug!(
349 method = method.as_str(),
350 scheme = uri.scheme_str(),
351 authority = uri.authority().map(Authority::as_str),
352 path = uri.path(),
353 key,
354 "cache hit"
355 );
356 return self.conditional_send_upstream(key, request, stored).await;
357 }
358 Ok(None) => {
359 debug!(
360 method = method.as_str(),
361 scheme = uri.scheme_str(),
362 authority = uri.authority().map(Authority::as_str),
363 path = uri.path(),
364 key,
365 "cache miss"
366 );
367 }
368 Err(e) => {
369 debug!(
370 method = method.as_str(),
371 scheme = uri.scheme_str(),
372 authority = uri.authority().map(Authority::as_str),
373 path = uri.path(),
374 key,
375 error = format!("{e:?}"),
376 "failed to get response from storage; treating as not cached"
377 );
378
379 }
381 }
382 }
383
384 self.send_upstream(key, request, CacheLookupStatus::Miss)
385 .await
386 }
387
388 async fn send_upstream<B: HttpBody>(
392 &self,
393 key: String,
394 request: impl Request<B>,
395 lookup_status: CacheLookupStatus,
396 ) -> Result<Response<Body<B>>> {
397 let request_like: RequestLike = RequestLike::new(&request);
398
399 let mut response = request.send(None).await?;
400 let policy =
401 CachePolicy::new_options(&request_like, &response, SystemTime::now(), self.options);
402
403 response.set_cache_status(lookup_status, CacheStatus::Miss, None);
404
405 if matches!(request_like.method, Method::GET | Method::HEAD)
406 && response.status().is_success()
407 && policy.is_storable()
408 {
409 let (parts, body) = response.into_parts();
410 return match self.storage.store(key.clone(), parts, body, policy).await {
411 Ok(response) => Ok(response),
412 Err(e) => {
413 debug!(
414 method = request_like.method.as_str(),
415 scheme = request_like.uri.scheme_str(),
416 authority = request_like.uri.authority().map(Authority::as_str),
417 path = request_like.uri.path(),
418 key,
419 error = format!("{e:?}"),
420 "failed to store response"
421 );
422 Err(e)
423 }
424 };
425 }
426
427 debug!(
428 method = request_like.method.as_str(),
429 scheme = request_like.uri.scheme_str(),
430 authority = request_like.uri.authority().map(Authority::as_str),
431 path = request_like.uri.path(),
432 key,
433 status = response.status().as_u16(),
434 "response is not cacheable"
435 );
436
437 if !request_like.method.is_safe() {
438 for method in [Method::HEAD, Method::GET] {
441 let key = storage_key(&method, &request_like.uri, &request_like.headers);
442 if let Err(e) = self.storage.delete(&key).await {
443 debug!(
444 method = method.as_str(),
445 scheme = request_like.uri.scheme_str(),
446 authority = request_like.uri.authority().map(Authority::as_str),
447 path = request_like.uri.path(),
448 key,
449 error = format!("{e:?}"),
450 "failed to put response into storage"
451 );
452 }
453 }
454 }
455
456 Ok(response.map(Body::from_upstream))
457 }
458
459 async fn conditional_send_upstream<B: HttpBody>(
465 &self,
466 key: String,
467 request: impl Request<B>,
468 mut stored: StoredResponse<B>,
469 ) -> Result<Response<Body<B>>> {
470 let request_like = RequestLike::new(&request);
471
472 let headers = match stored
473 .policy
474 .before_request(&request_like, SystemTime::now())
475 {
476 BeforeRequest::Fresh(parts) => {
477 debug!(
479 method = request_like.method.as_str(),
480 scheme = request_like.uri.scheme_str(),
481 authority = request_like.uri.authority().map(Authority::as_str),
482 path = request_like.uri.path(),
483 key,
484 digest = stored.digest,
485 "response is still fresh: responding with body from storage"
486 );
487
488 stored.response.extend_headers(parts.headers);
489 stored.response.set_cache_status(
490 CacheLookupStatus::Hit,
491 CacheStatus::Hit,
492 Some(&stored.digest),
493 );
494 return Ok(stored.response);
495 }
496 BeforeRequest::Stale {
497 request: http::request::Parts { headers, .. },
498 matches,
499 } => {
500 if matches { Some(headers) } else { None }
502 }
503 };
504
505 debug!(
506 method = request_like.method.as_str(),
507 scheme = request_like.uri.scheme_str(),
508 authority = request_like.uri.authority().map(Authority::as_str),
509 path = request_like.uri.path(),
510 key,
511 "response is stale: sending request upstream for validation"
512 );
513
514 match request.send(headers).await {
516 Ok(response) if response.status().is_success() => {
517 debug!(
518 method = request_like.method.as_str(),
519 scheme = request_like.uri.scheme_str(),
520 authority = request_like.uri.authority().map(Authority::as_str),
521 path = request_like.uri.path(),
522 key,
523 "server responded with a new response"
524 );
525
526 let policy = CachePolicy::new_options(
528 &request_like,
529 &response,
530 SystemTime::now(),
531 self.options,
532 );
533
534 let (parts, body) = response.into_parts();
535 match self.storage.store(key.clone(), parts, body, policy).await {
536 Ok(mut response) => {
537 response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Miss, None);
538 Ok(response)
539 }
540 Err(e) => {
541 debug!(
542 method = request_like.method.as_str(),
543 scheme = request_like.uri.scheme_str(),
544 authority = request_like.uri.authority().map(Authority::as_str),
545 path = request_like.uri.path(),
546 key,
547 error = format!("{e:?}"),
548 "failed to put response into cache storage"
549 );
550 Err(e)
551 }
552 }
553 }
554 Ok(response) if response.status() == StatusCode::NOT_MODIFIED => {
555 debug!(
556 method = request_like.method.as_str(),
557 scheme = request_like.uri.scheme_str(),
558 authority = request_like.uri.authority().map(Authority::as_str),
559 path = request_like.uri.path(),
560 key,
561 "server responded with a not modified status"
562 );
563
564 match stored
567 .policy
568 .after_response(&request_like, &response, SystemTime::now())
569 {
570 AfterResponse::Modified(..) => {
571 debug!(
580 method = request_like.method.as_str(),
581 scheme = request_like.uri.scheme_str(),
582 authority = request_like.uri.authority().map(Authority::as_str),
583 path = request_like.uri.path(),
584 key,
585 "cached response was considered modified despite revalidation \
586 replying with not modified"
587 );
588
589 Self::prepare_stale_response(
590 &request_like.uri,
591 &mut stored.response,
592 &stored.digest,
593 );
594 Ok(stored.response)
595 }
596 AfterResponse::NotModified(policy, parts) => {
597 stored.response.extend_headers(parts.headers);
598
599 let (parts, body) = stored.response.into_parts();
600 match self
601 .storage
602 .put(&key, &parts, &policy, &stored.digest)
603 .await
604 {
605 Ok(_) => {
606 debug!(
607 method = request_like.method.as_str(),
608 scheme = request_like.uri.scheme_str(),
609 authority = request_like.uri.authority().map(Authority::as_str),
610 path = request_like.uri.path(),
611 key,
612 digest = stored.digest,
613 "response updated in cache successfully"
614 );
615
616 let mut cached_response = Response::from_parts(parts, body);
618 cached_response.set_cache_status(
619 CacheLookupStatus::Hit,
620 CacheStatus::Hit,
621 Some(&stored.digest),
622 );
623 Ok(cached_response)
624 }
625 Err(e) => {
626 debug!(
627 method = request_like.method.as_str(),
628 scheme = request_like.uri.scheme_str(),
629 authority = request_like.uri.authority().map(Authority::as_str),
630 path = request_like.uri.path(),
631 key,
632 error = format!("{e:?}"),
633 "failed to put response into cache storage"
634 );
635 Err(e)
636 }
637 }
638 }
639 }
640 }
641 Ok(response)
642 if response.status().is_server_error() && !stored.response.must_revalidate() =>
643 {
644 debug!(
645 method = request_like.method.as_str(),
646 scheme = request_like.uri.scheme_str(),
647 authority = request_like.uri.authority().map(Authority::as_str),
648 path = request_like.uri.path(),
649 key,
650 stored.digest,
651 "failed to revalidate response: serving potentially stale body from storage \
652 with a warning"
653 );
654
655 Self::prepare_stale_response(
656 &request_like.uri,
657 &mut stored.response,
658 &stored.digest,
659 );
660 Ok(stored.response)
661 }
662 Ok(mut response) => {
663 debug!(
664 method = request_like.method.as_str(),
665 scheme = request_like.uri.scheme_str(),
666 authority = request_like.uri.authority().map(Authority::as_str),
667 path = request_like.uri.path(),
668 key,
669 "failed to revalidate response: returning response from server uncached"
670 );
671
672 response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Miss, None);
674 Ok(response.map(Body::from_upstream))
675 }
676 Err(e) => {
677 if stored.response.must_revalidate() {
678 Err(e)
679 } else {
680 debug!(
681 method = request_like.method.as_str(),
682 scheme = request_like.uri.scheme_str(),
683 authority = request_like.uri.authority().map(Authority::as_str),
684 path = request_like.uri.path(),
685 key,
686 stored.digest,
687 "failed to revalidate response: serving potentially stale body from \
688 storage with a warning"
689 );
690
691 Self::prepare_stale_response(
692 &request_like.uri,
693 &mut stored.response,
694 &stored.digest,
695 );
696 Ok(stored.response)
697 }
698 }
699 }
700 }
701
702 fn prepare_stale_response<B>(uri: &Uri, response: &mut Response<Body<B>>, digest: &str) {
704 response.add_warning(uri, 111, "Revalidation failed");
712 response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Hit, Some(digest));
713 }
714}