kutil_http/tower/caching/
service.rs

1use super::super::super::{
2    cache::{middleware::*, *},
3    headers::*,
4    transcoding::*,
5};
6
7use {
8    ::bytes::*,
9    http::{request::*, response::*},
10    http_body::*,
11    kutil_std::{capture_async, error::*, future::*},
12    std::{convert::*, mem, result::Result, sync::*, task::*},
13    tower::*,
14};
15
16//
17// CachingService
18//
19
20/// HTTP response caching service.
21///
22/// See [CachingLayer](super::layer::CachingLayer).
23pub struct CachingService<InnerServiceT, RequestBodyT, CacheT, CacheKeyT = CommonCacheKey>
24where
25    CacheT: Cache<CacheKeyT>,
26    CacheKeyT: CacheKey,
27{
28    inner_service: InnerServiceT,
29    caching: MiddlewareCachingConfiguration<RequestBodyT, CacheT, CacheKeyT>,
30    encoding: MiddlewareEncodingConfiguration,
31}
32
33impl<InnerServiceT, RequestBodyT, CacheT, CacheKeyT> CachingService<InnerServiceT, RequestBodyT, CacheT, CacheKeyT>
34where
35    CacheT: Cache<CacheKeyT>,
36    CacheKeyT: CacheKey,
37{
38    /// Constuctor.
39    pub fn new(
40        inner_service: InnerServiceT,
41        caching: MiddlewareCachingConfiguration<RequestBodyT, CacheT, CacheKeyT>,
42        encoding: MiddlewareEncodingConfiguration,
43    ) -> Self {
44        assert!(caching.inner.min_body_size <= caching.inner.max_body_size);
45        Self { inner_service, caching: caching.clone(), encoding: encoding.clone() }
46    }
47
48    // Clone while keeping `inner_service`.
49    //
50    // See: https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services
51    fn clone_and_keep_inner_service(&mut self) -> Self
52    where
53        InnerServiceT: Clone,
54    {
55        let mut clone = self.clone();
56        clone.inner_service = mem::replace(&mut self.inner_service, clone.inner_service);
57        clone
58    }
59
60    // Handle request.
61    async fn handle<ResponseBodyT>(
62        mut self,
63        request: Request<RequestBodyT>,
64    ) -> Result<Response<TranscodingBody<ResponseBodyT>>, InnerServiceT::Error>
65    where
66        InnerServiceT: Service<Request<RequestBodyT>, Response = Response<ResponseBodyT>>,
67        ResponseBodyT: 'static + Body + From<Bytes> + Send + Unpin,
68        ResponseBodyT::Data: From<Bytes> + Send,
69        ResponseBodyT::Error: Into<CapturedError>,
70    {
71        if request.should_skip_cache(&self.caching) {
72            // Capture request data before moving the request to the inner service
73            let uri = request.uri().clone();
74            let encoding = request.select_encoding(&self.encoding);
75            let content_length = request.headers().content_length();
76
77            return self.inner_service.call(request).await.map(|upstream_response| {
78                let (encoding, _skip_encoding) =
79                    upstream_response.validate_encoding(&uri, encoding, content_length, &self.encoding);
80                upstream_response.with_transcoding_body(&encoding, self.encoding.inner.encodable_by_default)
81            });
82        }
83
84        let cache = self.caching.cache.clone().expect("cache should exist");
85        let cache_key = request.cache_key_with_hook(&self.caching);
86
87        match cache.get(&cache_key).await {
88            Some(cached_response) => Ok({
89                if modified(request.headers(), cached_response.headers()) {
90                    tracing::debug!("hit");
91
92                    cached_response
93                        .to_transcoding_response(
94                            &request.select_encoding(&self.encoding),
95                            false,
96                            cache,
97                            cache_key,
98                            &self.encoding.inner,
99                        )
100                        .await
101                } else {
102                    tracing::debug!("hit (not modified)");
103
104                    not_modified_transcoding_response()
105                }
106            }),
107
108            None => {
109                // Capture request data before moving the request to the inner service
110                let uri = request.uri().clone();
111                let encoding = request.select_encoding(&self.encoding);
112
113                let upstream_response = self.inner_service.call(request).await?;
114
115                Ok({
116                    let (skip_caching, content_length) = upstream_response.should_skip_cache(&uri, &self.caching);
117                    let (encoding, skip_encoding) =
118                        upstream_response.validate_encoding(&uri, encoding.clone(), content_length, &self.encoding);
119
120                    if skip_caching {
121                        upstream_response.with_transcoding_body(&encoding, self.encoding.inner.encodable_by_default)
122                    } else {
123                        tracing::debug!("miss");
124
125                        match CachedResponse::new_for(
126                            &uri,
127                            upstream_response,
128                            content_length,
129                            encoding.clone(),
130                            skip_encoding,
131                            &self.caching.inner,
132                            &self.encoding.inner,
133                        )
134                        .await
135                        {
136                            Ok(cached_response) => {
137                                tracing::debug!("store ({})", encoding);
138                                Arc::new(cached_response)
139                                    .to_transcoding_response(&encoding, true, cache, cache_key, &self.encoding.inner)
140                                    .await
141                            }
142
143                            Err(error) => match error.pieces {
144                                Some(pieces) => {
145                                    tracing::debug!("skip ({})", error.error);
146                                    pieces.response.with_transcoding_body_with_first_bytes(
147                                        Some(pieces.first_bytes),
148                                        &encoding,
149                                        self.encoding.inner.encodable_by_default,
150                                    )
151                                }
152
153                                None => {
154                                    tracing::error!("could not create cache entry: {} {}", cache_key, error);
155                                    error_transcoding_response()
156                                }
157                            },
158                        }
159                    }
160                })
161            }
162        }
163    }
164}
165
166impl<InnerServiceT, RequestBodyT, CacheT, CacheKeyT> Clone
167    for CachingService<InnerServiceT, RequestBodyT, CacheT, CacheKeyT>
168where
169    InnerServiceT: Clone,
170    CacheT: Cache<CacheKeyT>,
171    CacheKeyT: CacheKey,
172{
173    fn clone(&self) -> Self {
174        Self {
175            inner_service: self.inner_service.clone(),
176            caching: self.caching.clone(),
177            encoding: self.encoding.clone(),
178        }
179    }
180}
181
182impl<InnerServiceT, RequestBodyT, ResponseBodyT, ErrorT, CacheT, CacheKeyT> Service<Request<RequestBodyT>>
183    for CachingService<InnerServiceT, RequestBodyT, CacheT, CacheKeyT>
184where
185    InnerServiceT:
186        'static + Service<Request<RequestBodyT>, Response = Response<ResponseBodyT>, Error = ErrorT> + Clone + Send,
187    InnerServiceT::Future: Send,
188    RequestBodyT: 'static + Send,
189    ResponseBodyT: 'static + Body + From<Bytes> + Send + Unpin,
190    ResponseBodyT::Data: From<Bytes> + Send,
191    ResponseBodyT::Error: Into<CapturedError>,
192    CacheT: Cache<CacheKeyT>,
193    CacheKeyT: CacheKey,
194{
195    type Response = Response<TranscodingBody<ResponseBodyT>>;
196    type Error = InnerServiceT::Error;
197    type Future = CapturedFuture<Result<Self::Response, Self::Error>>;
198
199    fn poll_ready(&mut self, context: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
200        // Note that if we are using the cache, we technically don't have to depend on the inner
201        // service being poll_ready for us to be poll_ready, however Tower's design does not allow
202        // us to optimize here
203        self.inner_service.poll_ready(context)
204    }
205
206    fn call(&mut self, request: Request<RequestBodyT>) -> Self::Future {
207        // We unfortunately must clone the `&mut self` because it cannot be sent to the future as is;
208        //
209        // The worry is that we are cloning our inner service, too, which will clone *its* inner service,
210        // and so on... It can be a sizeable clone if there are many service layers
211        //
212        // But this seems to be standard practice in Tower due to its design!
213
214        let cloned_self = self.clone_and_keep_inner_service();
215        capture_async! { cloned_self.handle(request).await }
216    }
217}