http_cache_stream/
cache.rs

1//! Implementation of the HTTP cache.
2
3use std::fmt;
4use std::io;
5use std::pin::Pin;
6use std::task::Context;
7use std::task::Poll;
8use std::task::ready;
9use std::time::SystemTime;
10
11use anyhow::Result;
12use bytes::Bytes;
13use http::HeaderMap;
14use http::HeaderValue;
15use http::Method;
16use http::Response;
17use http::StatusCode;
18use http::Uri;
19use http::Version;
20use http::header;
21use http::header::CACHE_CONTROL;
22use http::uri::Authority;
23use http_cache_semantics::AfterResponse;
24use http_cache_semantics::BeforeRequest;
25use http_cache_semantics::CacheOptions;
26use http_cache_semantics::CachePolicy;
27use sha2::Digest;
28use sha2::Sha256;
29use tracing::debug;
30
31use crate::body::Body;
32use crate::storage::CacheStorage;
33use crate::storage::StoredResponse;
34
/// The name of the `x-cache-lookup` custom header.
///
/// The value is `HIT` if a response for the request was found in cache
/// storage, and `MISS` if not (including when the storage lookup failed or
/// the request method is neither `GET` nor `HEAD`, since only those are looked
/// up).
pub const X_CACHE_LOOKUP: &str = "x-cache-lookup";
39
/// The name of the `x-cache` custom header.
///
/// The value is `HIT` if the response was served from the cache and `MISS` if
/// it came from upstream. A lookup hit can still be an `x-cache` miss, e.g.
/// when revalidating a stale response yields a new response from the server.
pub const X_CACHE: &str = "x-cache";
44
/// The name of the `x-cache-digest` custom header.
///
/// This header is only present when a cached response body is being served
/// from the cache; its value is the digest recorded for that stored response.
///
/// This can be used to read a body directly from cache storage rather than
/// reading the body through the response.
pub const X_CACHE_DIGEST: &str = "x-cache-digest";
56
57/// Gets the storage key for a request.
58fn storage_key(method: &Method, uri: &Uri, headers: &HeaderMap) -> String {
59    let mut hasher = Sha256::new();
60    hasher.update(method.as_str());
61    hasher.update(":");
62
63    if let Some(scheme) = uri.scheme_str() {
64        hasher.update(scheme);
65    }
66
67    hasher.update("://");
68    if let Some(authority) = uri.authority() {
69        hasher.update(authority.as_str());
70    }
71
72    hasher.update(uri.path());
73
74    if let Some(query) = uri.query() {
75        hasher.update(query);
76    }
77
78    if let Some(value) = headers.get(header::RANGE) {
79        hasher.update(value.as_bytes());
80    }
81
82    let bytes = hasher.finalize();
83    hex::encode(bytes)
84}
85
/// The outcome of looking a request up in cache storage.
///
/// Rendered through [`fmt::Display`] into the custom `x-cache-lookup` header.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CacheLookupStatus {
    /// A stored response was found for the request.
    Hit,
    /// No stored response was found for the request.
    Miss,
}

impl fmt::Display for CacheLookupStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match *self {
            CacheLookupStatus::Hit => "HIT",
            CacheLookupStatus::Miss => "MISS",
        };
        f.write_str(text)
    }
}
105
/// Whether a response was served from the cache.
///
/// Rendered through [`fmt::Display`] into the custom `x-cache` header.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CacheStatus {
    /// The response came from the cache.
    Hit,
    /// The response came from upstream rather than the cache.
    Miss,
}

impl fmt::Display for CacheStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match *self {
            CacheStatus::Hit => "HIT",
            CacheStatus::Miss => "MISS",
        };
        f.write_str(text)
    }
}
125
/// An extension trait for [`Response`].
///
/// Provides the header manipulation the cache performs on responses.
trait ResponseExt {
    /// Adds a `warning` header to the response.
    ///
    /// `code` is the warn-code, `message` the warn-text, and `uri` supplies the
    /// host used as the warn-agent. Any existing `warning` header is replaced.
    fn add_warning(&mut self, uri: &Uri, code: usize, message: &str);

    /// Checks if the Cache-Control header contains the must-revalidate
    /// directive.
    fn must_revalidate(&self) -> bool;

    /// Extends the response's headers with those from the given header map.
    ///
    /// Existing matching headers will be replaced.
    fn extend_headers(&mut self, headers: HeaderMap);

    /// Sets the cache status headers of the response.
    ///
    /// Always sets `x-cache-lookup` and `x-cache`; sets `x-cache-digest` only
    /// when `digest` is `Some`.
    fn set_cache_status(
        &mut self,
        lookup: CacheLookupStatus,
        status: CacheStatus,
        digest: Option<&str>,
    );
}
148
149impl<B> ResponseExt for Response<B> {
150    fn add_warning(&mut self, url: &Uri, code: usize, message: &str) {
151        // warning    = "warning" ":" 1#warning-value
152        // warning-value = warn-code SP warn-agent SP warn-text [SP warn-date]
153        // warn-code  = 3DIGIT
154        // warn-agent = ( host [ ":" port ] ) | pseudonym
155        //                 ; the name or pseudonym of the server adding
156        //                 ; the warning header, for use in debugging
157        // warn-text  = quoted-string
158        // warn-date  = <"> HTTP-date <">
159        // (https://tools.ietf.org/html/rfc2616#section-14.46)
160        self.headers_mut().insert(
161            "warning",
162            HeaderValue::from_str(&format!(
163                "{} {} {:?} \"{}\"",
164                code,
165                url.host().expect("URL should be valid"),
166                message,
167                httpdate::fmt_http_date(SystemTime::now())
168            ))
169            .expect("value should be valid"),
170        );
171    }
172
173    fn must_revalidate(&self) -> bool {
174        self.headers()
175            .get(CACHE_CONTROL.as_str())
176            .is_some_and(|val| {
177                val.to_str()
178                    .unwrap_or("")
179                    .to_lowercase()
180                    .contains("must-revalidate")
181            })
182    }
183
184    fn extend_headers(&mut self, headers: HeaderMap) {
185        self.headers_mut().extend(headers);
186    }
187
188    fn set_cache_status(
189        &mut self,
190        lookup: CacheLookupStatus,
191        status: CacheStatus,
192        digest: Option<&str>,
193    ) {
194        let headers = self.headers_mut();
195        headers.insert(
196            X_CACHE_LOOKUP,
197            lookup.to_string().parse().expect("value should parse"),
198        );
199        headers.insert(
200            X_CACHE,
201            status.to_string().parse().expect("value should parse"),
202        );
203        if let Some(digest) = digest {
204            headers.insert(X_CACHE_DIGEST, digest.parse().expect("value should parse"));
205        }
206    }
207}
208
209/// Represents the supported HTTP body trait from middleware integrations.
210pub trait HttpBody: http_body::Body<Data = Bytes, Error = io::Error> + Send {
211    /// Polls the next data frame as bytes.
212    ///
213    /// Returns end of stream after all data frames, thereby ignoring trailers.
214    fn poll_next_data(
215        self: Pin<&mut Self>,
216        cx: &mut Context<'_>,
217    ) -> Poll<Option<io::Result<Bytes>>> {
218        match ready!(self.poll_frame(cx)) {
219            Some(Ok(frame)) => match frame.into_data().ok() {
220                Some(data) => Poll::Ready(Some(Ok(data))),
221                None => Poll::Ready(None),
222            },
223            Some(Err(e)) => Poll::Ready(Some(Err(e))),
224            None => Poll::Ready(None),
225        }
226    }
227}
228
/// An abstraction of an HTTP request.
///
/// This trait is used in HTTP middleware integrations to abstract the request
/// type and sending the request upstream.
pub trait Request<B: HttpBody>: Send {
    /// Gets the request's version.
    fn version(&self) -> Version;

    /// Gets the request's method.
    fn method(&self) -> &Method;

    /// Gets the request's URI.
    fn uri(&self) -> &Uri;

    /// Gets the request's headers.
    fn headers(&self) -> &HeaderMap;

    /// Sends the request to upstream and gets the response.
    ///
    /// If `headers` is `Some`, the supplied headers should override any
    /// matching headers in the original request.
    ///
    /// The cache passes `Some` when revalidating a stale cached response with
    /// headers produced by the cache policy, and `None` otherwise. Sending
    /// consumes the request.
    fn send(self, headers: Option<HeaderMap>) -> impl Future<Output = Result<Response<B>>> + Send;
}
252
253/// Provides an implementation of `RequestLike` for `http-cache-semantics`.
254struct RequestLike {
255    /// The request method.
256    method: Method,
257    /// The request URI.
258    uri: Uri,
259    /// The request headers.
260    headers: HeaderMap,
261}
262
263impl RequestLike {
264    /// Constructs a new `RequestLike` for the given request.
265    fn new<R: Request<B>, B: HttpBody>(request: &R) -> Self {
266        // Unfortunate we have to clone the header map here
267        Self {
268            method: request.method().clone(),
269            uri: request.uri().clone(),
270            headers: request.headers().clone(),
271        }
272    }
273}
274
275impl http_cache_semantics::RequestLike for RequestLike {
276    fn uri(&self) -> Uri {
277        // Note: URI is cheaply cloned
278        self.uri.clone()
279    }
280
281    fn is_same_uri(&self, other: &Uri) -> bool {
282        self.uri.eq(other)
283    }
284
285    fn method(&self) -> &Method {
286        &self.method
287    }
288
289    fn headers(&self) -> &HeaderMap {
290        &self.headers
291    }
292}
293
/// Implements an HTTP cache.
///
/// Responses are kept in the storage `S`; the caching operations are available
/// when `S` implements [`CacheStorage`].
pub struct Cache<S> {
    /// The cache storage.
    storage: S,
    /// The cache options used when constructing cache policies.
    options: CacheOptions,
}
301
302impl<S> Cache<S>
303where
304    S: CacheStorage,
305{
306    /// Construct a new cache with the given storage.
307    ///
308    /// Defaults to a private cache.
309    pub fn new(storage: S) -> Self {
310        Self {
311            storage,
312            // Default to a private cache
313            options: CacheOptions {
314                shared: false,
315                ..Default::default()
316            },
317        }
318    }
319
320    /// Construct a new cache with the given storage and options.
321    pub fn new_with_options(storage: S, options: CacheOptions) -> Self {
322        Self { storage, options }
323    }
324
325    /// Gets the storage used by the cache.
326    pub fn storage(&self) -> &S {
327        &self.storage
328    }
329
330    /// Sends a HTTP request through the cache.
331    ///
332    /// If a previous response is cached and not stale, the request is not sent
333    /// upstream and the cached response is returned.
334    ///
335    /// If a previous response is cached and is stale, the response is
336    /// revalidated, the cache is updated, and the cached response returned.
337    ///
338    /// If a previous response is not in the cache, the request is sent upstream
339    /// and the response is cached, if it is cacheable.
340    pub async fn send<B: HttpBody>(&self, request: impl Request<B>) -> Result<Response<Body<B>>> {
341        let method = request.method();
342        let uri = request.uri();
343
344        let key = storage_key(method, uri, request.headers());
345        if matches!(*method, Method::GET | Method::HEAD) {
346            match self.storage.get(&key).await {
347                Ok(Some(stored)) => {
348                    debug!(
349                        method = method.as_str(),
350                        scheme = uri.scheme_str(),
351                        authority = uri.authority().map(Authority::as_str),
352                        path = uri.path(),
353                        key,
354                        "cache hit"
355                    );
356                    return self.conditional_send_upstream(key, request, stored).await;
357                }
358                Ok(None) => {
359                    debug!(
360                        method = method.as_str(),
361                        scheme = uri.scheme_str(),
362                        authority = uri.authority().map(Authority::as_str),
363                        path = uri.path(),
364                        key,
365                        "cache miss"
366                    );
367                }
368                Err(e) => {
369                    debug!(
370                        method = method.as_str(),
371                        scheme = uri.scheme_str(),
372                        authority = uri.authority().map(Authority::as_str),
373                        path = uri.path(),
374                        key,
375                        error = format!("{e:?}"),
376                        "failed to get response from storage; treating as not cached"
377                    );
378
379                    // Treat as a miss
380                }
381            }
382        }
383
384        self.send_upstream(key, request, CacheLookupStatus::Miss)
385            .await
386    }
387
388    /// Sends the original request upstream.
389    ///
390    /// Caches the response if the response is cacheable.
391    async fn send_upstream<B: HttpBody>(
392        &self,
393        key: String,
394        request: impl Request<B>,
395        lookup_status: CacheLookupStatus,
396    ) -> Result<Response<Body<B>>> {
397        let request_like: RequestLike = RequestLike::new(&request);
398
399        let mut response = request.send(None).await?;
400        let policy =
401            CachePolicy::new_options(&request_like, &response, SystemTime::now(), self.options);
402
403        response.set_cache_status(lookup_status, CacheStatus::Miss, None);
404
405        if matches!(request_like.method, Method::GET | Method::HEAD)
406            && response.status().is_success()
407            && policy.is_storable()
408        {
409            let (parts, body) = response.into_parts();
410            return match self.storage.store(key.clone(), parts, body, policy).await {
411                Ok(response) => Ok(response),
412                Err(e) => {
413                    debug!(
414                        method = request_like.method.as_str(),
415                        scheme = request_like.uri.scheme_str(),
416                        authority = request_like.uri.authority().map(Authority::as_str),
417                        path = request_like.uri.path(),
418                        key,
419                        error = format!("{e:?}"),
420                        "failed to store response"
421                    );
422                    Err(e)
423                }
424            };
425        }
426
427        debug!(
428            method = request_like.method.as_str(),
429            scheme = request_like.uri.scheme_str(),
430            authority = request_like.uri.authority().map(Authority::as_str),
431            path = request_like.uri.path(),
432            key,
433            status = response.status().as_u16(),
434            "response is not cacheable"
435        );
436
437        if !request_like.method.is_safe() {
438            // If the request is not safe, assume the resource has been modified and delete
439            // any cached responses we may have for HEAD/GET
440            for method in [Method::HEAD, Method::GET] {
441                let key = storage_key(&method, &request_like.uri, &request_like.headers);
442                if let Err(e) = self.storage.delete(&key).await {
443                    debug!(
444                        method = method.as_str(),
445                        scheme = request_like.uri.scheme_str(),
446                        authority = request_like.uri.authority().map(Authority::as_str),
447                        path = request_like.uri.path(),
448                        key,
449                        error = format!("{e:?}"),
450                        "failed to put response into storage"
451                    );
452                }
453            }
454        }
455
456        Ok(response.map(Body::from_upstream))
457    }
458
459    /// Performs a conditional send to upstream.
460    ///
461    /// If a cached request is still fresh, it is returned.
462    ///
463    /// If a cached request is stale, an attempt is made to revalidate it.
464    async fn conditional_send_upstream<B: HttpBody>(
465        &self,
466        key: String,
467        request: impl Request<B>,
468        mut stored: StoredResponse<B>,
469    ) -> Result<Response<Body<B>>> {
470        let request_like = RequestLike::new(&request);
471
472        let headers = match stored
473            .policy
474            .before_request(&request_like, SystemTime::now())
475        {
476            BeforeRequest::Fresh(parts) => {
477                // The cached response is still fresh, return it
478                debug!(
479                    method = request_like.method.as_str(),
480                    scheme = request_like.uri.scheme_str(),
481                    authority = request_like.uri.authority().map(Authority::as_str),
482                    path = request_like.uri.path(),
483                    key,
484                    digest = stored.digest,
485                    "response is still fresh: responding with body from storage"
486                );
487
488                stored.response.extend_headers(parts.headers);
489                stored.response.set_cache_status(
490                    CacheLookupStatus::Hit,
491                    CacheStatus::Hit,
492                    Some(&stored.digest),
493                );
494                return Ok(stored.response);
495            }
496            BeforeRequest::Stale {
497                request: http::request::Parts { headers, .. },
498                matches,
499            } => {
500                // Cached response is stale and needs to be revalidated
501                if matches { Some(headers) } else { None }
502            }
503        };
504
505        debug!(
506            method = request_like.method.as_str(),
507            scheme = request_like.uri.scheme_str(),
508            authority = request_like.uri.authority().map(Authority::as_str),
509            path = request_like.uri.path(),
510            key,
511            "response is stale: sending request upstream for validation"
512        );
513
514        // Revalidate the request
515        match request.send(headers).await {
516            Ok(response) if response.status().is_success() => {
517                debug!(
518                    method = request_like.method.as_str(),
519                    scheme = request_like.uri.scheme_str(),
520                    authority = request_like.uri.authority().map(Authority::as_str),
521                    path = request_like.uri.path(),
522                    key,
523                    "server responded with a new response"
524                );
525
526                // The server responded with the body, the cached body is no longer valid
527                let policy = CachePolicy::new_options(
528                    &request_like,
529                    &response,
530                    SystemTime::now(),
531                    self.options,
532                );
533
534                let (parts, body) = response.into_parts();
535                match self.storage.store(key.clone(), parts, body, policy).await {
536                    Ok(mut response) => {
537                        response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Miss, None);
538                        Ok(response)
539                    }
540                    Err(e) => {
541                        debug!(
542                            method = request_like.method.as_str(),
543                            scheme = request_like.uri.scheme_str(),
544                            authority = request_like.uri.authority().map(Authority::as_str),
545                            path = request_like.uri.path(),
546                            key,
547                            error = format!("{e:?}"),
548                            "failed to put response into cache storage"
549                        );
550                        Err(e)
551                    }
552                }
553            }
554            Ok(response) if response.status() == StatusCode::NOT_MODIFIED => {
555                debug!(
556                    method = request_like.method.as_str(),
557                    scheme = request_like.uri.scheme_str(),
558                    authority = request_like.uri.authority().map(Authority::as_str),
559                    path = request_like.uri.path(),
560                    key,
561                    "server responded with a not modified status"
562                );
563
564                // The server informed us that our response hasn't been modified
565                // Note that the response body for this code is always empty
566                match stored
567                    .policy
568                    .after_response(&request_like, &response, SystemTime::now())
569                {
570                    AfterResponse::Modified(..) => {
571                        // Certain cloud providers (e.g. Azure Blob Storage) do not correctly
572                        // implement 304 responses. Specifically, they aren't returning the same
573                        // headers that a 2XX response would have. This causes the HTTP cache
574                        // implementation to effectively say the body needs updating when it does
575                        // not. Instead, we'll return a stale response with a warning; this will
576                        // cause unnecessary revalidation requests in the future, however, because
577                        // we are not storing an updated cache policy object.
578
579                        debug!(
580                            method = request_like.method.as_str(),
581                            scheme = request_like.uri.scheme_str(),
582                            authority = request_like.uri.authority().map(Authority::as_str),
583                            path = request_like.uri.path(),
584                            key,
585                            "cached response was considered modified despite revalidation \
586                             replying with not modified"
587                        );
588
589                        Self::prepare_stale_response(
590                            &request_like.uri,
591                            &mut stored.response,
592                            &stored.digest,
593                        );
594                        Ok(stored.response)
595                    }
596                    AfterResponse::NotModified(policy, parts) => {
597                        stored.response.extend_headers(parts.headers);
598
599                        let (parts, body) = stored.response.into_parts();
600                        match self
601                            .storage
602                            .put(&key, &parts, &policy, &stored.digest)
603                            .await
604                        {
605                            Ok(_) => {
606                                debug!(
607                                    method = request_like.method.as_str(),
608                                    scheme = request_like.uri.scheme_str(),
609                                    authority = request_like.uri.authority().map(Authority::as_str),
610                                    path = request_like.uri.path(),
611                                    key,
612                                    digest = stored.digest,
613                                    "response updated in cache successfully"
614                                );
615
616                                // Response was updated and the body comes from storage
617                                let mut cached_response = Response::from_parts(parts, body);
618                                cached_response.set_cache_status(
619                                    CacheLookupStatus::Hit,
620                                    CacheStatus::Hit,
621                                    Some(&stored.digest),
622                                );
623                                Ok(cached_response)
624                            }
625                            Err(e) => {
626                                debug!(
627                                    method = request_like.method.as_str(),
628                                    scheme = request_like.uri.scheme_str(),
629                                    authority = request_like.uri.authority().map(Authority::as_str),
630                                    path = request_like.uri.path(),
631                                    key,
632                                    error = format!("{e:?}"),
633                                    "failed to put response into cache storage"
634                                );
635                                Err(e)
636                            }
637                        }
638                    }
639                }
640            }
641            Ok(response)
642                if response.status().is_server_error() && !stored.response.must_revalidate() =>
643            {
644                debug!(
645                    method = request_like.method.as_str(),
646                    scheme = request_like.uri.scheme_str(),
647                    authority = request_like.uri.authority().map(Authority::as_str),
648                    path = request_like.uri.path(),
649                    key,
650                    stored.digest,
651                    "failed to revalidate response: serving potentially stale body from storage \
652                     with a warning"
653                );
654
655                Self::prepare_stale_response(
656                    &request_like.uri,
657                    &mut stored.response,
658                    &stored.digest,
659                );
660                Ok(stored.response)
661            }
662            Ok(mut response) => {
663                debug!(
664                    method = request_like.method.as_str(),
665                    scheme = request_like.uri.scheme_str(),
666                    authority = request_like.uri.authority().map(Authority::as_str),
667                    path = request_like.uri.path(),
668                    key,
669                    "failed to revalidate response: returning response from server uncached"
670                );
671
672                // Otherwise, don't serve the cached response at all
673                response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Miss, None);
674                Ok(response.map(Body::from_upstream))
675            }
676            Err(e) => {
677                if stored.response.must_revalidate() {
678                    Err(e)
679                } else {
680                    debug!(
681                        method = request_like.method.as_str(),
682                        scheme = request_like.uri.scheme_str(),
683                        authority = request_like.uri.authority().map(Authority::as_str),
684                        path = request_like.uri.path(),
685                        key,
686                        stored.digest,
687                        "failed to revalidate response: serving potentially stale body from \
688                         storage with a warning"
689                    );
690
691                    Self::prepare_stale_response(
692                        &request_like.uri,
693                        &mut stored.response,
694                        &stored.digest,
695                    );
696                    Ok(stored.response)
697                }
698            }
699        }
700    }
701
702    /// Prepares a stale response for sending back to the client.
703    fn prepare_stale_response<B>(uri: &Uri, response: &mut Response<Body<B>>, digest: &str) {
704        // If the server failed to give us a response, add the required warning to the
705        // cached response:
706        //   111 Revalidation failed
707        //   MUST be included if a cache returns a stale response
708        //   because an attempt to revalidate the response failed,
709        //   due to an inability to reach the server.
710        // (https://tools.ietf.org/html/rfc2616#section-14.46)
711        response.add_warning(uri, 111, "Revalidation failed");
712        response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Hit, Some(digest));
713    }
714}