kutil_http/tower/caching/
service.rs1use super::super::super::{
2 cache::{middleware::*, *},
3 headers::*,
4 transcoding::*,
5};
6
7use {
8 ::bytes::*,
9 http::{request::*, response::*},
10 http_body::*,
11 kutil_std::{capture_async, error::*, future::*},
12 std::{convert::*, mem, result::Result, sync::*, task::*},
13 tower::*,
14};
15
16pub struct CachingService<InnerServiceT, RequestBodyT, CacheT, CacheKeyT = CommonCacheKey>
24where
25 CacheT: Cache<CacheKeyT>,
26 CacheKeyT: CacheKey,
27{
28 inner_service: InnerServiceT,
29 caching: MiddlewareCachingConfiguration<RequestBodyT, CacheT, CacheKeyT>,
30 encoding: MiddlewareEncodingConfiguration,
31}
32
33impl<InnerServiceT, RequestBodyT, CacheT, CacheKeyT> CachingService<InnerServiceT, RequestBodyT, CacheT, CacheKeyT>
34where
35 CacheT: Cache<CacheKeyT>,
36 CacheKeyT: CacheKey,
37{
38 pub fn new(
40 inner_service: InnerServiceT,
41 caching: MiddlewareCachingConfiguration<RequestBodyT, CacheT, CacheKeyT>,
42 encoding: MiddlewareEncodingConfiguration,
43 ) -> Self {
44 assert!(caching.inner.min_body_size <= caching.inner.max_body_size);
45 Self { inner_service, caching: caching.clone(), encoding: encoding.clone() }
46 }
47
48 fn clone_and_keep_inner_service(&mut self) -> Self
52 where
53 InnerServiceT: Clone,
54 {
55 let mut clone = self.clone();
56 clone.inner_service = mem::replace(&mut self.inner_service, clone.inner_service);
57 clone
58 }
59
60 async fn handle<ResponseBodyT>(
62 mut self,
63 request: Request<RequestBodyT>,
64 ) -> Result<Response<TranscodingBody<ResponseBodyT>>, InnerServiceT::Error>
65 where
66 InnerServiceT: Service<Request<RequestBodyT>, Response = Response<ResponseBodyT>>,
67 ResponseBodyT: 'static + Body + From<Bytes> + Send + Unpin,
68 ResponseBodyT::Data: From<Bytes> + Send,
69 ResponseBodyT::Error: Into<CapturedError>,
70 {
71 if request.should_skip_cache(&self.caching) {
72 let uri = request.uri().clone();
74 let encoding = request.select_encoding(&self.encoding);
75 let content_length = request.headers().content_length();
76
77 return self.inner_service.call(request).await.map(|upstream_response| {
78 let (encoding, _skip_encoding) =
79 upstream_response.validate_encoding(&uri, encoding, content_length, &self.encoding);
80 upstream_response.with_transcoding_body(&encoding, self.encoding.inner.encodable_by_default)
81 });
82 }
83
84 let cache = self.caching.cache.clone().expect("cache should exist");
85 let cache_key = request.cache_key_with_hook(&self.caching);
86
87 match cache.get(&cache_key).await {
88 Some(cached_response) => Ok({
89 if modified(request.headers(), cached_response.headers()) {
90 tracing::debug!("hit");
91
92 cached_response
93 .to_transcoding_response(
94 &request.select_encoding(&self.encoding),
95 false,
96 cache,
97 cache_key,
98 &self.encoding.inner,
99 )
100 .await
101 } else {
102 tracing::debug!("hit (not modified)");
103
104 not_modified_transcoding_response()
105 }
106 }),
107
108 None => {
109 let uri = request.uri().clone();
111 let encoding = request.select_encoding(&self.encoding);
112
113 let upstream_response = self.inner_service.call(request).await?;
114
115 Ok({
116 let (skip_caching, content_length) = upstream_response.should_skip_cache(&uri, &self.caching);
117 let (encoding, skip_encoding) =
118 upstream_response.validate_encoding(&uri, encoding.clone(), content_length, &self.encoding);
119
120 if skip_caching {
121 upstream_response.with_transcoding_body(&encoding, self.encoding.inner.encodable_by_default)
122 } else {
123 tracing::debug!("miss");
124
125 match CachedResponse::new_for(
126 &uri,
127 upstream_response,
128 content_length,
129 encoding.clone(),
130 skip_encoding,
131 &self.caching.inner,
132 &self.encoding.inner,
133 )
134 .await
135 {
136 Ok(cached_response) => {
137 tracing::debug!("store ({})", encoding);
138 Arc::new(cached_response)
139 .to_transcoding_response(&encoding, true, cache, cache_key, &self.encoding.inner)
140 .await
141 }
142
143 Err(error) => match error.pieces {
144 Some(pieces) => {
145 tracing::debug!("skip ({})", error.error);
146 pieces.response.with_transcoding_body_with_first_bytes(
147 Some(pieces.first_bytes),
148 &encoding,
149 self.encoding.inner.encodable_by_default,
150 )
151 }
152
153 None => {
154 tracing::error!("could not create cache entry: {} {}", cache_key, error);
155 error_transcoding_response()
156 }
157 },
158 }
159 }
160 })
161 }
162 }
163 }
164}
165
166impl<InnerServiceT, RequestBodyT, CacheT, CacheKeyT> Clone
167 for CachingService<InnerServiceT, RequestBodyT, CacheT, CacheKeyT>
168where
169 InnerServiceT: Clone,
170 CacheT: Cache<CacheKeyT>,
171 CacheKeyT: CacheKey,
172{
173 fn clone(&self) -> Self {
174 Self {
175 inner_service: self.inner_service.clone(),
176 caching: self.caching.clone(),
177 encoding: self.encoding.clone(),
178 }
179 }
180}
181
182impl<InnerServiceT, RequestBodyT, ResponseBodyT, ErrorT, CacheT, CacheKeyT> Service<Request<RequestBodyT>>
183 for CachingService<InnerServiceT, RequestBodyT, CacheT, CacheKeyT>
184where
185 InnerServiceT:
186 'static + Service<Request<RequestBodyT>, Response = Response<ResponseBodyT>, Error = ErrorT> + Clone + Send,
187 InnerServiceT::Future: Send,
188 RequestBodyT: 'static + Send,
189 ResponseBodyT: 'static + Body + From<Bytes> + Send + Unpin,
190 ResponseBodyT::Data: From<Bytes> + Send,
191 ResponseBodyT::Error: Into<CapturedError>,
192 CacheT: Cache<CacheKeyT>,
193 CacheKeyT: CacheKey,
194{
195 type Response = Response<TranscodingBody<ResponseBodyT>>;
196 type Error = InnerServiceT::Error;
197 type Future = CapturedFuture<Result<Self::Response, Self::Error>>;
198
199 fn poll_ready(&mut self, context: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
200 self.inner_service.poll_ready(context)
204 }
205
206 fn call(&mut self, request: Request<RequestBodyT>) -> Self::Future {
207 let cloned_self = self.clone_and_keep_inner_service();
215 capture_async! { cloned_self.handle(request).await }
216 }
217}