tower_etag_cache/
future.rs

1use pin_project::pin_project;
2use std::{
3    future::Future,
4    mem::ManuallyDrop,
5    pin::Pin,
6    task::{Context, Poll},
7};
8use tower_service::Service;
9
10use crate::{
11    cache_provider::CacheProvider, CacheGetResponse, CacheGetResponseResult, EtagCacheResBody,
12    EtagCacheServiceError, PassthroughPredicate,
13};
14
15/// `Future` struct returned by [`EtagCache::call`](crate::EtagCache::call)
16#[pin_project]
17pub struct EtagCacheServiceFuture<
18    ReqBody,
19    ResBody,
20    C: CacheProvider<ReqBody, ResBody>,
21    P: PassthroughPredicate,
22    S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>,
23> {
24    cache_provider: C,
25    passthrough_predicate: P,
26    inner: S,
27    #[pin]
28    state: EtagCacheServiceFutureState<ReqBody, ResBody, C, S>,
29}
30
31impl<
32        ReqBody,
33        ResBody,
34        C: CacheProvider<ReqBody, ResBody>,
35        P: PassthroughPredicate,
36        S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>,
37    > EtagCacheServiceFuture<ReqBody, ResBody, C, P, S>
38{
39    pub fn start(
40        cache_provider: C,
41        passthrough_predicate: P,
42        inner: S,
43        req: http::Request<ReqBody>,
44    ) -> Self {
45        Self {
46            cache_provider,
47            passthrough_predicate,
48            inner,
49            state: EtagCacheServiceFutureState::CacheGetBefore {
50                req: ManuallyDrop::new(req),
51            },
52        }
53    }
54
55    pub fn passthrough(
56        cache_provider: C,
57        passthrough_predicate: P,
58        inner: S,
59        req: http::Request<ReqBody>,
60    ) -> Self {
61        Self {
62            cache_provider,
63            passthrough_predicate,
64            inner,
65            state: EtagCacheServiceFutureState::InnerBefore {
66                key: None,
67                req: ManuallyDrop::new(req),
68            },
69        }
70    }
71}
72
73// Use ManuallyDrop to allow easy moving of fields behind Pin<&mut self> to the next state
74/// `Enum` used to build the [`EtagCache`](crate::EtagCache) `Service`'s state machine
75#[pin_project(project = EtagCacheServiceFutureStateProj)]
76enum EtagCacheServiceFutureState<
77    ReqBody,
78    ResBody,
79    C: CacheProvider<ReqBody, ResBody>,
80    S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>,
81> {
82    CacheGetBefore {
83        req: ManuallyDrop<http::Request<ReqBody>>,
84    },
85    CacheGet {
86        #[pin]
87        fut: <C as Service<http::Request<ReqBody>>>::Future,
88    },
89    InnerBefore {
90        /// None indicates req passthrough: only inner service is called
91        key: Option<C::Key>,
92        req: ManuallyDrop<http::Request<ReqBody>>,
93    },
94    Inner {
95        /// None indicates req passthrough: only inner service is called
96        key: Option<C::Key>,
97        #[pin]
98        fut: S::Future,
99    },
100    CachePutBefore {
101        key: ManuallyDrop<C::Key>,
102        resp: ManuallyDrop<http::Response<ResBody>>,
103    },
104    CachePut {
105        #[pin]
106        fut: <C as Service<(C::Key, http::Response<ResBody>)>>::Future,
107    },
108}
109
110impl<
111        ReqBody,
112        ResBody,
113        C: CacheProvider<ReqBody, ResBody>,
114        P: PassthroughPredicate,
115        S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>,
116    > Future for EtagCacheServiceFuture<ReqBody, ResBody, C, P, S>
117{
118    type Output = Result<
119        http::Response<EtagCacheResBody<ResBody, C::TResBody>>,
120        EtagCacheServiceError<
121            <C as Service<http::Request<ReqBody>>>::Error,
122            S::Error,
123            <C as Service<(C::Key, http::Response<ResBody>)>>::Error,
124        >,
125    >;
126
127    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
128        let this = self.project();
129        let mut curr_state = this.state;
130
131        match curr_state.as_mut().project() {
132            EtagCacheServiceFutureStateProj::CacheGetBefore { req } => {
133                match <C as Service<http::Request<ReqBody>>>::poll_ready(this.cache_provider, cx) {
134                    Poll::Pending => Poll::Pending,
135                    Poll::Ready(result) => {
136                        if let Err(e) = result {
137                            return Poll::Ready(Err(EtagCacheServiceError::CacheGetError(e)));
138                        }
139                        let fut = <C as Service<http::Request<ReqBody>>>::call(
140                            this.cache_provider,
141                            unsafe { ManuallyDrop::take(req) },
142                        );
143                        curr_state.set(EtagCacheServiceFutureState::CacheGet { fut });
144                        cx.waker().wake_by_ref();
145                        Poll::Pending
146                    }
147                }
148            }
149            EtagCacheServiceFutureStateProj::CacheGet { fut } => match fut.poll(cx) {
150                Poll::Pending => Poll::Pending,
151                Poll::Ready(result) => {
152                    let CacheGetResponse { req, result } = match result {
153                        Ok(r) => r,
154                        Err(e) => return Poll::Ready(Err(EtagCacheServiceError::CacheGetError(e))),
155                    };
156                    let key = match result {
157                        CacheGetResponseResult::Hit(headers) => {
158                            return Poll::Ready(
159                                EtagCacheResBody::hit_resp(headers)
160                                    .map_err(EtagCacheServiceError::ResponseError),
161                            );
162                        }
163                        CacheGetResponseResult::Miss(k) => k,
164                    };
165                    curr_state.set(EtagCacheServiceFutureState::InnerBefore {
166                        key: Some(key),
167                        req: ManuallyDrop::new(req),
168                    });
169                    cx.waker().wake_by_ref();
170                    Poll::Pending
171                }
172            },
173            EtagCacheServiceFutureStateProj::InnerBefore { key, req } => {
174                match this.inner.poll_ready(cx) {
175                    Poll::Pending => Poll::Pending,
176                    Poll::Ready(result) => {
177                        if let Err(e) = result {
178                            return Poll::Ready(Err(EtagCacheServiceError::InnerError(e)));
179                        }
180                        let k = key.take();
181                        let fut = this.inner.call(unsafe { ManuallyDrop::take(req) });
182                        curr_state.set(EtagCacheServiceFutureState::Inner { fut, key: k });
183                        cx.waker().wake_by_ref();
184                        Poll::Pending
185                    }
186                }
187            }
188            EtagCacheServiceFutureStateProj::Inner { key, fut } => match fut.poll(cx) {
189                Poll::Pending => Poll::Pending,
190                Poll::Ready(result) => {
191                    let resp = match result {
192                        Ok(r) => r,
193                        Err(e) => return Poll::Ready(Err(EtagCacheServiceError::InnerError(e))),
194                    };
195
196                    if this.passthrough_predicate.should_passthrough_resp(&resp) {
197                        return Poll::Ready(Ok(EtagCacheResBody::passthrough_resp(resp)));
198                    }
199
200                    let k = match key.take() {
201                        Some(k) => k,
202                        None => return Poll::Ready(Ok(EtagCacheResBody::passthrough_resp(resp))),
203                    };
204                    curr_state.set(EtagCacheServiceFutureState::CachePutBefore {
205                        key: ManuallyDrop::new(k),
206                        resp: ManuallyDrop::new(resp),
207                    });
208                    cx.waker().wake_by_ref();
209                    Poll::Pending
210                }
211            },
212            EtagCacheServiceFutureStateProj::CachePutBefore { key, resp } => {
213                match <C as Service<(C::Key, http::Response<ResBody>)>>::poll_ready(
214                    this.cache_provider,
215                    cx,
216                ) {
217                    Poll::Pending => Poll::Pending,
218                    Poll::Ready(result) => {
219                        if let Err(e) = result {
220                            return Poll::Ready(Err(EtagCacheServiceError::CachePutError(e)));
221                        }
222                        let fut = <C as Service<(C::Key, http::Response<ResBody>)>>::call(
223                            this.cache_provider,
224                            (unsafe { ManuallyDrop::take(key) }, unsafe {
225                                ManuallyDrop::take(resp)
226                            }),
227                        );
228                        curr_state.set(EtagCacheServiceFutureState::CachePut { fut });
229                        cx.waker().wake_by_ref();
230                        Poll::Pending
231                    }
232                }
233            }
234            EtagCacheServiceFutureStateProj::CachePut { fut } => match fut.poll(cx) {
235                Poll::Pending => Poll::Pending,
236                Poll::Ready(result) => Poll::Ready(
237                    result
238                        .map(EtagCacheResBody::miss_resp)
239                        .map_err(EtagCacheServiceError::CachePutError),
240                ),
241            },
242        }
243    }
244}