1pub use self::internal::builder;
9
10#[cfg(docsrs)]
11pub use self::internal::Builder;
12#[cfg(docsrs)]
13pub use self::internal::Cache;
14#[cfg(docsrs)]
15pub use self::internal::Cached;
16
17mod internal {
21 use std::fmt;
22 use std::future::Future;
23 use std::pin::Pin;
24 use std::sync::{Arc, Mutex, Weak};
25 use std::task::{self, Poll};
26
27 use futures_core::ready;
28 use futures_util::future;
29 use tokio::sync::oneshot;
30 use tower_service::Service;
31
32 use super::events;
33
34 pub fn builder() -> Builder<events::Ignore> {
36 Builder {
37 events: events::Ignore,
38 }
39 }
40
    /// A cache of services produced by the connector `M`.
    ///
    /// Constructed through `builder()` followed by `Builder::build`. Calling
    /// it as a `Service` yields a `Cached` handle that wraps either a
    /// previously cached service or a freshly connected one.
    #[derive(Debug)]
    pub struct Cache<M, Dst, Ev>
    where
        M: Service<Dst>,
    {
        // Makes a new service whenever `call` finds nothing cached.
        connector: M,
        // Idle services and pending waiters; shared by every clone of this
        // cache and referenced weakly by every `Cached` handle.
        shared: Arc<Mutex<Shared<M::Response>>>,
        // Event handler cloned into each racing future returned by `call`.
        events: Ev,
    }
59
    /// Configuration for a `Cache`, created by `builder()`.
    ///
    /// `Ev` is the event handler type that the built cache will carry.
    #[derive(Debug)]
    pub struct Builder<Ev> {
        // Handler moved into the `Cache` by `build`.
        events: Ev,
    }
71
    /// A service checked out of a `Cache`.
    ///
    /// Implements `Service` by delegating to the wrapped service. On drop the
    /// wrapped service is offered back to the cache's shared state, unless it
    /// has been marked closed or the shared state no longer exists.
    pub struct Cached<S> {
        // Set once the inner service's `poll_ready` returns an error; a
        // closed service is not returned to the cache on drop.
        is_closed: bool,
        // `Some` for the whole life of the handle; only `Drop` takes it.
        inner: Option<S>,
        // Weak, so outstanding handles do not keep the shared state alive.
        shared: Weak<Mutex<Shared<S>>>,
    }
88
    /// The future returned by calling a `Cache` as a `Service`.
    ///
    /// It resolves to a `Cached` handle, or to the connector's error.
    pub enum CacheFuture<M, Dst, Ev>
    where
        M: Service<Dst>,
    {
        /// Nothing was cached at call time: a new connect future races
        /// against a waiter that receives a service put back by someone else.
        Racing {
            shared: Arc<Mutex<Shared<M::Response>>>,
            select: future::Select<oneshot::Receiver<M::Response>, M::Future>,
            events: Ev,
        },
        /// The waiter can no longer deliver anything; only the connect
        /// future is still being driven.
        Connecting {
            shared: Arc<Mutex<Shared<M::Response>>>,
            future: M::Future,
        },
        /// A cached service was available at call time; it is yielded on the
        /// first poll.
        Cached {
            svc: Option<Cached<M::Response>>,
        },
    }
107
    /// State behind the cache's mutex.
    #[derive(Debug)]
    pub struct Shared<S> {
        // Idle services available to the next `call`; used as a stack
        // (`push` in `put`, `pop` in `take`).
        services: Vec<S>,
        // One sender per `call` that found nothing cached; `put` tries these
        // before storing a returned service in `services`.
        waiters: Vec<oneshot::Sender<S>>,
    }
114
115 impl<Ev> Builder<Ev> {
118 pub fn executor<E>(self, exec: E) -> Builder<events::WithExecutor<E>> {
138 Builder {
139 events: events::WithExecutor(exec),
140 }
141 }
142
143 pub fn build<M, Dst>(self, connector: M) -> Cache<M, Dst, Ev>
145 where
146 M: Service<Dst>,
147 {
148 Cache {
149 connector,
150 events: self.events,
151 shared: Arc::new(Mutex::new(Shared {
152 services: Vec::new(),
153 waiters: Vec::new(),
154 })),
155 }
156 }
157 }
158
159 impl<M, Dst, Ev> Cache<M, Dst, Ev>
162 where
163 M: Service<Dst>,
164 {
165 pub fn retain<F>(&mut self, predicate: F)
167 where
168 F: FnMut(&mut M::Response) -> bool,
169 {
170 self.shared.lock().unwrap().services.retain_mut(predicate);
171 }
172
173 pub fn is_empty(&self) -> bool {
175 self.shared.lock().unwrap().services.is_empty()
176 }
177 }
178
179 impl<M, Dst, Ev> Service<Dst> for Cache<M, Dst, Ev>
180 where
181 M: Service<Dst>,
182 M::Future: Unpin,
183 M::Response: Unpin,
184 Ev: events::Events<BackgroundConnect<M::Future, M::Response>> + Clone + Unpin,
185 {
186 type Response = Cached<M::Response>;
187 type Error = M::Error;
188 type Future = CacheFuture<M, Dst, Ev>;
189
190 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
191 if !self.shared.lock().unwrap().services.is_empty() {
192 Poll::Ready(Ok(()))
193 } else {
194 self.connector.poll_ready(cx)
195 }
196 }
197
198 fn call(&mut self, target: Dst) -> Self::Future {
199 let waiter = {
201 let mut locked = self.shared.lock().unwrap();
202 if let Some(found) = locked.take() {
203 return CacheFuture::Cached {
204 svc: Some(Cached::new(found, Arc::downgrade(&self.shared))),
205 };
206 }
207
208 let (tx, rx) = oneshot::channel();
209 locked.waiters.push(tx);
210 rx
211 };
212
213 CacheFuture::Racing {
216 shared: self.shared.clone(),
217 select: future::select(waiter, self.connector.call(target)),
218 events: self.events.clone(),
219 }
220 }
221 }
222
223 impl<M, Dst, Ev> Clone for Cache<M, Dst, Ev>
224 where
225 M: Service<Dst> + Clone,
226 Ev: Clone,
227 {
228 fn clone(&self) -> Self {
229 Self {
230 connector: self.connector.clone(),
231 events: self.events.clone(),
232 shared: self.shared.clone(),
233 }
234 }
235 }
236
    impl<M, Dst, Ev> Future for CacheFuture<M, Dst, Ev>
    where
        M: Service<Dst>,
        M::Future: Unpin,
        M::Response: Unpin,
        Ev: events::Events<BackgroundConnect<M::Future, M::Response>> + Unpin,
    {
        type Output = Result<Cached<M::Response>, M::Error>;

        /// Drive the state machine until a `Cached` handle or a connector
        /// error is available.
        ///
        /// # Panics
        ///
        /// In the `Cached` state the service is taken on the first poll, so
        /// polling again after completion panics.
        fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
            // The loop exists for the `Racing` -> `Connecting` transition:
            // after switching state the new state is polled right away.
            loop {
                match &mut *self.as_mut() {
                    CacheFuture::Racing {
                        shared,
                        select,
                        events,
                    } => {
                        match ready!(Pin::new(select).poll(cx)) {
                            // The waiter resolved with an error, so it will
                            // never deliver a service. Keep waiting on the
                            // connect future alone.
                            future::Either::Left((Err(_pool_closed), connecting)) => {
                                *self = CacheFuture::Connecting {
                                    shared: shared.clone(),
                                    future: connecting,
                                };
                            }
                            // A service was delivered through the waiter
                            // first. Pass the unfinished connect future to
                            // the event handler and return the delivered
                            // service.
                            future::Either::Left((Ok(pool_got), connecting)) => {
                                events.on_race_lost(BackgroundConnect {
                                    future: connecting,
                                    shared: Arc::downgrade(&shared),
                                });
                                return Poll::Ready(Ok(Cached::new(
                                    pool_got,
                                    Arc::downgrade(&shared),
                                )));
                            }
                            // The connect future finished first; `?`
                            // propagates a connector error to the caller.
                            future::Either::Right((connected, _waiter)) => {
                                let inner = connected?;
                                return Poll::Ready(Ok(Cached::new(
                                    inner,
                                    Arc::downgrade(&shared),
                                )));
                            }
                        }
                    }
                    CacheFuture::Connecting { shared, future } => {
                        let inner = ready!(Pin::new(future).poll(cx))?;
                        return Poll::Ready(Ok(Cached::new(inner, Arc::downgrade(&shared))));
                    }
                    CacheFuture::Cached { svc } => {
                        // `take()` leaves `None`, so a second poll hits the
                        // `unwrap` and panics.
                        return Poll::Ready(Ok(svc.take().unwrap()));
                    }
                }
            }
        }
    }
294
295 impl<S> Cached<S> {
298 fn new(inner: S, shared: Weak<Mutex<Shared<S>>>) -> Self {
299 Cached {
300 is_closed: false,
301 inner: Some(inner),
302 shared,
303 }
304 }
305
306 pub fn inner(&self) -> &S {
310 self.inner.as_ref().expect("inner only taken in drop")
311 }
312
313 pub fn inner_mut(&mut self) -> &mut S {
315 self.inner.as_mut().expect("inner only taken in drop")
316 }
317 }
318
319 impl<S, Req> Service<Req> for Cached<S>
320 where
321 S: Service<Req>,
322 {
323 type Response = S::Response;
324 type Error = S::Error;
325 type Future = S::Future;
326
327 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
328 self.inner.as_mut().unwrap().poll_ready(cx).map_err(|err| {
329 self.is_closed = true;
330 err
331 })
332 }
333
334 fn call(&mut self, req: Req) -> Self::Future {
335 self.inner.as_mut().unwrap().call(req)
336 }
337 }
338
339 impl<S> Drop for Cached<S> {
340 fn drop(&mut self) {
341 if self.is_closed {
342 return;
343 }
344 if let Some(value) = self.inner.take() {
345 if let Some(shared) = self.shared.upgrade() {
346 if let Ok(mut shared) = shared.lock() {
347 shared.put(value);
348 }
349 }
350 }
351 }
352 }
353
354 impl<S: fmt::Debug> fmt::Debug for Cached<S> {
355 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
356 f.debug_tuple("Cached")
357 .field(self.inner.as_ref().unwrap())
358 .finish()
359 }
360 }
361
362 impl<V> Shared<V> {
365 fn put(&mut self, val: V) {
366 let mut val = Some(val);
367 while let Some(tx) = self.waiters.pop() {
368 if !tx.is_closed() {
369 match tx.send(val.take().unwrap()) {
370 Ok(()) => break,
371 Err(v) => {
372 val = Some(v);
373 }
374 }
375 }
376 }
377
378 if let Some(val) = val {
379 self.services.push(val);
380 }
381 }
382
383 fn take(&mut self) -> Option<V> {
384 self.services.pop()
386 }
387 }
388
    /// A connect future that lost the race against a returned service.
    ///
    /// It is handed to `Events::on_race_lost`. When polled to completion, a
    /// successfully connected service is put into the cache's shared state.
    pub struct BackgroundConnect<CF, S> {
        // The connector's still-pending future.
        future: CF,
        // Weak, so a background connect does not keep the shared state alive.
        shared: Weak<Mutex<Shared<S>>>,
    }
393
394 impl<CF, S, E> Future for BackgroundConnect<CF, S>
395 where
396 CF: Future<Output = Result<S, E>> + Unpin,
397 {
398 type Output = ();
399
400 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
401 match ready!(Pin::new(&mut self.future).poll(cx)) {
402 Ok(svc) => {
403 if let Some(shared) = self.shared.upgrade() {
404 if let Ok(mut locked) = shared.lock() {
405 locked.put(svc);
406 }
407 }
408 Poll::Ready(())
409 }
410 Err(_e) => Poll::Ready(()),
411 }
412 }
413 }
414}
415
416mod events {
417 #[derive(Clone, Debug)]
418 #[non_exhaustive]
419 pub struct Ignore;
420
421 #[derive(Clone, Debug)]
422 pub struct WithExecutor<E>(pub(super) E);
423
424 pub trait Events<CF> {
425 fn on_race_lost(&self, fut: CF);
426 }
427
428 impl<CF> Events<CF> for Ignore {
429 fn on_race_lost(&self, _fut: CF) {}
430 }
431
432 impl<E, CF> Events<CF> for WithExecutor<E>
433 where
434 E: hyper::rt::Executor<CF>,
435 {
436 fn on_race_lost(&self, fut: CF) {
437 self.0.execute(fut);
438 }
439 }
440}
441
442#[cfg(test)]
443mod tests {
444 use futures_util::future;
445 use tower_service::Service;
446 use tower_test::assert_request_eq;
447
    // With nothing cached, a call must go through the connector.
    #[tokio::test]
    async fn test_makes_svc_when_empty() {
        let (mock, mut handle) = tower_test::mock::pair();
        let mut cache = super::builder().build(mock);
        // Let the mock connector accept one request.
        handle.allow(1);

        crate::common::future::poll_fn(|cx| cache.poll_ready(cx))
            .await
            .unwrap();

        let f = cache.call(1);

        // Drive the call and the mock's responder together: the mock must
        // see the request `1` and answers with "one".
        future::join(f, async move {
            assert_request_eq!(handle, 1).send_response("one");
        })
        .await
        .0
        .expect("call");
    }
467
    // A service returned to the cache on drop is reused by the next call.
    #[tokio::test]
    async fn test_reuses_after_idle() {
        let (mock, mut handle) = tower_test::mock::pair();
        let mut cache = super::builder().build(mock);

        // Only one request is allowed through the mock connector.
        handle.allow(1);

        crate::common::future::poll_fn(|cx| cache.poll_ready(cx))
            .await
            .unwrap();
        let f = cache.call(1);
        let cached = future::join(f, async {
            assert_request_eq!(handle, 1).send_response("one");
        })
        .await
        .0
        .expect("call");
        // Dropping the handle puts the service back into the cache.
        drop(cached);

        crate::common::future::poll_fn(|cx| cache.poll_ready(cx))
            .await
            .unwrap();
        // No mock responder is driven here, so this call can only complete
        // from the cached service.
        let f = cache.call(1);
        let cached = f.await.expect("call");
        drop(cached);
    }
495}