hyper_util/client/pool/
cache.rs

1//! A cache of services
2//!
3//! The cache is a single list of cached services, bundled with a `MakeService`.
4//! Calling the cache returns either an existing service, or makes a new one.
//! The returned `impl Service` can be used to send requests, and when dropped,
//! it attempts to return itself to the cache for reuse.
7
8pub use self::internal::builder;
9
10#[cfg(docsrs)]
11pub use self::internal::Builder;
12#[cfg(docsrs)]
13pub use self::internal::Cache;
14#[cfg(docsrs)]
15pub use self::internal::Cached;
16
17// For now, nothing else in this module is nameable. We can always make things
18// more public, but we can't change type shapes (generics) once things are
19// public.
20mod internal {
21    use std::fmt;
22    use std::future::Future;
23    use std::pin::Pin;
24    use std::sync::{Arc, Mutex, Weak};
25    use std::task::{self, Poll};
26
27    use futures_core::ready;
28    use futures_util::future;
29    use tokio::sync::oneshot;
30    use tower_service::Service;
31
32    use super::events;
33
34    /// Start a builder to construct a `Cache` pool.
35    pub fn builder() -> Builder<events::Ignore> {
36        Builder {
37            events: events::Ignore,
38        }
39    }
40
    /// A cache pool of services from the inner make service.
    ///
    /// Created with [`builder()`].
    ///
    /// # Unnameable
    ///
    /// This type is normally unnameable, forbidding naming of the type within
    /// code. The type is exposed in the documentation to show which methods
    /// can be publicly called.
    #[derive(Debug)]
    pub struct Cache<M, Dst, Ev>
    where
        M: Service<Dst>,
    {
        // The inner make service, called whenever no idle service is cached.
        connector: M,
        // Idle services and pending waiters, shared with every `Cached`
        // handle and with every clone of this pool.
        shared: Arc<Mutex<Shared<M::Response>>>,
        // Hook for optional background work (e.g. finishing a connect that
        // lost the race against a returned service).
        events: Ev,
    }
59
    /// A builder to configure a `Cache`.
    ///
    /// # Unnameable
    ///
    /// This type is normally unnameable, forbidding naming of the type within
    /// code. The type is exposed in the documentation to show which methods
    /// can be publicly called.
    #[derive(Debug)]
    pub struct Builder<Ev> {
        // Events strategy carried into the built `Cache` (see `mod events`).
        events: Ev,
    }
71
    /// A cached service returned from a [`Cache`].
    ///
    /// Implements `Service` by delegating to the inner service. Once dropped,
    /// tries to reinsert into the `Cache`.
    ///
    /// # Unnameable
    ///
    /// This type is normally unnameable, forbidding naming of the type within
    /// code. The type is exposed in the documentation to show which methods
    /// can be publicly called.
    pub struct Cached<S> {
        // Set when `poll_ready` errors; a closed service is not reinserted
        // into the pool on drop.
        is_closed: bool,
        // Always `Some` until `drop` takes it to hand back to the pool.
        inner: Option<S>,
        // Weak so that dropping the pool itself isn't kept alive by
        // outstanding `Cached` handles.
        shared: Weak<Mutex<Shared<S>>>,
        // todo: on_idle
    }
88
    /// Future returned by `Cache::call`, resolving to a `Cached` service.
    pub enum CacheFuture<M, Dst, Ev>
    where
        M: Service<Dst>,
    {
        // Racing a pool waiter (a service returned by another user) against
        // a freshly started connect; whichever finishes first wins.
        Racing {
            shared: Arc<Mutex<Shared<M::Response>>>,
            select: future::Select<oneshot::Receiver<M::Response>, M::Future>,
            events: Ev,
        },
        // The waiter channel closed (pool dropped), so only the in-flight
        // connect can produce a service now.
        Connecting {
            // TODO: could be Weak even here...
            shared: Arc<Mutex<Shared<M::Response>>>,
            future: M::Future,
        },
        // An idle service was found immediately; resolves on first poll.
        Cached {
            svc: Option<Cached<M::Response>>,
        },
    }
107
    // shouldn't be pub
    /// Pool state behind the mutex: idle services plus callers waiting for one.
    #[derive(Debug)]
    pub struct Shared<S> {
        // Idle services available for immediate reuse.
        services: Vec<S>,
        // Senders for in-flight `Cache::call`s that registered before
        // starting their own connect; see `Shared::put`.
        waiters: Vec<oneshot::Sender<S>>,
    }
114
115    // impl Builder
116
117    impl<Ev> Builder<Ev> {
118        /// Provide a `Future` executor to be used by the `Cache`.
119        ///
120        /// The executor is used handle some optional background tasks that
121        /// can improve the behavior of the cache, such as reducing connection
122        /// thrashing when a race is won. If not configured with an executor,
123        /// the default behavior is to ignore any of these optional background
124        /// tasks.
125        ///
126        /// The executor should implmenent [`hyper::rt::Executor`].
127        ///
128        /// # Example
129        ///
130        /// ```rust
131        /// # #[cfg(feature = "tokio")]
132        /// # fn run() {
133        /// let builder = hyper_util::client::pool::cache::builder()
134        ///     .executor(hyper_util::rt::TokioExecutor::new());
135        /// # }
136        /// ```
137        pub fn executor<E>(self, exec: E) -> Builder<events::WithExecutor<E>> {
138            Builder {
139                events: events::WithExecutor(exec),
140            }
141        }
142
143        /// Build a `Cache` pool around the `connector`.
144        pub fn build<M, Dst>(self, connector: M) -> Cache<M, Dst, Ev>
145        where
146            M: Service<Dst>,
147        {
148            Cache {
149                connector,
150                events: self.events,
151                shared: Arc::new(Mutex::new(Shared {
152                    services: Vec::new(),
153                    waiters: Vec::new(),
154                })),
155            }
156        }
157    }
158
159    // impl Cache
160
161    impl<M, Dst, Ev> Cache<M, Dst, Ev>
162    where
163        M: Service<Dst>,
164    {
165        /// Retain all cached services indicated by the predicate.
166        pub fn retain<F>(&mut self, predicate: F)
167        where
168            F: FnMut(&mut M::Response) -> bool,
169        {
170            self.shared.lock().unwrap().services.retain_mut(predicate);
171        }
172
173        /// Check whether this cache has no cached services.
174        pub fn is_empty(&self) -> bool {
175            self.shared.lock().unwrap().services.is_empty()
176        }
177    }
178
    impl<M, Dst, Ev> Service<Dst> for Cache<M, Dst, Ev>
    where
        M: Service<Dst>,
        M::Future: Unpin,
        M::Response: Unpin,
        Ev: events::Events<BackgroundConnect<M::Future, M::Response>> + Clone + Unpin,
    {
        type Response = Cached<M::Response>;
        type Error = M::Error;
        type Future = CacheFuture<M, Dst, Ev>;

        /// Ready immediately if a service is cached; otherwise defers to
        /// the inner make service's readiness.
        ///
        /// NOTE(review): another clone of the pool could take the cached
        /// service between this check and `call()`, in which case `call()`
        /// uses the connector without it having been polled ready here —
        /// confirm the connector tolerates that.
        fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
            if !self.shared.lock().unwrap().services.is_empty() {
                Poll::Ready(Ok(()))
            } else {
                self.connector.poll_ready(cx)
            }
        }

        fn call(&mut self, target: Dst) -> Self::Future {
            // 1. If already cached, easy!
            let waiter = {
                let mut locked = self.shared.lock().unwrap();
                if let Some(found) = locked.take() {
                    return CacheFuture::Cached {
                        svc: Some(Cached::new(found, Arc::downgrade(&self.shared))),
                    };
                }

                // Register as a waiter (while still holding the lock) so a
                // service returned while we connect can be handed to us.
                let (tx, rx) = oneshot::channel();
                locked.waiters.push(tx);
                rx
            };

            // 2. Otherwise, we start a new connect, and also listen for
            //    any newly idle.
            CacheFuture::Racing {
                shared: self.shared.clone(),
                select: future::select(waiter, self.connector.call(target)),
                events: self.events.clone(),
            }
        }
    }
222
223    impl<M, Dst, Ev> Clone for Cache<M, Dst, Ev>
224    where
225        M: Service<Dst> + Clone,
226        Ev: Clone,
227    {
228        fn clone(&self) -> Self {
229            Self {
230                connector: self.connector.clone(),
231                events: self.events.clone(),
232                shared: self.shared.clone(),
233            }
234        }
235    }
236
    impl<M, Dst, Ev> Future for CacheFuture<M, Dst, Ev>
    where
        M: Service<Dst>,
        M::Future: Unpin,
        M::Response: Unpin,
        Ev: events::Events<BackgroundConnect<M::Future, M::Response>> + Unpin,
    {
        type Output = Result<Cached<M::Response>, M::Error>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
            // State machine: loop so a transition (Racing -> Connecting) is
            // polled again immediately instead of returning Pending.
            loop {
                match &mut *self.as_mut() {
                    CacheFuture::Racing {
                        shared,
                        select,
                        events,
                    } => {
                        match ready!(Pin::new(select).poll(cx)) {
                            future::Either::Left((Err(_pool_closed), connecting)) => {
                                // pool was dropped, so we'll never get it from a waiter,
                                // but if this future still exists, then the user still
                                // wants a connection. just wait for the connecting
                                *self = CacheFuture::Connecting {
                                    shared: shared.clone(),
                                    future: connecting,
                                };
                            }
                            future::Either::Left((Ok(pool_got), connecting)) => {
                                // A returned service won the race: hand the
                                // still-pending connect to the events hook,
                                // which may drop it or finish it in the
                                // background (see `BackgroundConnect`).
                                events.on_race_lost(BackgroundConnect {
                                    future: connecting,
                                    shared: Arc::downgrade(&shared),
                                });
                                return Poll::Ready(Ok(Cached::new(
                                    pool_got,
                                    Arc::downgrade(&shared),
                                )));
                            }
                            future::Either::Right((connected, _waiter)) => {
                                // The connect won. Dropping `_waiter` closes
                                // the channel, so `Shared::put` will skip our
                                // stale waiter entry.
                                let inner = connected?;
                                return Poll::Ready(Ok(Cached::new(
                                    inner,
                                    Arc::downgrade(&shared),
                                )));
                            }
                        }
                    }
                    CacheFuture::Connecting { shared, future } => {
                        let inner = ready!(Pin::new(future).poll(cx))?;
                        return Poll::Ready(Ok(Cached::new(inner, Arc::downgrade(&shared))));
                    }
                    CacheFuture::Cached { svc } => {
                        // Taken exactly once; polling again after Ready would
                        // panic, which is outside the `Future` contract anyway.
                        return Poll::Ready(Ok(svc.take().unwrap()));
                    }
                }
            }
        }
    }
294
295    // impl Cached
296
297    impl<S> Cached<S> {
298        fn new(inner: S, shared: Weak<Mutex<Shared<S>>>) -> Self {
299            Cached {
300                is_closed: false,
301                inner: Some(inner),
302                shared,
303            }
304        }
305
306        // TODO: inner()? looks like `tower` likes `get_ref()` and `get_mut()`.
307
308        /// Get a reference to the inner service.
309        pub fn inner(&self) -> &S {
310            self.inner.as_ref().expect("inner only taken in drop")
311        }
312
313        /// Get a mutable reference to the inner service.
314        pub fn inner_mut(&mut self) -> &mut S {
315            self.inner.as_mut().expect("inner only taken in drop")
316        }
317    }
318
319    impl<S, Req> Service<Req> for Cached<S>
320    where
321        S: Service<Req>,
322    {
323        type Response = S::Response;
324        type Error = S::Error;
325        type Future = S::Future;
326
327        fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
328            self.inner.as_mut().unwrap().poll_ready(cx).map_err(|err| {
329                self.is_closed = true;
330                err
331            })
332        }
333
334        fn call(&mut self, req: Req) -> Self::Future {
335            self.inner.as_mut().unwrap().call(req)
336        }
337    }
338
339    impl<S> Drop for Cached<S> {
340        fn drop(&mut self) {
341            if self.is_closed {
342                return;
343            }
344            if let Some(value) = self.inner.take() {
345                if let Some(shared) = self.shared.upgrade() {
346                    if let Ok(mut shared) = shared.lock() {
347                        shared.put(value);
348                    }
349                }
350            }
351        }
352    }
353
354    impl<S: fmt::Debug> fmt::Debug for Cached<S> {
355        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
356            f.debug_tuple("Cached")
357                .field(self.inner.as_ref().unwrap())
358                .finish()
359        }
360    }
361
362    // impl Shared
363
364    impl<V> Shared<V> {
365        fn put(&mut self, val: V) {
366            let mut val = Some(val);
367            while let Some(tx) = self.waiters.pop() {
368                if !tx.is_closed() {
369                    match tx.send(val.take().unwrap()) {
370                        Ok(()) => break,
371                        Err(v) => {
372                            val = Some(v);
373                        }
374                    }
375                }
376            }
377
378            if let Some(val) = val {
379                self.services.push(val);
380            }
381        }
382
383        fn take(&mut self) -> Option<V> {
384            // TODO: take in a loop
385            self.services.pop()
386        }
387    }
388
    /// A connect that lost the race but may still be completed in the
    /// background (handed to `Events::on_race_lost`).
    pub struct BackgroundConnect<CF, S> {
        // The still-pending connect future.
        future: CF,
        // Weak pool handle: if the pool is gone when the connect finishes,
        // the service is simply dropped.
        shared: Weak<Mutex<Shared<S>>>,
    }
393
394    impl<CF, S, E> Future for BackgroundConnect<CF, S>
395    where
396        CF: Future<Output = Result<S, E>> + Unpin,
397    {
398        type Output = ();
399
400        fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
401            match ready!(Pin::new(&mut self.future).poll(cx)) {
402                Ok(svc) => {
403                    if let Some(shared) = self.shared.upgrade() {
404                        if let Ok(mut locked) = shared.lock() {
405                            locked.put(svc);
406                        }
407                    }
408                    Poll::Ready(())
409                }
410                Err(_e) => Poll::Ready(()),
411            }
412        }
413    }
414}
415
mod events {
    //! Strategies for what the cache does with optional background work.

    /// Default strategy: drop background work immediately.
    #[derive(Clone, Debug)]
    #[non_exhaustive]
    pub struct Ignore;

    /// Strategy that spawns background work on the provided executor.
    #[derive(Clone, Debug)]
    pub struct WithExecutor<E>(pub(super) E);

    /// Hook invoked by the cache on notable pool events.
    pub trait Events<CF> {
        /// Called with the still-pending connect future when a pooled
        /// service won the race against a new connection.
        fn on_race_lost(&self, fut: CF);
    }

    impl<CF> Events<CF> for Ignore {
        // Dropping the future cancels the in-flight connect.
        fn on_race_lost(&self, _fut: CF) {}
    }

    impl<E, CF> Events<CF> for WithExecutor<E>
    where
        E: hyper::rt::Executor<CF>,
    {
        fn on_race_lost(&self, fut: CF) {
            // Finish the connect in the background; the future itself is
            // responsible for putting the result into the pool.
            self.0.execute(fut);
        }
    }
}
441
442#[cfg(test)]
443mod tests {
444    use futures_util::future;
445    use tower_service::Service;
446    use tower_test::assert_request_eq;
447
    /// An empty cache must fall through to the make service.
    #[tokio::test]
    async fn test_makes_svc_when_empty() {
        let (mock, mut handle) = tower_test::mock::pair();
        let mut cache = super::builder().build(mock);
        handle.allow(1);

        crate::common::future::poll_fn(|cx| cache.poll_ready(cx))
            .await
            .unwrap();

        let f = cache.call(1);

        // Drive the call and the mock handle concurrently: the call only
        // resolves once the mock answers the connect request.
        future::join(f, async move {
            assert_request_eq!(handle, 1).send_response("one");
        })
        .await
        .0
        .expect("call");
    }
467
    /// Dropping a `Cached` handle returns the service to the pool for reuse.
    #[tokio::test]
    async fn test_reuses_after_idle() {
        let (mock, mut handle) = tower_test::mock::pair();
        let mut cache = super::builder().build(mock);

        // only 1 connection should ever be made
        handle.allow(1);

        crate::common::future::poll_fn(|cx| cache.poll_ready(cx))
            .await
            .unwrap();
        let f = cache.call(1);
        let cached = future::join(f, async {
            assert_request_eq!(handle, 1).send_response("one");
        })
        .await
        .0
        .expect("call");
        // Returning the handle should reinsert the service into the pool.
        drop(cached);

        // Only one connect was allowed above, so this second call can only
        // complete by reusing the idle service.
        crate::common::future::poll_fn(|cx| cache.poll_ready(cx))
            .await
            .unwrap();
        let f = cache.call(1);
        let cached = f.await.expect("call");
        drop(cached);
    }
495}