tower_ready_cache/cache.rs
1//! A cache of services.
2
3use crate::error;
4use futures_core::Stream;
5use futures_util::stream::FuturesUnordered;
6pub use indexmap::Equivalent;
7use indexmap::IndexMap;
8use log::{debug, trace};
9use std::future::Future;
10use std::hash::Hash;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13use tokio::sync::oneshot;
14use tower_service::Service;
15
16/// Drives readiness over a set of services.
17///
18/// The cache maintains two internal data structures:
19///
20/// * a set of _pending_ services that have not yet become ready; and
21/// * a set of _ready_ services that have previously polled ready.
22///
23/// As each `S` typed `Service` is added to the cache via `ReadyCache::push`, it
24/// is added to the _pending set_. As `ReadyCache::poll_pending` is invoked,
25/// pending services are polled and added to the _ready set_.
26///
27/// `ReadyCache::call_ready` (or `ReadyCache::call_ready_index`) dispatches a
28/// request to the specified service, but panics if the specified service is not
29/// in the ready set. The `ReadyCache::check_*` functions can be used to ensure
30/// that a service is ready before dispatching a request.
31///
32/// The ready set can hold services for an abitrarily long time. During this
33/// time, the runtime may process events that invalidate that ready state (for
34/// instance, if a keepalive detects a lost connection). In such cases, callers
35/// should use `ReadyCache::check_ready` (or `ReadyCache::check_ready_index`)
36/// immediately before dispatching a request to ensure that the service has not
37/// become unavailable.
38///
39/// Once `ReadyCache::call_ready*` is invoked, the service is placed back into
40/// the _pending_ set to be driven to readiness again.
41///
42/// When `ReadyCache::check_ready*` returns `false`, it indicates that the
43/// specified service is _not_ ready. If an error is returned, this indicats that
44/// the server failed and has been removed from the cache entirely.
45///
46/// `ReadyCache::evict` can be used to remove a service from the cache (by key),
47/// though the service may not be dropped (if it is currently pending) until
48/// `ReadyCache::poll_pending` is invoked.
49///
50/// Note that the by-index accessors are provided to support use cases (like
51/// power-of-two-choices load balancing) where the caller does not care to keep
52/// track of each service's key. Instead, it needs only to access _some_ ready
53/// service. In such a case, it should be noted that calls to
54/// `ReadyCache::poll_pending` and `ReadyCache::evict` may perturb the order of
55/// the ready set, so any cached indexes should be discarded after such a call.
56#[derive(Debug)]
57pub struct ReadyCache<K, S, Req>
58where
59 K: Eq + Hash,
60{
61 /// A stream of services that are not yet ready.
62 pending: FuturesUnordered<Pending<K, S, Req>>,
63 /// An index of cancelation handles for pending streams.
64 pending_cancel_txs: IndexMap<K, CancelTx>,
65
66 /// Services that have previously become ready. Readiness can become stale,
67 /// so a given service should be polled immediately before use.
68 ///
69 /// The cancelation oneshot is preserved (though unused) while the service is
70 /// ready so that it need not be reallocated each time a request is
71 /// dispatched.
72 ready: IndexMap<K, (S, CancelPair)>,
73}
74
75// Safety: This is safe because we do not use `Pin::new_unchecked`.
76impl<S, K: Eq + Hash, Req> Unpin for ReadyCache<K, S, Req> {}
77
78type CancelRx = oneshot::Receiver<()>;
79type CancelTx = oneshot::Sender<()>;
80type CancelPair = (CancelTx, CancelRx);
81
82#[derive(Debug)]
83enum PendingError<K, E> {
84 Canceled(K),
85 Inner(K, E),
86}
87
88/// A Future that becomes satisfied when an `S`-typed service is ready.
89///
90/// May fail due to cancelation, i.e. if the service is evicted from the balancer.
91#[derive(Debug)]
92struct Pending<K, S, Req> {
93 key: Option<K>,
94 cancel: Option<CancelRx>,
95 ready: Option<S>,
96 _pd: std::marker::PhantomData<Req>,
97}
98
99// === ReadyCache ===
100
101impl<K, S, Req> Default for ReadyCache<K, S, Req>
102where
103 K: Eq + Hash,
104 S: Service<Req>,
105{
106 fn default() -> Self {
107 Self {
108 ready: IndexMap::default(),
109 pending: FuturesUnordered::new(),
110 pending_cancel_txs: IndexMap::default(),
111 }
112 }
113}
114
115impl<K, S, Req> ReadyCache<K, S, Req>
116where
117 K: Eq + Hash,
118{
119 /// Returns the total number of services in the cache.
120 pub fn len(&self) -> usize {
121 self.ready_len() + self.pending_len()
122 }
123
124 /// Returns the number of services in the ready set.
125 pub fn ready_len(&self) -> usize {
126 self.ready.len()
127 }
128
129 /// Returns the number of services in the unready set.
130 pub fn pending_len(&self) -> usize {
131 self.pending.len()
132 }
133
134 /// Returns true iff the given key is in the unready set.
135 pub fn pending_contains<Q: Hash + Equivalent<K>>(&self, key: &Q) -> bool {
136 self.pending_cancel_txs.contains_key(key)
137 }
138
139 /// Obtains a reference to a service in the ready set by key.
140 pub fn get_ready<Q: Hash + Equivalent<K>>(&self, key: &Q) -> Option<(usize, &K, &S)> {
141 self.ready.get_full(key).map(|(i, k, v)| (i, k, &v.0))
142 }
143
144 /// Obtains a mutable reference to a service in the ready set by key.
145 pub fn get_ready_mut<Q: Hash + Equivalent<K>>(
146 &mut self,
147 key: &Q,
148 ) -> Option<(usize, &K, &mut S)> {
149 self.ready
150 .get_full_mut(key)
151 .map(|(i, k, v)| (i, k, &mut v.0))
152 }
153
154 /// Obtains a reference to a service in the ready set by index.
155 pub fn get_ready_index(&self, idx: usize) -> Option<(&K, &S)> {
156 self.ready.get_index(idx).map(|(k, v)| (k, &v.0))
157 }
158
159 /// Obtains a mutable reference to a service in the ready set by index.
160 pub fn get_ready_index_mut(&mut self, idx: usize) -> Option<(&mut K, &mut S)> {
161 self.ready.get_index_mut(idx).map(|(k, v)| (k, &mut v.0))
162 }
163
164 /// Evicts an item from the cache.
165 ///
166 /// Returns true if a service was marked for eviction.
167 ///
168 /// Services are dropped from the ready set immediately. Services in the
169 /// pending set are marked for cancellation, but `ReadyCache::poll_pending`
170 /// must be called to cause the service to be dropped.
171 pub fn evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool {
172 let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) {
173 c.send(()).expect("cancel receiver lost");
174 true
175 } else {
176 false
177 };
178
179 self.ready
180 .swap_remove_full(key)
181 .map(|_| true)
182 .unwrap_or(canceled)
183 }
184}
185
186impl<K, S, Req> ReadyCache<K, S, Req>
187where
188 K: Clone + Eq + Hash,
189 S: Service<Req>,
190 <S as Service<Req>>::Error: Into<error::Error>,
191 S::Error: Into<error::Error>,
192{
193 /// Pushes a new service onto the pending set.
194 ///
195 /// The service will be promoted to the ready set as `poll_pending` is invoked.
196 ///
197 /// Note that this does **not** remove services from the ready set. Once the
198 /// old service is used, it will be dropped instead of being added back to
199 /// the pending set; OR, when the new service becomes ready, it will replace
200 /// the prior service in the ready set.
201 pub fn push(&mut self, key: K, svc: S) {
202 let cancel = oneshot::channel();
203 self.push_pending(key, svc, cancel);
204 }
205
206 fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) {
207 if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) {
208 // If there is already a service for this key, cancel it.
209 c.send(()).expect("cancel receiver lost");
210 }
211 self.pending.push(Pending {
212 key: Some(key),
213 cancel: Some(cancel_rx),
214 ready: Some(svc),
215 _pd: std::marker::PhantomData,
216 });
217 }
218
219 /// Polls services pending readiness, adding ready services to the ready set.
220 ///
221 /// Returns `Async::Ready` when there are no remaining unready services.
222 /// `poll_pending` should be called again after `push_service` or
223 /// `call_ready_index` are invoked.
224 ///
225 /// Failures indicate that an individual pending service failed to become
226 /// ready (and has been removed from the cache). In such a case,
227 /// `poll_pending` should typically be called again to continue driving
228 /// pending services to readiness.
229 pub fn poll_pending(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), error::Failed<K>>> {
230 loop {
231 match Pin::new(&mut self.pending).poll_next(cx) {
232 Poll::Pending => return Poll::Pending,
233 Poll::Ready(None) => return Poll::Ready(Ok(())),
234 Poll::Ready(Some(Ok((key, svc, cancel_rx)))) => {
235 trace!("endpoint ready");
236 let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
237 if let Some(cancel_tx) = cancel_tx {
238 // Keep track of the cancelation so that it need not be
239 // recreated after the service is used.
240 self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
241 } else {
242 // This should not technically be possible. We must have decided to cancel
243 // a Service (by sending on the CancelTx), yet that same service then
244 // returns Ready. Since polling a Pending _first_ polls the CancelRx, that
245 // _should_ always see our CancelTx send. Yet empirically, that isn't true:
246 //
247 // https://github.com/tower-rs/tower/issues/415
248 //
249 // So, we instead detect the endpoint as canceled at this point. That
250 // should be fine, since the oneshot is only really there to ensure that
251 // the Pending is polled again anyway.
252 //
253 // We assert that this can't happen in debug mode so that hopefully one day
254 // we can find a test that triggers this reliably.
255 debug_assert!(cancel_tx.is_some());
256 debug!("canceled endpoint removed when ready");
257 }
258 }
259 Poll::Ready(Some(Err(PendingError::Canceled(_)))) => {
260 debug!("endpoint canceled");
261 // The cancellation for this service was removed in order to
262 // cause this cancellation.
263 }
264 Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => {
265 let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
266 if let Some(_) = cancel_tx {
267 return Err(error::Failed(key, e.into())).into();
268 } else {
269 // See comment for the same clause under Ready(Some(Ok)).
270 debug_assert!(cancel_tx.is_some());
271 debug!("canceled endpoint removed on error");
272 }
273 }
274 }
275 }
276 }
277
278 /// Checks whether the referenced endpoint is ready.
279 ///
280 /// Returns true if the endpoint is ready and false if it is not. An error is
281 /// returned if the endpoint fails.
282 pub fn check_ready<Q: Hash + Equivalent<K>>(
283 &mut self,
284 cx: &mut Context<'_>,
285 key: &Q,
286 ) -> Result<bool, error::Failed<K>> {
287 match self.ready.get_full_mut(key) {
288 Some((index, _, _)) => self.check_ready_index(cx, index),
289 None => Ok(false),
290 }
291 }
292
293 /// Checks whether the referenced endpoint is ready.
294 ///
295 /// If the service is no longer ready, it is moved back into the pending set
296 /// and `false` is returned.
297 ///
298 /// If the service errors, it is removed and dropped and the error is returned.
299 pub fn check_ready_index(
300 &mut self,
301 cx: &mut Context<'_>,
302 index: usize,
303 ) -> Result<bool, error::Failed<K>> {
304 let svc = match self.ready.get_index_mut(index) {
305 None => return Ok(false),
306 Some((_, (svc, _))) => svc,
307 };
308 match svc.poll_ready(cx) {
309 Poll::Ready(Ok(())) => Ok(true),
310 Poll::Pending => {
311 // became unready; so move it back there.
312 let (key, (svc, cancel)) = self
313 .ready
314 .swap_remove_index(index)
315 .expect("invalid ready index");
316
317 // If a new version of this service has been added to the
318 // unready set, don't overwrite it.
319 if !self.pending_contains(&key) {
320 self.push_pending(key, svc, cancel);
321 }
322
323 Ok(false)
324 }
325 Poll::Ready(Err(e)) => {
326 // failed, so drop it.
327 let (key, _) = self
328 .ready
329 .swap_remove_index(index)
330 .expect("invalid ready index");
331 Err(error::Failed(key, e.into()))
332 }
333 }
334 }
335
336 /// Calls a ready service by key.
337 ///
338 /// # Panics
339 ///
340 /// If the specified key does not exist in the ready
341 pub fn call_ready<Q: Hash + Equivalent<K>>(&mut self, key: &Q, req: Req) -> S::Future {
342 let (index, _, _) = self
343 .ready
344 .get_full_mut(key)
345 .expect("check_ready was not called");
346 self.call_ready_index(index, req)
347 }
348
349 /// Calls a ready service by index.
350 ///
351 /// # Panics
352 ///
353 /// If the specified index is out of range.
354 pub fn call_ready_index(&mut self, index: usize, req: Req) -> S::Future {
355 let (key, (mut svc, cancel)) = self
356 .ready
357 .swap_remove_index(index)
358 .expect("check_ready_index was not called");
359
360 let fut = svc.call(req);
361
362 // If a new version of this service has been added to the
363 // unready set, don't overwrite it.
364 if !self.pending_contains(&key) {
365 self.push_pending(key, svc, cancel);
366 }
367
368 fut
369 }
370}
371
372// === Pending ===
373
374// Safety: No use unsafe access therefore this is safe.
375impl<K, S, Req> Unpin for Pending<K, S, Req> {}
376
377impl<K, S, Req> Future for Pending<K, S, Req>
378where
379 S: Service<Req>,
380{
381 type Output = Result<(K, S, CancelRx), PendingError<K, S::Error>>;
382
383 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
384 let mut fut = self.cancel.as_mut().expect("polled after complete");
385 if let Poll::Ready(r) = Pin::new(&mut fut).poll(cx) {
386 assert!(r.is_ok(), "cancel sender lost");
387 let key = self.key.take().expect("polled after complete");
388 return Err(PendingError::Canceled(key)).into();
389 }
390
391 match self
392 .ready
393 .as_mut()
394 .expect("polled after ready")
395 .poll_ready(cx)
396 {
397 Poll::Pending => Poll::Pending,
398 Poll::Ready(Ok(())) => {
399 let key = self.key.take().expect("polled after complete");
400 let cancel = self.cancel.take().expect("polled after complete");
401 Ok((key, self.ready.take().expect("polled after ready"), cancel)).into()
402 }
403 Poll::Ready(Err(e)) => {
404 let key = self.key.take().expect("polled after compete");
405 Err(PendingError::Inner(key, e)).into()
406 }
407 }
408 }
409}