http_cache_stream/
cache.rs

1//! Implementation of the HTTP cache.
2
3use std::fmt;
4use std::io;
5use std::time::SystemTime;
6
7use anyhow::Result;
8use anyhow::bail;
9use bytes::Bytes;
10use http::HeaderMap;
11use http::HeaderValue;
12use http::Method;
13use http::Response;
14use http::StatusCode;
15use http::Uri;
16use http::Version;
17use http::header::CACHE_CONTROL;
18use http::uri::Authority;
19use http_cache_semantics::AfterResponse;
20use http_cache_semantics::BeforeRequest;
21use http_cache_semantics::CacheOptions;
22use http_cache_semantics::CachePolicy;
23use sha2::Digest;
24use sha2::Sha256;
25use tracing::debug;
26
27use crate::body::Body;
28use crate::storage::CacheStorage;
29use crate::storage::StoredResponse;
30
/// The name of the `x-cache-lookup` custom header.
///
/// Value will be `HIT` if a response existed in cache, `MISS` if not.
///
/// The value is the `Display` form of [`CacheLookupStatus`].
pub const X_CACHE_LOOKUP: &str = "x-cache-lookup";

/// The name of the `x-cache` custom header.
///
/// Value will be `HIT` if a response was served from the cache, `MISS` if not.
///
/// The value is the `Display` form of [`CacheStatus`].
pub const X_CACHE: &str = "x-cache";

/// The name of the `x-cache-digest` custom header.
///
/// The value will be the calculated content digest for the response body.
///
/// This can be used to link a response body directly from cache storage rather
/// than reading the body through the HTTP client.
///
/// This header is only present when the body is coming directly from the cache.
pub const X_CACHE_DIGEST: &str = "x-cache-digest";
50
51/// Gets the storage key for a request.
52fn storage_key(method: &Method, uri: &Uri) -> String {
53    let mut hasher = Sha256::new();
54    hasher.update(method.as_str());
55    hasher.update(":");
56
57    if let Some(scheme) = uri.scheme_str() {
58        hasher.update(scheme);
59    }
60
61    hasher.update("://");
62    if let Some(authority) = uri.authority() {
63        hasher.update(authority.as_str());
64    }
65
66    hasher.update(uri.path());
67
68    if let Some(query) = uri.query() {
69        hasher.update(query);
70    }
71
72    let bytes = hasher.finalize();
73    hex::encode(bytes)
74}
75
/// The outcome of looking up a response in the cache.
///
/// Reported to clients through the custom `x-cache-lookup` header.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CacheLookupStatus {
    /// The cache contained a response for the request.
    Hit,
    /// The cache did not contain a response for the request.
    Miss,
}

impl CacheLookupStatus {
    /// Gets the header text used for this lookup status.
    fn label(self) -> &'static str {
        match self {
            Self::Hit => "HIT",
            Self::Miss => "MISS",
        }
    }
}

impl fmt::Display for CacheLookupStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Written verbatim; formatter width/padding flags are not applied.
        f.write_str(self.label())
    }
}
95
/// Whether a response was served from the cache.
///
/// Reported to clients through the custom `x-cache` header.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CacheStatus {
    /// The cache served the response.
    Hit,
    /// The cache did not serve the response.
    Miss,
}

impl CacheStatus {
    /// Gets the header text used for this cache status.
    fn label(self) -> &'static str {
        match self {
            Self::Hit => "HIT",
            Self::Miss => "MISS",
        }
    }
}

impl fmt::Display for CacheStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Written verbatim; formatter width/padding flags are not applied.
        f.write_str(self.label())
    }
}
115
116/// An extension trait for [`Response`].
117trait ResponseExt {
118    /// Adds a warning header to the response.
119    fn add_warning(&mut self, uri: &Uri, code: usize, message: &str);
120
121    /// Checks if the Cache-Control header contains the must-revalidate
122    /// directive.
123    fn must_revalidate(&self) -> bool;
124
125    /// Extends the request's headers with those from the given header map.
126    ///
127    /// Existing matching headers will be replaced.
128    fn extend_headers(&mut self, headers: HeaderMap);
129
130    /// Sets the cache status headers of the response.
131    fn set_cache_status(&mut self, lookup: CacheLookupStatus, status: CacheStatus);
132
133    /// Sets the cache digest header of the response.
134    fn set_cache_digest(&mut self, digest: &str);
135}
136
137impl<B> ResponseExt for Response<B> {
138    fn add_warning(&mut self, url: &Uri, code: usize, message: &str) {
139        // warning    = "warning" ":" 1#warning-value
140        // warning-value = warn-code SP warn-agent SP warn-text [SP warn-date]
141        // warn-code  = 3DIGIT
142        // warn-agent = ( host [ ":" port ] ) | pseudonym
143        //                 ; the name or pseudonym of the server adding
144        //                 ; the warning header, for use in debugging
145        // warn-text  = quoted-string
146        // warn-date  = <"> HTTP-date <">
147        // (https://tools.ietf.org/html/rfc2616#section-14.46)
148        self.headers_mut().insert(
149            "warning",
150            HeaderValue::from_str(&format!(
151                "{} {} {:?} \"{}\"",
152                code,
153                url.host().expect("URL should be valid"),
154                message,
155                httpdate::fmt_http_date(SystemTime::now())
156            ))
157            .expect("value should be valid"),
158        );
159    }
160
161    fn must_revalidate(&self) -> bool {
162        self.headers()
163            .get(CACHE_CONTROL.as_str())
164            .is_some_and(|val| {
165                val.to_str()
166                    .unwrap_or("")
167                    .to_lowercase()
168                    .contains("must-revalidate")
169            })
170    }
171
172    fn extend_headers(&mut self, headers: HeaderMap) {
173        self.headers_mut().extend(headers);
174    }
175
176    fn set_cache_status(&mut self, lookup: CacheLookupStatus, status: CacheStatus) {
177        self.headers_mut().insert(
178            X_CACHE_LOOKUP,
179            lookup.to_string().parse().expect("value should parse"),
180        );
181        self.headers_mut().insert(
182            X_CACHE,
183            status.to_string().parse().expect("value should parse"),
184        );
185    }
186
187    fn set_cache_digest(&mut self, digest: &str) {
188        self.headers_mut()
189            .insert(X_CACHE_DIGEST, digest.parse().expect("value should parse"));
190    }
191}
192
/// Represents the supported HTTP body trait from middleware integrations.
///
/// This trait declares no methods of its own; it names the bound required of
/// upstream bodies: an [`http_body::Body`] that yields [`Bytes`] frames, fails
/// with [`io::Error`], and is `Send`.
pub trait HttpBody: http_body::Body<Data = Bytes, Error = io::Error> + Send {}
195
/// An abstraction of an HTTP request.
///
/// This trait is used in HTTP middleware integrations to abstract the request
/// type and sending the request upstream.
///
/// The type parameter `B` is the body type of the upstream response.
pub trait Request<B: HttpBody>: Send {
    /// Gets the request's version.
    fn version(&self) -> Version;

    /// Gets the request's method.
    fn method(&self) -> &Method;

    /// Gets the request's URI.
    fn uri(&self) -> &Uri;

    /// Gets the request's headers.
    fn headers(&self) -> &HeaderMap;

    /// Sends the request to upstream and gets the response.
    ///
    /// This consumes the request.
    ///
    /// If `headers` is `Some`, the supplied headers should override any
    /// matching headers in the original request.
    ///
    /// The cache supplies `Some` headers only when revalidating a stale cached
    /// response; otherwise it passes `None`.
    fn send(self, headers: Option<HeaderMap>) -> impl Future<Output = Result<Response<B>>> + Send;
}
219
220/// Provides an implementation of `RequestLike` for `http-cache-semantics`.
221struct RequestLike {
222    /// The request method.
223    method: Method,
224    /// The request URI.
225    uri: Uri,
226    /// The request headers.
227    headers: HeaderMap,
228}
229
230impl RequestLike {
231    /// Constructs a new `RequestLike` for the given request.
232    fn new<R: Request<B>, B: HttpBody>(request: &R) -> Self {
233        // Unfortunate we have to clone the header map here
234        Self {
235            method: request.method().clone(),
236            uri: request.uri().clone(),
237            headers: request.headers().clone(),
238        }
239    }
240}
241
242impl http_cache_semantics::RequestLike for RequestLike {
243    fn uri(&self) -> Uri {
244        // Note: URI is cheaply cloned
245        self.uri.clone()
246    }
247
248    fn is_same_uri(&self, other: &Uri) -> bool {
249        self.uri.eq(other)
250    }
251
252    fn method(&self) -> &Method {
253        &self.method
254    }
255
256    fn headers(&self) -> &HeaderMap {
257        &self.headers
258    }
259}
260
261/// Implement a HTTP cache.
262pub struct Cache<S> {
263    /// The cache storage.
264    storage: S,
265    /// The cache options to use.
266    options: CacheOptions,
267}
268
269impl<S> Cache<S>
270where
271    S: CacheStorage,
272{
273    /// Construct a new cache with the given storage.
274    ///
275    /// Defaults to a private cache.
276    pub fn new(storage: S) -> Self {
277        Self {
278            storage,
279            // Default to a private cache
280            options: CacheOptions {
281                shared: false,
282                ..Default::default()
283            },
284        }
285    }
286
287    /// Construct a new cache with the given storage and options.
288    pub fn new_with_options(storage: S, options: CacheOptions) -> Self {
289        Self { storage, options }
290    }
291
292    /// Gets the storage used by the cache.
293    pub fn storage(&self) -> &S {
294        &self.storage
295    }
296
297    /// Sends a HTTP request through the cache.
298    ///
299    /// If a previous response is cached and not stale, the request is not sent
300    /// upstream and the cached response is returned.
301    ///
302    /// If a previous response is cached and is stale, the response is
303    /// revalidated, the cache is updated, and the cached response returned.
304    ///
305    /// If a previous response is not in the cache, the request is sent upstream
306    /// and the response is cached, if it is cacheable.
307    pub async fn send<B: HttpBody>(&self, request: impl Request<B>) -> Result<Response<Body<B>>> {
308        let method = request.method();
309        let uri = request.uri();
310
311        let key = storage_key(method, uri);
312        if matches!(*method, Method::GET | Method::HEAD) {
313            match self.storage.get(&key).await {
314                Ok(Some(stored)) => {
315                    debug!(
316                        method = method.as_str(),
317                        scheme = uri.scheme_str(),
318                        authority = uri.authority().map(Authority::as_str),
319                        path = uri.path(),
320                        key,
321                        "cache hit"
322                    );
323                    return self.conditional_send_upstream(key, request, stored).await;
324                }
325                Ok(None) => {
326                    debug!(
327                        method = method.as_str(),
328                        scheme = uri.scheme_str(),
329                        authority = uri.authority().map(Authority::as_str),
330                        path = uri.path(),
331                        key,
332                        "cache miss"
333                    );
334                }
335                Err(e) => {
336                    debug!(
337                        method = method.as_str(),
338                        scheme = uri.scheme_str(),
339                        authority = uri.authority().map(Authority::as_str),
340                        path = uri.path(),
341                        key,
342                        error = format!("{e:?}"),
343                        "failed to get response from storage; treating as not cached"
344                    );
345
346                    // Treat as a miss
347                }
348            }
349        }
350
351        self.send_upstream(key, request, CacheLookupStatus::Miss)
352            .await
353    }
354
355    /// Sends the original request upstream.
356    ///
357    /// Caches the response if the response is cacheable.
358    async fn send_upstream<B: HttpBody>(
359        &self,
360        key: String,
361        request: impl Request<B>,
362        lookup_status: CacheLookupStatus,
363    ) -> Result<Response<Body<B>>> {
364        let request_like: RequestLike = RequestLike::new(&request);
365
366        let mut response = request.send(None).await?;
367        let policy =
368            CachePolicy::new_options(&request_like, &response, SystemTime::now(), self.options);
369
370        response.set_cache_status(lookup_status, CacheStatus::Miss);
371
372        if matches!(request_like.method, Method::GET | Method::HEAD)
373            && response.status() == StatusCode::OK
374            && policy.is_storable()
375        {
376            let (parts, body) = response.into_parts();
377            return match self
378                .storage
379                .put_with_body(&key, &parts, &policy, body)
380                .await
381            {
382                Ok((body, digest)) => {
383                    debug!(
384                        method = request_like.method.as_str(),
385                        scheme = request_like.uri.scheme_str(),
386                        authority = request_like.uri.authority().map(Authority::as_str),
387                        path = request_like.uri.path(),
388                        key,
389                        digest,
390                        "cache storage updated successfully"
391                    );
392
393                    let mut response = Response::from_parts(parts, body);
394                    response.set_cache_digest(&digest);
395                    Ok(response)
396                }
397                Err(e) => {
398                    debug!(
399                        method = request_like.method.as_str(),
400                        scheme = request_like.uri.scheme_str(),
401                        authority = request_like.uri.authority().map(Authority::as_str),
402                        path = request_like.uri.path(),
403                        key,
404                        error = format!("{e:?}"),
405                        "failed to put response into storage"
406                    );
407                    Err(e)
408                }
409            };
410        }
411
412        debug!(
413            method = request_like.method.as_str(),
414            scheme = request_like.uri.scheme_str(),
415            authority = request_like.uri.authority().map(Authority::as_str),
416            path = request_like.uri.path(),
417            key,
418            status = response.status().as_u16(),
419            "response is not cacheable"
420        );
421
422        if !request_like.method.is_safe() {
423            // If the request is not safe, assume the resource has been modified and delete
424            // any cached responses we may have for HEAD/GET
425            for method in [Method::HEAD, Method::GET] {
426                let key = storage_key(&method, &request_like.uri);
427                if let Err(e) = self.storage.delete(&key).await {
428                    debug!(
429                        method = method.as_str(),
430                        scheme = request_like.uri.scheme_str(),
431                        authority = request_like.uri.authority().map(Authority::as_str),
432                        path = request_like.uri.path(),
433                        key,
434                        error = format!("{e:?}"),
435                        "failed to put response into storage"
436                    );
437                }
438            }
439        }
440
441        Ok(response.map(|body| Body::from_upstream(body)))
442    }
443
444    /// Performs a conditional send to upstream.
445    ///
446    /// If a cached request is still fresh, it is returned.
447    ///
448    /// If a cached request is stale, an attempt is made to revalidate it.
449    async fn conditional_send_upstream<B: HttpBody>(
450        &self,
451        key: String,
452        request: impl Request<B>,
453        mut stored: StoredResponse<B>,
454    ) -> Result<Response<Body<B>>> {
455        let request_like = RequestLike::new(&request);
456
457        let headers = match stored
458            .policy
459            .before_request(&request_like, SystemTime::now())
460        {
461            BeforeRequest::Fresh(parts) => {
462                // The cached response is still fresh, return it
463                debug!(
464                    method = request_like.method.as_str(),
465                    scheme = request_like.uri.scheme_str(),
466                    authority = request_like.uri.authority().map(Authority::as_str),
467                    path = request_like.uri.path(),
468                    key,
469                    digest = stored.digest,
470                    "response is still fresh: responding with body from storage"
471                );
472
473                stored.response.extend_headers(parts.headers);
474                stored
475                    .response
476                    .set_cache_status(CacheLookupStatus::Hit, CacheStatus::Hit);
477                stored.response.set_cache_digest(&stored.digest);
478                return Ok(stored.response);
479            }
480            BeforeRequest::Stale {
481                request: http::request::Parts { headers, .. },
482                matches,
483            } => {
484                // Cached response is stale and needs to be revalidated
485                if matches { Some(headers) } else { None }
486            }
487        };
488
489        debug!(
490            method = request_like.method.as_str(),
491            scheme = request_like.uri.scheme_str(),
492            authority = request_like.uri.authority().map(Authority::as_str),
493            path = request_like.uri.path(),
494            key,
495            "response is stale: sending request upstream for validation"
496        );
497
498        // Revalidate the request
499        match request.send(headers).await {
500            Ok(response) if response.status() == StatusCode::OK => {
501                debug!(
502                    method = request_like.method.as_str(),
503                    scheme = request_like.uri.scheme_str(),
504                    authority = request_like.uri.authority().map(Authority::as_str),
505                    path = request_like.uri.path(),
506                    key,
507                    "server responded with a new response"
508                );
509
510                // The server responded with the body, the cached body is no longer valid
511                let policy = CachePolicy::new_options(
512                    &request_like,
513                    &response,
514                    SystemTime::now(),
515                    self.options,
516                );
517
518                let (parts, body) = response.into_parts();
519                match self
520                    .storage
521                    .put_with_body(&key, &parts, &policy, body)
522                    .await
523                {
524                    Ok((body, digest)) => {
525                        debug!(
526                            method = request_like.method.as_str(),
527                            scheme = request_like.uri.scheme_str(),
528                            authority = request_like.uri.authority().map(Authority::as_str),
529                            path = request_like.uri.path(),
530                            key,
531                            digest,
532                            "cache storage updated successfully"
533                        );
534
535                        // Response was stored, return the response with the storage key
536                        let mut response = Response::from_parts(parts, body);
537                        response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Miss);
538                        response.set_cache_digest(&digest);
539                        Ok(response)
540                    }
541                    Err(e) => {
542                        debug!(
543                            method = request_like.method.as_str(),
544                            scheme = request_like.uri.scheme_str(),
545                            authority = request_like.uri.authority().map(Authority::as_str),
546                            path = request_like.uri.path(),
547                            key,
548                            error = format!("{e:?}"),
549                            "failed to put response into cache storage"
550                        );
551                        Err(e)
552                    }
553                }
554            }
555            Ok(response) if response.status() == StatusCode::NOT_MODIFIED => {
556                debug!(
557                    method = request_like.method.as_str(),
558                    scheme = request_like.uri.scheme_str(),
559                    authority = request_like.uri.authority().map(Authority::as_str),
560                    path = request_like.uri.path(),
561                    key,
562                    "server responded with a not modified status"
563                );
564
565                // The server informed us that our response hasn't been modified
566                // Note that the response body for this code is always empty
567                match stored
568                    .policy
569                    .after_response(&request_like, &response, SystemTime::now())
570                {
571                    AfterResponse::Modified(..) => {
572                        debug!(
573                            method = request_like.method.as_str(),
574                            scheme = request_like.uri.scheme_str(),
575                            authority = request_like.uri.authority().map(Authority::as_str),
576                            path = request_like.uri.path(),
577                            key,
578                            "cached response was considered modified despite revalidation"
579                        );
580
581                        // This shouldn't happen as the server say it was unmodified, but
582                        // `http-cache-semantics` said it was
583                        bail!("cached response was considered modified despite revalidation");
584                    }
585                    AfterResponse::NotModified(policy, parts) => {
586                        stored.response.extend_headers(parts.headers);
587
588                        let (parts, body) = stored.response.into_parts();
589                        match self
590                            .storage
591                            .put(&key, &parts, &policy, &stored.digest)
592                            .await
593                        {
594                            Ok(_) => {
595                                debug!(
596                                    method = request_like.method.as_str(),
597                                    scheme = request_like.uri.scheme_str(),
598                                    authority = request_like.uri.authority().map(Authority::as_str),
599                                    path = request_like.uri.path(),
600                                    key,
601                                    digest = stored.digest,
602                                    "cache storage updated successfully"
603                                );
604
605                                // Response was stored and the body comes from storage
606                                let mut cached_response = Response::from_parts(parts, body);
607                                cached_response
608                                    .set_cache_status(CacheLookupStatus::Hit, CacheStatus::Hit);
609                                cached_response.set_cache_digest(&stored.digest);
610                                Ok(cached_response)
611                            }
612                            Err(e) => {
613                                debug!(
614                                    method = request_like.method.as_str(),
615                                    scheme = request_like.uri.scheme_str(),
616                                    authority = request_like.uri.authority().map(Authority::as_str),
617                                    path = request_like.uri.path(),
618                                    key,
619                                    error = format!("{e:?}"),
620                                    "failed to put response into cache storage"
621                                );
622                                Err(e)
623                            }
624                        }
625                    }
626                }
627            }
628            Ok(response)
629                if response.status().is_server_error() && !stored.response.must_revalidate() =>
630            {
631                Self::prepare_stale_response(
632                    &request_like.method,
633                    &request_like.uri,
634                    &key,
635                    &stored.digest,
636                    &mut stored.response,
637                );
638                Ok(stored.response)
639            }
640            Ok(mut response) => {
641                debug!(
642                    method = request_like.method.as_str(),
643                    scheme = request_like.uri.scheme_str(),
644                    authority = request_like.uri.authority().map(Authority::as_str),
645                    path = request_like.uri.path(),
646                    key,
647                    "failed to revalidate response: returning response from server uncached"
648                );
649
650                // Otherwise, don't serve the cached response at all
651                response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Miss);
652                Ok(response.map(|b| Body::from_upstream(b)))
653            }
654            Err(e) => {
655                if stored.response.must_revalidate() {
656                    Err(e)
657                } else {
658                    Self::prepare_stale_response(
659                        &request_like.method,
660                        &request_like.uri,
661                        &key,
662                        &stored.digest,
663                        &mut stored.response,
664                    );
665                    Ok(stored.response)
666                }
667            }
668        }
669    }
670
671    /// Prepares a stale response for sending back to the client.
672    fn prepare_stale_response<B>(
673        method: &Method,
674        uri: &Uri,
675        key: &str,
676        digest: &str,
677        response: &mut Response<Body<B>>,
678    ) {
679        debug!(
680            method = method.as_str(),
681            scheme = uri.scheme_str(),
682            authority = uri.authority().map(Authority::as_str),
683            path = uri.path(),
684            key,
685            digest,
686            "failed to revalidate response: serving potentially stale body from storage with a \
687             warning"
688        );
689
690        // If the server failed to give us a response, add the required warning to the
691        // cached response:
692        //   111 Revalidation failed
693        //   MUST be included if a cache returns a stale response
694        //   because an attempt to revalidate the response failed,
695        //   due to an inability to reach the server.
696        // (https://tools.ietf.org/html/rfc2616#section-14.46)
697        response.add_warning(uri, 111, "Revalidation failed");
698        response.set_cache_status(CacheLookupStatus::Hit, CacheStatus::Hit);
699        response.set_cache_digest(digest);
700    }
701}