1use pin_project::pin_project;
2use std::{
3 future::Future,
4 mem::ManuallyDrop,
5 pin::Pin,
6 task::{Context, Poll},
7};
8use tower_service::Service;
9
10use crate::{
11 cache_provider::CacheProvider, CacheGetResponse, CacheGetResponseResult, EtagCacheResBody,
12 EtagCacheServiceError, PassthroughPredicate,
13};
14
15#[pin_project]
17pub struct EtagCacheServiceFuture<
18 ReqBody,
19 ResBody,
20 C: CacheProvider<ReqBody, ResBody>,
21 P: PassthroughPredicate,
22 S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>,
23> {
24 cache_provider: C,
25 passthrough_predicate: P,
26 inner: S,
27 #[pin]
28 state: EtagCacheServiceFutureState<ReqBody, ResBody, C, S>,
29}
30
31impl<
32 ReqBody,
33 ResBody,
34 C: CacheProvider<ReqBody, ResBody>,
35 P: PassthroughPredicate,
36 S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>,
37 > EtagCacheServiceFuture<ReqBody, ResBody, C, P, S>
38{
39 pub fn start(
40 cache_provider: C,
41 passthrough_predicate: P,
42 inner: S,
43 req: http::Request<ReqBody>,
44 ) -> Self {
45 Self {
46 cache_provider,
47 passthrough_predicate,
48 inner,
49 state: EtagCacheServiceFutureState::CacheGetBefore {
50 req: ManuallyDrop::new(req),
51 },
52 }
53 }
54
55 pub fn passthrough(
56 cache_provider: C,
57 passthrough_predicate: P,
58 inner: S,
59 req: http::Request<ReqBody>,
60 ) -> Self {
61 Self {
62 cache_provider,
63 passthrough_predicate,
64 inner,
65 state: EtagCacheServiceFutureState::InnerBefore {
66 key: None,
67 req: ManuallyDrop::new(req),
68 },
69 }
70 }
71}
72
73#[pin_project(project = EtagCacheServiceFutureStateProj)]
76enum EtagCacheServiceFutureState<
77 ReqBody,
78 ResBody,
79 C: CacheProvider<ReqBody, ResBody>,
80 S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>,
81> {
82 CacheGetBefore {
83 req: ManuallyDrop<http::Request<ReqBody>>,
84 },
85 CacheGet {
86 #[pin]
87 fut: <C as Service<http::Request<ReqBody>>>::Future,
88 },
89 InnerBefore {
90 key: Option<C::Key>,
92 req: ManuallyDrop<http::Request<ReqBody>>,
93 },
94 Inner {
95 key: Option<C::Key>,
97 #[pin]
98 fut: S::Future,
99 },
100 CachePutBefore {
101 key: ManuallyDrop<C::Key>,
102 resp: ManuallyDrop<http::Response<ResBody>>,
103 },
104 CachePut {
105 #[pin]
106 fut: <C as Service<(C::Key, http::Response<ResBody>)>>::Future,
107 },
108}
109
110impl<
111 ReqBody,
112 ResBody,
113 C: CacheProvider<ReqBody, ResBody>,
114 P: PassthroughPredicate,
115 S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>,
116 > Future for EtagCacheServiceFuture<ReqBody, ResBody, C, P, S>
117{
118 type Output = Result<
119 http::Response<EtagCacheResBody<ResBody, C::TResBody>>,
120 EtagCacheServiceError<
121 <C as Service<http::Request<ReqBody>>>::Error,
122 S::Error,
123 <C as Service<(C::Key, http::Response<ResBody>)>>::Error,
124 >,
125 >;
126
127 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
128 let this = self.project();
129 let mut curr_state = this.state;
130
131 match curr_state.as_mut().project() {
132 EtagCacheServiceFutureStateProj::CacheGetBefore { req } => {
133 match <C as Service<http::Request<ReqBody>>>::poll_ready(this.cache_provider, cx) {
134 Poll::Pending => Poll::Pending,
135 Poll::Ready(result) => {
136 if let Err(e) = result {
137 return Poll::Ready(Err(EtagCacheServiceError::CacheGetError(e)));
138 }
139 let fut = <C as Service<http::Request<ReqBody>>>::call(
140 this.cache_provider,
141 unsafe { ManuallyDrop::take(req) },
142 );
143 curr_state.set(EtagCacheServiceFutureState::CacheGet { fut });
144 cx.waker().wake_by_ref();
145 Poll::Pending
146 }
147 }
148 }
149 EtagCacheServiceFutureStateProj::CacheGet { fut } => match fut.poll(cx) {
150 Poll::Pending => Poll::Pending,
151 Poll::Ready(result) => {
152 let CacheGetResponse { req, result } = match result {
153 Ok(r) => r,
154 Err(e) => return Poll::Ready(Err(EtagCacheServiceError::CacheGetError(e))),
155 };
156 let key = match result {
157 CacheGetResponseResult::Hit(headers) => {
158 return Poll::Ready(
159 EtagCacheResBody::hit_resp(headers)
160 .map_err(EtagCacheServiceError::ResponseError),
161 );
162 }
163 CacheGetResponseResult::Miss(k) => k,
164 };
165 curr_state.set(EtagCacheServiceFutureState::InnerBefore {
166 key: Some(key),
167 req: ManuallyDrop::new(req),
168 });
169 cx.waker().wake_by_ref();
170 Poll::Pending
171 }
172 },
173 EtagCacheServiceFutureStateProj::InnerBefore { key, req } => {
174 match this.inner.poll_ready(cx) {
175 Poll::Pending => Poll::Pending,
176 Poll::Ready(result) => {
177 if let Err(e) = result {
178 return Poll::Ready(Err(EtagCacheServiceError::InnerError(e)));
179 }
180 let k = key.take();
181 let fut = this.inner.call(unsafe { ManuallyDrop::take(req) });
182 curr_state.set(EtagCacheServiceFutureState::Inner { fut, key: k });
183 cx.waker().wake_by_ref();
184 Poll::Pending
185 }
186 }
187 }
188 EtagCacheServiceFutureStateProj::Inner { key, fut } => match fut.poll(cx) {
189 Poll::Pending => Poll::Pending,
190 Poll::Ready(result) => {
191 let resp = match result {
192 Ok(r) => r,
193 Err(e) => return Poll::Ready(Err(EtagCacheServiceError::InnerError(e))),
194 };
195
196 if this.passthrough_predicate.should_passthrough_resp(&resp) {
197 return Poll::Ready(Ok(EtagCacheResBody::passthrough_resp(resp)));
198 }
199
200 let k = match key.take() {
201 Some(k) => k,
202 None => return Poll::Ready(Ok(EtagCacheResBody::passthrough_resp(resp))),
203 };
204 curr_state.set(EtagCacheServiceFutureState::CachePutBefore {
205 key: ManuallyDrop::new(k),
206 resp: ManuallyDrop::new(resp),
207 });
208 cx.waker().wake_by_ref();
209 Poll::Pending
210 }
211 },
212 EtagCacheServiceFutureStateProj::CachePutBefore { key, resp } => {
213 match <C as Service<(C::Key, http::Response<ResBody>)>>::poll_ready(
214 this.cache_provider,
215 cx,
216 ) {
217 Poll::Pending => Poll::Pending,
218 Poll::Ready(result) => {
219 if let Err(e) = result {
220 return Poll::Ready(Err(EtagCacheServiceError::CachePutError(e)));
221 }
222 let fut = <C as Service<(C::Key, http::Response<ResBody>)>>::call(
223 this.cache_provider,
224 (unsafe { ManuallyDrop::take(key) }, unsafe {
225 ManuallyDrop::take(resp)
226 }),
227 );
228 curr_state.set(EtagCacheServiceFutureState::CachePut { fut });
229 cx.waker().wake_by_ref();
230 Poll::Pending
231 }
232 }
233 }
234 EtagCacheServiceFutureStateProj::CachePut { fut } => match fut.poll(cx) {
235 Poll::Pending => Poll::Pending,
236 Poll::Ready(result) => Poll::Ready(
237 result
238 .map(EtagCacheResBody::miss_resp)
239 .map_err(EtagCacheServiceError::CachePutError),
240 ),
241 },
242 }
243 }
244}