http_cache_stream/
cache.rs

1//! Implementation of the HTTP cache.
2
3use std::fmt;
4use std::time::SystemTime;
5
6use anyhow::Result;
7use http::HeaderMap;
8use http::HeaderValue;
9use http::Method;
10use http::Response;
11use http::StatusCode;
12use http::Uri;
13use http::Version;
14use http::header;
15use http::header::CACHE_CONTROL;
16use http::uri::Authority;
17use http_body::Body;
18use http_cache_semantics::AfterResponse;
19use http_cache_semantics::BeforeRequest;
20use http_cache_semantics::CacheOptions;
21use http_cache_semantics::CachePolicy;
22use sha2::Digest;
23use sha2::Sha256;
24use tracing::debug;
25
26use crate::body::CacheBody;
27use crate::storage::CacheStorage;
28use crate::storage::StoredResponse;
29
/// The name of the `x-cache-lookup` custom header.
///
/// Value will be `HIT` if a response existed in cache, `MISS` if not.
pub const X_CACHE_LOOKUP: &str = "x-cache-lookup";

/// The name of the `x-cache` custom header.
///
/// Value will be `HIT` if a response was served from the cache, `MISS` if not.
pub const X_CACHE: &str = "x-cache";

/// The name of the `x-cache-digest` custom header.
///
/// This header is only present when a cached response body is being served
/// from the cache.
///
/// Its value identifies the stored body, which allows the body to be read
/// directly from cache storage rather than through the response.
pub const X_CACHE_DIGEST: &str = "x-cache-digest";
51
52/// Gets the storage key for a request.
53fn storage_key(method: &Method, uri: &Uri, headers: &HeaderMap) -> String {
54    let mut hasher = Sha256::new();
55    hasher.update(method.as_str());
56    hasher.update(":");
57
58    if let Some(scheme) = uri.scheme_str() {
59        hasher.update(scheme);
60    }
61
62    hasher.update("://");
63    if let Some(authority) = uri.authority() {
64        hasher.update(authority.as_str());
65    }
66
67    hasher.update(uri.path());
68
69    if let Some(query) = uri.query() {
70        hasher.update(query);
71    }
72
73    if let Some(value) = headers.get(header::RANGE) {
74        hasher.update(value.as_bytes());
75    }
76
77    let bytes = hasher.finalize();
78    hex::encode(bytes)
79}
80
/// The outcome of looking a request up in the cache.
///
/// Its [`fmt::Display`] form is the value of the `x-cache-lookup` custom
/// header.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CacheLookupStatus {
    /// A response exists in the cache.
    Hit,
    /// A response does not exist in the cache.
    Miss,
}

impl fmt::Display for CacheLookupStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match *self {
            Self::Hit => "HIT",
            Self::Miss => "MISS",
        };
        f.write_str(text)
    }
}
100
/// Whether a response was served from the cache.
///
/// Its [`fmt::Display`] form is the value of the `x-cache` custom header.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CacheStatus {
    /// The response was served from the cache.
    Hit,
    /// The response was not served from the cache.
    Miss,
}

impl fmt::Display for CacheStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match *self {
            Self::Hit => "HIT",
            Self::Miss => "MISS",
        };
        f.write_str(text)
    }
}
120
121/// An extension trait for [`Response`].
122trait ResponseExt {
123    /// Adds a warning header to the response.
124    fn add_warning(&mut self, uri: &Uri, code: usize, message: &str);
125
126    /// Checks if the Cache-Control header contains the must-revalidate
127    /// directive.
128    fn must_revalidate(&self) -> bool;
129
130    /// Extends the request's headers with those from the given header map.
131    ///
132    /// Existing matching headers will be replaced.
133    fn extend_headers(&mut self, headers: HeaderMap);
134
135    /// Sets the cache status headers of the response.
136    fn set_cache_status(
137        &mut self,
138        lookup: CacheLookupStatus,
139        status: CacheStatus,
140        digest: Option<&str>,
141    );
142}
143
144impl<B> ResponseExt for Response<B> {
145    fn add_warning(&mut self, url: &Uri, code: usize, message: &str) {
146        // warning    = "warning" ":" 1#warning-value
147        // warning-value = warn-code SP warn-agent SP warn-text [SP warn-date]
148        // warn-code  = 3DIGIT
149        // warn-agent = ( host [ ":" port ] ) | pseudonym
150        //                 ; the name or pseudonym of the server adding
151        //                 ; the warning header, for use in debugging
152        // warn-text  = quoted-string
153        // warn-date  = <"> HTTP-date <">
154        // (https://tools.ietf.org/html/rfc2616#section-14.46)
155        self.headers_mut().insert(
156            "warning",
157            HeaderValue::from_str(&format!(
158                "{} {} {:?} \"{}\"",
159                code,
160                url.host().expect("URL should be valid"),
161                message,
162                httpdate::fmt_http_date(SystemTime::now())
163            ))
164            .expect("value should be valid"),
165        );
166    }
167
168    fn must_revalidate(&self) -> bool {
169        self.headers()
170            .get(CACHE_CONTROL.as_str())
171            .is_some_and(|val| {
172                val.to_str()
173                    .unwrap_or("")
174                    .to_lowercase()
175                    .contains("must-revalidate")
176            })
177    }
178
179    fn extend_headers(&mut self, headers: HeaderMap) {
180        self.headers_mut().extend(headers);
181    }
182
183    fn set_cache_status(
184        &mut self,
185        lookup: CacheLookupStatus,
186        status: CacheStatus,
187        digest: Option<&str>,
188    ) {
189        let headers = self.headers_mut();
190        headers.insert(
191            X_CACHE_LOOKUP,
192            lookup.to_string().parse().expect("value should parse"),
193        );
194        headers.insert(
195            X_CACHE,
196            status.to_string().parse().expect("value should parse"),
197        );
198        if let Some(digest) = digest {
199            headers.insert(X_CACHE_DIGEST, digest.parse().expect("value should parse"));
200        }
201    }
202}
203
204/// An abstraction of an HTTP request.
205///
206/// This trait is used in HTTP middleware integrations to abstract the request
207/// type and sending the request upstream.
208pub trait Request<B: Body>: Send {
209    /// Gets the request's version.
210    fn version(&self) -> Version;
211
212    /// Gets the request's method.
213    fn method(&self) -> &Method;
214
215    /// Gets the request's URI.
216    fn uri(&self) -> &Uri;
217
218    /// Gets the request's headers.
219    fn headers(&self) -> &HeaderMap;
220
221    /// Sends the request to upstream and gets the response.
222    ///
223    /// If `headers` is `Some`, the supplied headers should override any
224    /// matching headers in the original request.
225    fn send(self, headers: Option<HeaderMap>) -> impl Future<Output = Result<Response<B>>> + Send;
226}
227
228/// Provides an implementation of `RequestLike` for `http-cache-semantics`.
229struct RequestLike {
230    /// The request method.
231    method: Method,
232    /// The request URI.
233    uri: Uri,
234    /// The request headers.
235    headers: HeaderMap,
236}
237
238impl RequestLike {
239    /// Constructs a new `RequestLike` for the given request.
240    fn new<R: Request<B>, B: Body>(request: &R) -> Self {
241        // Unfortunate we have to clone the header map here
242        Self {
243            method: request.method().clone(),
244            uri: request.uri().clone(),
245            headers: request.headers().clone(),
246        }
247    }
248}
249
250impl http_cache_semantics::RequestLike for RequestLike {
251    fn uri(&self) -> Uri {
252        // Note: URI is cheaply cloned
253        self.uri.clone()
254    }
255
256    fn is_same_uri(&self, other: &Uri) -> bool {
257        self.uri.eq(other)
258    }
259
260    fn method(&self) -> &Method {
261        &self.method
262    }
263
264    fn headers(&self) -> &HeaderMap {
265        &self.headers
266    }
267}
268
269/// Represents a revalidation hook.
270///
271/// The hook is provided the original request and a mutable header map
272/// containing headers explicitly set for the revalidation request.
273///
274/// For example, a hook may alter the revalidation headers to update an
275/// `Authorization` header based on the headers used for revalidation.
276///
277/// If the hook returns an error, the error is propagated out as the result of
278/// the original request.
279type RevalidationHook = dyn Fn(&dyn http_cache_semantics::RequestLike, &mut HeaderMap) -> Result<()>
280    + Send
281    + Sync
282    + 'static;
283
284/// Implement a HTTP cache.
285pub struct Cache<S> {
286    /// The cache storage.
287    storage: S,
288    /// The cache options to use.
289    options: CacheOptions,
290    /// Stores the revalidation hook.
291    ///
292    /// This is `None` if no revalidation hook is used.
293    hook: Option<Box<RevalidationHook>>,
294}
295
296impl<S> Cache<S>
297where
298    S: CacheStorage,
299{
300    /// Construct a new cache with the given storage.
301    ///
302    /// Defaults to a private cache.
303    pub fn new(storage: S) -> Self {
304        Self {
305            storage,
306            // Default to a private cache
307            options: CacheOptions {
308                shared: false,
309                ..Default::default()
310            },
311            hook: None,
312        }
313    }
314
315    /// Construct a new cache with the given storage and options.
316    pub fn new_with_options(storage: S, options: CacheOptions) -> Self {
317        Self {
318            storage,
319            options,
320            hook: None,
321        }
322    }
323
324    /// Sets the revalidation hook to use.
325    ///
326    /// The hook is provided the original request and a mutable header map
327    /// containing headers explicitly set for the revalidation request.
328    ///
329    /// For example, a hook may alter the revalidation headers to update an
330    /// `Authorization` header based on the headers used for revalidation.
331    ///
332    /// If the hook returns an error, the error is propagated out as the result
333    /// of the original request.
334    pub fn with_revalidation_hook(
335        mut self,
336        hook: impl Fn(&dyn http_cache_semantics::RequestLike, &mut HeaderMap) -> Result<()>
337        + Send
338        + Sync
339        + 'static,
340    ) -> Self {
341        self.hook = Some(Box::new(hook));
342        self
343    }
344
345    /// Gets the storage used by the cache.
346    pub fn storage(&self) -> &S {
347        &self.storage
348    }
349
350    /// Sends a HTTP request through the cache.
351    ///
352    /// If a previous response is cached and not stale, the request is not sent
353    /// upstream and the cached response is returned.
354    ///
355    /// If a previous response is cached and is stale, the response is
356    /// revalidated, the cache is updated, and the cached response returned.
357    ///
358    /// If a previous response is not in the cache, the request is sent upstream
359    /// and the response is cached, if it is cacheable.
360    pub async fn send<B: Body + Send>(
361        &self,
362        request: impl Request<B>,
363    ) -> Result<Response<CacheBody<B>>> {
364        let method = request.method();
365        let uri = request.uri();
366
367        let key = storage_key(method, uri, request.headers());
368        if matches!(*method, Method::GET | Method::HEAD) {
369            match self.storage.get(&key).await {
370                Ok(Some(stored)) => {
371                    debug!(
372                        method = method.as_str(),
373                        scheme = uri.scheme_str(),
374                        authority = uri.authority().map(Authority::as_str),
375                        path = uri.path(),
376                        key,
377                        "cache hit"
378                    );
379                    return self.conditional_send_upstream(key, request, stored).await;
380                }
381                Ok(None) => {
382                    debug!(
383                        method = method.as_str(),
384                        scheme = uri.scheme_str(),
385                        authority = uri.authority().map(Authority::as_str),
386                        path = uri.path(),
387                        key,
388                        "cache miss"
389                    );
390                }
391                Err(e) => {
392                    debug!(
393                        method = method.as_str(),
394                        scheme = uri.scheme_str(),
395                        authority = uri.authority().map(Authority::as_str),
396                        path = uri.path(),
397                        key,
398                        error = format!("{e:?}"),
399                        "failed to get response from storage; treating as not cached"
400                    );
401
402                    // Treat as a miss
403                }
404            }
405        }
406
407        self.send_upstream(key, request, CacheLookupStatus::Miss)
408            .await
409    }
410
411    /// Sends the original request upstream.
412    ///
413    /// Caches the response if the response is cacheable.
414    async fn send_upstream<B: Body + Send>(
415        &self,
416        key: String,
417        request: impl Request<B>,
418        lookup_status: CacheLookupStatus,
419    ) -> Result<Response<CacheBody<B>>> {
420        let request_like: RequestLike = RequestLike::new(&request);
421
422        let mut response = request.send(None).await?;
423        let policy =
424            CachePolicy::new_options(&request_like, &response, SystemTime::now(), self.options);
425
426        response.set_cache_status(lookup_status, CacheStatus::Miss, None);
427
428        if matches!(request_like.method, Method::GET | Method::HEAD)
429            && response.status().is_success()
430            && policy.is_storable()
431        {
432            let (parts, body) = response.into_parts();
433            return match self.storage.store(key.clone(), parts, body, policy).await {
434                Ok(response) => Ok(response),
435                Err(e) => {
436                    debug!(
437                        method = request_like.method.as_str(),
438                        scheme = request_like.uri.scheme_str(),
439                        authority = request_like.uri.authority().map(Authority::as_str),
440                        path = request_like.uri.path(),
441                        key,
442                        error = format!("{e:?}"),
443                        "failed to store response"
444                    );
445                    Err(e)
446                }
447            };
448        }
449
450        debug!(
451            method = request_like.method.as_str(),
452            scheme = request_like.uri.scheme_str(),
453            authority = request_like.uri.authority().map(Authority::as_str),
454            path = request_like.uri.path(),
455            key,
456            status = response.status().as_u16(),
457            "response is not cacheable"
458        );
459
460        if !request_like.method.is_safe() {
461            // If the request is not safe, assume the resource has been modified and delete
462            // any cached responses we may have for HEAD/GET
463            for method in [Method::HEAD, Method::GET] {
464                let key = storage_key(&method, &request_like.uri, &request_like.headers);
465                if let Err(e) = self.storage.delete(&key).await {
466                    debug!(
467                        method = method.as_str(),
468                        scheme = request_like.uri.scheme_str(),
469                        authority = request_like.uri.authority().map(Authority::as_str),
470                        path = request_like.uri.path(),
471                        key,
472                        error = format!("{e:?}"),
473                        "failed to put response into storage"
474                    );
475                }
476            }
477        }
478
479        Ok(response.map(CacheBody::from_upstream))
480    }
481
482    /// Performs a conditional send to upstream.
483    ///
484    /// If a cached request is still fresh, it is returned.
485    ///
486    /// If a cached request is stale, an attempt is made to revalidate it.
487    async fn conditional_send_upstream<B: Body + Send>(
488        &self,
489        key: String,
490        request: impl Request<B>,
491        mut stored: StoredResponse<B>,
492    ) -> Result<Response<CacheBody<B>>> {
493        let request_like = RequestLike::new(&request);
494
495        let mut headers = match stored
496            .policy
497            .before_request(&request_like, SystemTime::now())
498        {
499            BeforeRequest::Fresh(parts) => {
500                // The cached response is still fresh, return it
501                debug!(
502                    method = request_like.method.as_str(),
503                    scheme = request_like.uri.scheme_str(),
504                    authority = request_like.uri.authority().map(Authority::as_str),
505                    path = request_like.uri.path(),
506                    key,
507                    digest = stored.digest,
508                    "response is still fresh: responding with body from storage"
509                );
510
511                stored.response.extend_headers(parts.headers);
512                stored.response.set_cache_status(
513                    CacheLookupStatus::Hit,
514                    CacheStatus::Hit,
515                    Some(&stored.digest),
516                );
517                return Ok(stored.response);
518            }
519            BeforeRequest::Stale {
520                request: http::request::Parts { headers, .. },
521                matches,
522            } => {
523                // Cached response is stale and needs to be revalidated
524                if matches { Some(headers) } else { None }
525            }
526        };
527
528        debug!(
529            method = request_like.method.as_str(),
530            scheme = request_like.uri.scheme_str(),
531            authority = request_like.uri.authority().map(Authority::as_str),
532            path = request_like.uri.path(),
533            key,
534            "response is stale: sending request upstream for revalidation"
535        );
536
537        // Invoke the revalidation hook if the request will use different headers
538        if let Some(headers) = &mut headers
539            && let Some(hook) = &self.hook
540        {
541            hook(&request_like, headers)?;
542        }
543
544        // Revalidate the request
545        match request.send(headers).await {
546            Ok(response) if response.status().is_success() => {
547                debug!(
548                    method = request_like.method.as_str(),
549                    scheme = request_like.uri.scheme_str(),
550                    authority = request_like.uri.authority().map(Authority::as_str),
551                    path = request_like.uri.path(),
552                    key,
553                    "server responded with a new response"
554                );
555
556                // The server responded with the body, the cached body is no longer valid
557                let policy = CachePolicy::new_options(
558                    &request_like,
559                    &response,
560                    SystemTime::now(),
561                    self.options,
562                );
563
564                let (parts, body) = response.into_parts();
565                match self.storage.store(key.clone(), parts, body, policy).await {
566                    Ok(mut response) => {
567                        response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Miss, None);
568                        Ok(response)
569                    }
570                    Err(e) => {
571                        debug!(
572                            method = request_like.method.as_str(),
573                            scheme = request_like.uri.scheme_str(),
574                            authority = request_like.uri.authority().map(Authority::as_str),
575                            path = request_like.uri.path(),
576                            key,
577                            error = format!("{e:?}"),
578                            "failed to put response into cache storage"
579                        );
580                        Err(e)
581                    }
582                }
583            }
584            Ok(response) if response.status() == StatusCode::NOT_MODIFIED => {
585                debug!(
586                    method = request_like.method.as_str(),
587                    scheme = request_like.uri.scheme_str(),
588                    authority = request_like.uri.authority().map(Authority::as_str),
589                    path = request_like.uri.path(),
590                    key,
591                    "server responded with a not modified status"
592                );
593
594                // The server informed us that our response hasn't been modified
595                // Note that the response body for this code is always empty
596                match stored
597                    .policy
598                    .after_response(&request_like, &response, SystemTime::now())
599                {
600                    AfterResponse::Modified(..) => {
601                        // Certain cloud providers (e.g. Azure Blob Storage) do not correctly
602                        // implement 304 responses. Specifically, they aren't returning the same
603                        // headers that a 2XX response would have. This causes the HTTP cache
604                        // implementation to effectively say the body needs updating when it does
605                        // not. Instead, we'll return a stale response with a warning; this will
606                        // cause unnecessary revalidation requests in the future, however, because
607                        // we are not storing an updated cache policy object.
608
609                        debug!(
610                            method = request_like.method.as_str(),
611                            scheme = request_like.uri.scheme_str(),
612                            authority = request_like.uri.authority().map(Authority::as_str),
613                            path = request_like.uri.path(),
614                            key,
615                            "cached response was considered modified despite revalidation \
616                             replying with not modified"
617                        );
618
619                        Self::prepare_stale_response(
620                            &request_like.uri,
621                            &mut stored.response,
622                            &stored.digest,
623                        );
624                        Ok(stored.response)
625                    }
626                    AfterResponse::NotModified(policy, parts) => {
627                        stored.response.extend_headers(parts.headers);
628
629                        let (parts, body) = stored.response.into_parts();
630                        match self
631                            .storage
632                            .put(&key, &parts, &policy, &stored.digest)
633                            .await
634                        {
635                            Ok(_) => {
636                                debug!(
637                                    method = request_like.method.as_str(),
638                                    scheme = request_like.uri.scheme_str(),
639                                    authority = request_like.uri.authority().map(Authority::as_str),
640                                    path = request_like.uri.path(),
641                                    key,
642                                    digest = stored.digest,
643                                    "response updated in cache successfully"
644                                );
645
646                                // Response was updated and the body comes from storage
647                                let mut cached_response = Response::from_parts(parts, body);
648                                cached_response.set_cache_status(
649                                    CacheLookupStatus::Hit,
650                                    CacheStatus::Hit,
651                                    Some(&stored.digest),
652                                );
653                                Ok(cached_response)
654                            }
655                            Err(e) => {
656                                debug!(
657                                    method = request_like.method.as_str(),
658                                    scheme = request_like.uri.scheme_str(),
659                                    authority = request_like.uri.authority().map(Authority::as_str),
660                                    path = request_like.uri.path(),
661                                    key,
662                                    error = format!("{e:?}"),
663                                    "failed to put response into cache storage"
664                                );
665                                Err(e)
666                            }
667                        }
668                    }
669                }
670            }
671            Ok(response)
672                if response.status().is_server_error() && !stored.response.must_revalidate() =>
673            {
674                debug!(
675                    method = request_like.method.as_str(),
676                    scheme = request_like.uri.scheme_str(),
677                    authority = request_like.uri.authority().map(Authority::as_str),
678                    path = request_like.uri.path(),
679                    key,
680                    stored.digest,
681                    "failed to revalidate response: serving potentially stale body from storage \
682                     with a warning"
683                );
684
685                Self::prepare_stale_response(
686                    &request_like.uri,
687                    &mut stored.response,
688                    &stored.digest,
689                );
690                Ok(stored.response)
691            }
692            Ok(mut response) => {
693                debug!(
694                    method = request_like.method.as_str(),
695                    scheme = request_like.uri.scheme_str(),
696                    authority = request_like.uri.authority().map(Authority::as_str),
697                    path = request_like.uri.path(),
698                    key,
699                    "failed to revalidate response: returning response from server uncached"
700                );
701
702                // Otherwise, don't serve the cached response at all
703                response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Miss, None);
704                Ok(response.map(CacheBody::from_upstream))
705            }
706            Err(e) => {
707                if stored.response.must_revalidate() {
708                    Err(e)
709                } else {
710                    debug!(
711                        method = request_like.method.as_str(),
712                        scheme = request_like.uri.scheme_str(),
713                        authority = request_like.uri.authority().map(Authority::as_str),
714                        path = request_like.uri.path(),
715                        key,
716                        stored.digest,
717                        "failed to revalidate response: serving potentially stale body from \
718                         storage with a warning"
719                    );
720
721                    Self::prepare_stale_response(
722                        &request_like.uri,
723                        &mut stored.response,
724                        &stored.digest,
725                    );
726                    Ok(stored.response)
727                }
728            }
729        }
730    }
731
732    /// Prepares a stale response for sending back to the client.
733    fn prepare_stale_response<B>(uri: &Uri, response: &mut Response<CacheBody<B>>, digest: &str) {
734        // If the server failed to give us a response, add the required warning to the
735        // cached response:
736        //   111 Revalidation failed
737        //   MUST be included if a cache returns a stale response
738        //   because an attempt to revalidate the response failed,
739        //   due to an inability to reach the server.
740        // (https://tools.ietf.org/html/rfc2616#section-14.46)
741        response.add_warning(uri, 111, "Revalidation failed");
742        response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Hit, Some(digest));
743    }
744}