tower_ready_cache/
cache.rs

1//! A cache of services.
2
3use crate::error;
4use futures_core::Stream;
5use futures_util::stream::FuturesUnordered;
6pub use indexmap::Equivalent;
7use indexmap::IndexMap;
8use log::{debug, trace};
9use std::future::Future;
10use std::hash::Hash;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13use tokio::sync::oneshot;
14use tower_service::Service;
15
16/// Drives readiness over a set of services.
17///
18/// The cache maintains two internal data structures:
19///
20/// * a set of  _pending_ services that have not yet become ready; and
21/// * a set of _ready_ services that have previously polled ready.
22///
23/// As each `S` typed `Service` is added to the cache via `ReadyCache::push`, it
24/// is added to the _pending set_. As `ReadyCache::poll_pending` is invoked,
25/// pending services are polled and added to the _ready set_.
26///
27/// `ReadyCache::call_ready` (or `ReadyCache::call_ready_index`) dispatches a
28/// request to the specified service, but panics if the specified service is not
29/// in the ready set. The `ReadyCache::check_*` functions can be used to ensure
30/// that a service is ready before dispatching a request.
31///
32/// The ready set can hold services for an abitrarily long time. During this
33/// time, the runtime may process events that invalidate that ready state (for
34/// instance, if a keepalive detects a lost connection). In such cases, callers
35/// should use `ReadyCache::check_ready` (or `ReadyCache::check_ready_index`)
36/// immediately before dispatching a request to ensure that the service has not
37/// become unavailable.
38///
39/// Once `ReadyCache::call_ready*` is invoked, the service is placed back into
40/// the _pending_ set to be driven to readiness again.
41///
42/// When `ReadyCache::check_ready*` returns `false`, it indicates that the
43/// specified service is _not_ ready. If an error is returned, this indicats that
44/// the server failed and has been removed from the cache entirely.
45///
46/// `ReadyCache::evict` can be used to remove a service from the cache (by key),
47/// though the service may not be dropped (if it is currently pending) until
48/// `ReadyCache::poll_pending` is invoked.
49///
50/// Note that the by-index accessors are provided to support use cases (like
51/// power-of-two-choices load balancing) where the caller does not care to keep
52/// track of each service's key. Instead, it needs only to access _some_ ready
53/// service. In such a case, it should be noted that calls to
54/// `ReadyCache::poll_pending` and `ReadyCache::evict` may perturb the order of
55/// the ready set, so any cached indexes should be discarded after such a call.
56#[derive(Debug)]
57pub struct ReadyCache<K, S, Req>
58where
59    K: Eq + Hash,
60{
61    /// A stream of services that are not yet ready.
62    pending: FuturesUnordered<Pending<K, S, Req>>,
63    /// An index of cancelation handles for pending streams.
64    pending_cancel_txs: IndexMap<K, CancelTx>,
65
66    /// Services that have previously become ready. Readiness can become stale,
67    /// so a given service should be polled immediately before use.
68    ///
69    /// The cancelation oneshot is preserved (though unused) while the service is
70    /// ready so that it need not be reallocated each time a request is
71    /// dispatched.
72    ready: IndexMap<K, (S, CancelPair)>,
73}
74
75// Safety: This is safe because we do not use `Pin::new_unchecked`.
76impl<S, K: Eq + Hash, Req> Unpin for ReadyCache<K, S, Req> {}
77
78type CancelRx = oneshot::Receiver<()>;
79type CancelTx = oneshot::Sender<()>;
80type CancelPair = (CancelTx, CancelRx);
81
82#[derive(Debug)]
83enum PendingError<K, E> {
84    Canceled(K),
85    Inner(K, E),
86}
87
88/// A Future that becomes satisfied when an `S`-typed service is ready.
89///
90/// May fail due to cancelation, i.e. if the service is evicted from the balancer.
91#[derive(Debug)]
92struct Pending<K, S, Req> {
93    key: Option<K>,
94    cancel: Option<CancelRx>,
95    ready: Option<S>,
96    _pd: std::marker::PhantomData<Req>,
97}
98
99// === ReadyCache ===
100
101impl<K, S, Req> Default for ReadyCache<K, S, Req>
102where
103    K: Eq + Hash,
104    S: Service<Req>,
105{
106    fn default() -> Self {
107        Self {
108            ready: IndexMap::default(),
109            pending: FuturesUnordered::new(),
110            pending_cancel_txs: IndexMap::default(),
111        }
112    }
113}
114
115impl<K, S, Req> ReadyCache<K, S, Req>
116where
117    K: Eq + Hash,
118{
119    /// Returns the total number of services in the cache.
120    pub fn len(&self) -> usize {
121        self.ready_len() + self.pending_len()
122    }
123
124    /// Returns the number of services in the ready set.
125    pub fn ready_len(&self) -> usize {
126        self.ready.len()
127    }
128
129    /// Returns the number of services in the unready set.
130    pub fn pending_len(&self) -> usize {
131        self.pending.len()
132    }
133
134    /// Returns true iff the given key is in the unready set.
135    pub fn pending_contains<Q: Hash + Equivalent<K>>(&self, key: &Q) -> bool {
136        self.pending_cancel_txs.contains_key(key)
137    }
138
139    /// Obtains a reference to a service in the ready set by key.
140    pub fn get_ready<Q: Hash + Equivalent<K>>(&self, key: &Q) -> Option<(usize, &K, &S)> {
141        self.ready.get_full(key).map(|(i, k, v)| (i, k, &v.0))
142    }
143
144    /// Obtains a mutable reference to a service in the ready set by key.
145    pub fn get_ready_mut<Q: Hash + Equivalent<K>>(
146        &mut self,
147        key: &Q,
148    ) -> Option<(usize, &K, &mut S)> {
149        self.ready
150            .get_full_mut(key)
151            .map(|(i, k, v)| (i, k, &mut v.0))
152    }
153
154    /// Obtains a reference to a service in the ready set by index.
155    pub fn get_ready_index(&self, idx: usize) -> Option<(&K, &S)> {
156        self.ready.get_index(idx).map(|(k, v)| (k, &v.0))
157    }
158
159    /// Obtains a mutable reference to a service in the ready set by index.
160    pub fn get_ready_index_mut(&mut self, idx: usize) -> Option<(&mut K, &mut S)> {
161        self.ready.get_index_mut(idx).map(|(k, v)| (k, &mut v.0))
162    }
163
164    /// Evicts an item from the cache.
165    ///
166    /// Returns true if a service was marked for eviction.
167    ///
168    /// Services are dropped from the ready set immediately. Services in the
169    /// pending set are marked for cancellation, but `ReadyCache::poll_pending`
170    /// must be called to cause the service to be dropped.
171    pub fn evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool {
172        let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) {
173            c.send(()).expect("cancel receiver lost");
174            true
175        } else {
176            false
177        };
178
179        self.ready
180            .swap_remove_full(key)
181            .map(|_| true)
182            .unwrap_or(canceled)
183    }
184}
185
186impl<K, S, Req> ReadyCache<K, S, Req>
187where
188    K: Clone + Eq + Hash,
189    S: Service<Req>,
190    <S as Service<Req>>::Error: Into<error::Error>,
191    S::Error: Into<error::Error>,
192{
193    /// Pushes a new service onto the pending set.
194    ///
195    /// The service will be promoted to the ready set as `poll_pending` is invoked.
196    ///
197    /// Note that this does **not** remove services from the ready set. Once the
198    /// old service is used, it will be dropped instead of being added back to
199    /// the pending set; OR, when the new service becomes ready, it will replace
200    /// the prior service in the ready set.
201    pub fn push(&mut self, key: K, svc: S) {
202        let cancel = oneshot::channel();
203        self.push_pending(key, svc, cancel);
204    }
205
206    fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) {
207        if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) {
208            // If there is already a service for this key, cancel it.
209            c.send(()).expect("cancel receiver lost");
210        }
211        self.pending.push(Pending {
212            key: Some(key),
213            cancel: Some(cancel_rx),
214            ready: Some(svc),
215            _pd: std::marker::PhantomData,
216        });
217    }
218
219    /// Polls services pending readiness, adding ready services to the ready set.
220    ///
221    /// Returns `Async::Ready` when there are no remaining unready services.
222    /// `poll_pending` should be called again after `push_service` or
223    /// `call_ready_index` are invoked.
224    ///
225    /// Failures indicate that an individual pending service failed to become
226    /// ready (and has been removed from the cache). In such a case,
227    /// `poll_pending` should typically be called again to continue driving
228    /// pending services to readiness.
229    pub fn poll_pending(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), error::Failed<K>>> {
230        loop {
231            match Pin::new(&mut self.pending).poll_next(cx) {
232                Poll::Pending => return Poll::Pending,
233                Poll::Ready(None) => return Poll::Ready(Ok(())),
234                Poll::Ready(Some(Ok((key, svc, cancel_rx)))) => {
235                    trace!("endpoint ready");
236                    let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
237                    if let Some(cancel_tx) = cancel_tx {
238                        // Keep track of the cancelation so that it need not be
239                        // recreated after the service is used.
240                        self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
241                    } else {
242                        // This should not technically be possible. We must have decided to cancel
243                        // a Service (by sending on the CancelTx), yet that same service then
244                        // returns Ready. Since polling a Pending _first_ polls the CancelRx, that
245                        // _should_ always see our CancelTx send. Yet empirically, that isn't true:
246                        //
247                        //   https://github.com/tower-rs/tower/issues/415
248                        //
249                        // So, we instead detect the endpoint as canceled at this point. That
250                        // should be fine, since the oneshot is only really there to ensure that
251                        // the Pending is polled again anyway.
252                        //
253                        // We assert that this can't happen in debug mode so that hopefully one day
254                        // we can find a test that triggers this reliably.
255                        debug_assert!(cancel_tx.is_some());
256                        debug!("canceled endpoint removed when ready");
257                    }
258                }
259                Poll::Ready(Some(Err(PendingError::Canceled(_)))) => {
260                    debug!("endpoint canceled");
261                    // The cancellation for this service was removed in order to
262                    // cause this cancellation.
263                }
264                Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => {
265                    let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
266                    if let Some(_) = cancel_tx {
267                        return Err(error::Failed(key, e.into())).into();
268                    } else {
269                        // See comment for the same clause under Ready(Some(Ok)).
270                        debug_assert!(cancel_tx.is_some());
271                        debug!("canceled endpoint removed on error");
272                    }
273                }
274            }
275        }
276    }
277
278    /// Checks whether the referenced endpoint is ready.
279    ///
280    /// Returns true if the endpoint is ready and false if it is not. An error is
281    /// returned if the endpoint fails.
282    pub fn check_ready<Q: Hash + Equivalent<K>>(
283        &mut self,
284        cx: &mut Context<'_>,
285        key: &Q,
286    ) -> Result<bool, error::Failed<K>> {
287        match self.ready.get_full_mut(key) {
288            Some((index, _, _)) => self.check_ready_index(cx, index),
289            None => Ok(false),
290        }
291    }
292
293    /// Checks whether the referenced endpoint is ready.
294    ///
295    /// If the service is no longer ready, it is moved back into the pending set
296    /// and `false` is returned.
297    ///
298    /// If the service errors, it is removed and dropped and the error is returned.
299    pub fn check_ready_index(
300        &mut self,
301        cx: &mut Context<'_>,
302        index: usize,
303    ) -> Result<bool, error::Failed<K>> {
304        let svc = match self.ready.get_index_mut(index) {
305            None => return Ok(false),
306            Some((_, (svc, _))) => svc,
307        };
308        match svc.poll_ready(cx) {
309            Poll::Ready(Ok(())) => Ok(true),
310            Poll::Pending => {
311                // became unready; so move it back there.
312                let (key, (svc, cancel)) = self
313                    .ready
314                    .swap_remove_index(index)
315                    .expect("invalid ready index");
316
317                // If a new version of this service has been added to the
318                // unready set, don't overwrite it.
319                if !self.pending_contains(&key) {
320                    self.push_pending(key, svc, cancel);
321                }
322
323                Ok(false)
324            }
325            Poll::Ready(Err(e)) => {
326                // failed, so drop it.
327                let (key, _) = self
328                    .ready
329                    .swap_remove_index(index)
330                    .expect("invalid ready index");
331                Err(error::Failed(key, e.into()))
332            }
333        }
334    }
335
336    /// Calls a ready service by key.
337    ///
338    /// # Panics
339    ///
340    /// If the specified key does not exist in the ready
341    pub fn call_ready<Q: Hash + Equivalent<K>>(&mut self, key: &Q, req: Req) -> S::Future {
342        let (index, _, _) = self
343            .ready
344            .get_full_mut(key)
345            .expect("check_ready was not called");
346        self.call_ready_index(index, req)
347    }
348
349    /// Calls a ready service by index.
350    ///
351    /// # Panics
352    ///
353    /// If the specified index is out of range.
354    pub fn call_ready_index(&mut self, index: usize, req: Req) -> S::Future {
355        let (key, (mut svc, cancel)) = self
356            .ready
357            .swap_remove_index(index)
358            .expect("check_ready_index was not called");
359
360        let fut = svc.call(req);
361
362        // If a new version of this service has been added to the
363        // unready set, don't overwrite it.
364        if !self.pending_contains(&key) {
365            self.push_pending(key, svc, cancel);
366        }
367
368        fut
369    }
370}
371
372// === Pending ===
373
374// Safety: No use unsafe access therefore this is safe.
375impl<K, S, Req> Unpin for Pending<K, S, Req> {}
376
377impl<K, S, Req> Future for Pending<K, S, Req>
378where
379    S: Service<Req>,
380{
381    type Output = Result<(K, S, CancelRx), PendingError<K, S::Error>>;
382
383    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
384        let mut fut = self.cancel.as_mut().expect("polled after complete");
385        if let Poll::Ready(r) = Pin::new(&mut fut).poll(cx) {
386            assert!(r.is_ok(), "cancel sender lost");
387            let key = self.key.take().expect("polled after complete");
388            return Err(PendingError::Canceled(key)).into();
389        }
390
391        match self
392            .ready
393            .as_mut()
394            .expect("polled after ready")
395            .poll_ready(cx)
396        {
397            Poll::Pending => Poll::Pending,
398            Poll::Ready(Ok(())) => {
399                let key = self.key.take().expect("polled after complete");
400                let cancel = self.cancel.take().expect("polled after complete");
401                Ok((key, self.ready.take().expect("polled after ready"), cancel)).into()
402            }
403            Poll::Ready(Err(e)) => {
404                let key = self.key.take().expect("polled after compete");
405                Err(PendingError::Inner(key, e)).into()
406            }
407        }
408    }
409}