1#![allow(dead_code)]
2
3use std::collections::{HashMap, HashSet, VecDeque};
4use std::convert::Infallible;
5use std::error::Error as StdError;
6use std::fmt::{self, Debug};
7use std::future::Future;
8use std::hash::Hash;
9use std::ops::{Deref, DerefMut};
10use std::pin::Pin;
11use std::sync::{Arc, Mutex, Weak};
12use std::task::{self, Poll};
13
14use std::time::Duration;
15use wasmtimer::{std::Instant, tokio::Sleep};
16
17use futures_channel::oneshot;
18use futures_util::ready;
19use tracing::{debug, trace};
20
21use crate::common::{exec, exec::Exec, timer::{Timer, TimerTrait}};
22
23#[allow(missing_debug_implementations)]
25pub struct Pool<T, K: Key> {
26 inner: Option<Arc<Mutex<PoolInner<T, K>>>>,
28}
29
30pub trait Poolable: Unpin + Send + Sized + 'static {
36 fn is_open(&self) -> bool;
37 fn reserve(self) -> Reservation<Self>;
41 fn can_share(&self) -> bool;
42}
43
44pub trait Key: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}
45
46impl<T> Key for T where T: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}
47
48#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
50#[allow(dead_code)]
51pub enum Ver {
52 Auto,
53 Http2,
54}
55
56#[allow(missing_debug_implementations)]
63pub enum Reservation<T> {
64 #[cfg(feature = "http2")]
68 Shared(T, T),
69 Unique(T),
72}
73
74struct PoolInner<T, K: Eq + Hash> {
78 connecting: HashSet<K>,
82 idle: HashMap<K, Vec<Idle<T>>>,
85 max_idle_per_host: usize,
86 waiters: HashMap<K, VecDeque<oneshot::Sender<T>>>,
96 idle_interval_ref: Option<oneshot::Sender<Infallible>>,
99 exec: Exec,
100 timer: Option<Timer>,
101 timeout: Option<Duration>,
102}
103
104struct WeakOpt<T>(Option<Weak<T>>);
107
108#[derive(Clone, Copy, Debug)]
109pub struct Config {
110 pub idle_timeout: Option<Duration>,
111 pub max_idle_per_host: usize,
112}
113
114impl Config {
115 pub fn is_enabled(&self) -> bool {
116 self.max_idle_per_host > 0
117 }
118}
119
120impl<T, K: Key> Pool<T, K> {
121 pub fn new<E, M>(config: Config, executor: E, _timer: Option<M>) -> Pool<T, K>
122 where
123 E: hyper::rt::Executor<exec::BoxSendFuture> + Send + Sync + Clone + 'static,
124 M: TimerTrait + Send + Sync + Clone + 'static,
125 {
126 let exec = Exec::new(executor);
127 let timer = Some(Timer::new());
128 let inner = if config.is_enabled() {
129 Some(Arc::new(Mutex::new(PoolInner {
130 connecting: HashSet::new(),
131 idle: HashMap::new(),
132 idle_interval_ref: None,
133 max_idle_per_host: config.max_idle_per_host,
134 waiters: HashMap::new(),
135 exec,
136 timer,
137 timeout: config.idle_timeout,
138 })))
139 } else {
140 None
141 };
142
143 Pool { inner }
144 }
145
146 pub(crate) fn is_enabled(&self) -> bool {
147 self.inner.is_some()
148 }
149
150 #[cfg(test)]
151 pub(super) fn no_timer(&self) {
152 {
154 let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
155 assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
156 let (tx, _) = oneshot::channel();
157 inner.idle_interval_ref = Some(tx);
158 }
159 }
160}
161
162impl<T: Poolable, K: Key> Pool<T, K> {
163 pub fn checkout(&self, key: K) -> Checkout<T, K> {
166 Checkout {
167 key,
168 pool: self.clone(),
169 waiter: None,
170 }
171 }
172
173 pub fn connecting(&self, key: &K, ver: Ver) -> Option<Connecting<T, K>> {
176 if ver == Ver::Http2 {
177 if let Some(ref enabled) = self.inner {
178 let mut inner = enabled.lock().unwrap();
179 return if inner.connecting.insert(key.clone()) {
180 let connecting = Connecting {
181 key: key.clone(),
182 pool: WeakOpt::downgrade(enabled),
183 };
184 Some(connecting)
185 } else {
186 trace!("HTTP/2 connecting already in progress for {:?}", key);
187 None
188 };
189 }
190 }
191
192 Some(Connecting {
194 key: key.clone(),
195 pool: WeakOpt::none(),
198 })
199 }
200
201 #[cfg(test)]
202 fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T, K>> {
203 self.inner.as_ref().expect("enabled").lock().expect("lock")
204 }
205
206 pub fn pooled(
224 &self,
225 #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T, K>,
226 value: T,
227 ) -> Pooled<T, K> {
228 let (value, pool_ref) = if let Some(ref enabled) = self.inner {
229 match value.reserve() {
230 #[cfg(feature = "http2")]
231 Reservation::Shared(to_insert, to_return) => {
232 let mut inner = enabled.lock().unwrap();
233 inner.put(connecting.key.clone(), to_insert, enabled);
234 inner.connected(&connecting.key);
237 connecting.pool = WeakOpt::none();
239
240 (to_return, WeakOpt::none())
243 }
244 Reservation::Unique(value) => {
245 (value, WeakOpt::downgrade(enabled))
249 }
250 }
251 } else {
252 debug_assert!(connecting.pool.upgrade().is_none());
256
257 (value, WeakOpt::none())
258 };
259 Pooled {
260 key: connecting.key.clone(),
261 is_reused: false,
262 pool: pool_ref,
263 value: Some(value),
264 }
265 }
266
267 fn reuse(&self, key: &K, value: T) -> Pooled<T, K> {
268 debug!("reuse idle connection for {:?}", key);
269 let mut pool_ref = WeakOpt::none();
278 if !value.can_share() {
279 if let Some(ref enabled) = self.inner {
280 pool_ref = WeakOpt::downgrade(enabled);
281 }
282 }
283
284 Pooled {
285 is_reused: true,
286 key: key.clone(),
287 pool: pool_ref,
288 value: Some(value),
289 }
290 }
291}
292
293struct IdlePopper<'a, T, K> {
295 key: &'a K,
296 list: &'a mut Vec<Idle<T>>,
297}
298
299impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> {
300 fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
301 while let Some(entry) = self.list.pop() {
302 if !entry.value.is_open() {
305 trace!("removing closed connection for {:?}", self.key);
306 continue;
307 }
308 if expiration.expires(entry.idle_at) {
315 trace!("removing expired connection for {:?}", self.key);
316 continue;
317 }
318
319 let value = match entry.value.reserve() {
320 #[cfg(feature = "http2")]
321 Reservation::Shared(to_reinsert, to_checkout) => {
322 self.list.push(Idle {
323 idle_at: Instant::now(),
324 value: to_reinsert,
325 });
326 to_checkout
327 }
328 Reservation::Unique(unique) => unique,
329 };
330
331 return Some(Idle {
332 idle_at: entry.idle_at,
333 value,
334 });
335 }
336
337 None
338 }
339}
340
341impl<T: Poolable, K: Key> PoolInner<T, K> {
342 fn put(&mut self, key: K, value: T, __pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
343 if value.can_share() && self.idle.contains_key(&key) {
344 trace!("put; existing idle HTTP/2 connection for {:?}", key);
345 return;
346 }
347 trace!("put; add idle connection for {:?}", key);
348 let mut remove_waiters = false;
349 let mut value = Some(value);
350 if let Some(waiters) = self.waiters.get_mut(&key) {
351 while let Some(tx) = waiters.pop_front() {
352 if !tx.is_canceled() {
353 let reserved = value.take().expect("value already sent");
354 let reserved = match reserved.reserve() {
355 #[cfg(feature = "http2")]
356 Reservation::Shared(to_keep, to_send) => {
357 value = Some(to_keep);
358 to_send
359 }
360 Reservation::Unique(uniq) => uniq,
361 };
362 match tx.send(reserved) {
363 Ok(()) => {
364 if value.is_none() {
365 break;
366 } else {
367 continue;
368 }
369 }
370 Err(e) => {
371 value = Some(e);
372 }
373 }
374 }
375
376 trace!("put; removing canceled waiter for {:?}", key);
377 }
378 remove_waiters = waiters.is_empty();
379 }
380 if remove_waiters {
381 self.waiters.remove(&key);
382 }
383
384 match value {
385 Some(value) => {
386 {
388 let idle_list = self.idle.entry(key.clone()).or_default();
389 if self.max_idle_per_host <= idle_list.len() {
390 trace!("max idle per host for {:?}, dropping connection", key);
391 return;
392 }
393
394 debug!("pooling idle connection for {:?}", key);
395 idle_list.push(Idle {
396 value,
397 idle_at: Instant::now(),
398 });
399 }
400
401 self.spawn_idle_interval(__pool_ref);
402 }
403 None => trace!("put; found waiter for {:?}", key),
404 }
405 }
406
407 fn connected(&mut self, key: &K) {
410 let existed = self.connecting.remove(key);
411 debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
412 self.waiters.remove(key);
416 }
417
418 fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
419 if self.idle_interval_ref.is_some() {
420 return;
421 }
422 let dur = if let Some(dur) = self.timeout {
423 dur
424 } else {
425 return;
426 };
427 let timer = if let Some(timer) = self.timer.clone() {
428 timer
429 } else {
430 return;
431 };
432 let (tx, rx) = oneshot::channel();
433 self.idle_interval_ref = Some(tx);
434
435 let interval = IdleTask {
436 timer: timer.clone(),
437 duration: dur,
438 deadline: Instant::now(),
439 fut: timer.sleep_until(Instant::now()), pool: WeakOpt::downgrade(pool_ref),
441 pool_drop_notifier: rx,
442 };
443
444 self.exec.execute(interval);
445 }
446}
447
448impl<T, K: Eq + Hash> PoolInner<T, K> {
449 fn clean_waiters(&mut self, key: &K) {
454 let mut remove_waiters = false;
455 if let Some(waiters) = self.waiters.get_mut(key) {
456 waiters.retain(|tx| !tx.is_canceled());
457 remove_waiters = waiters.is_empty();
458 }
459 if remove_waiters {
460 self.waiters.remove(key);
461 }
462 }
463}
464
465impl<T: Poolable, K: Key> PoolInner<T, K> {
466 fn clear_expired(&mut self) {
468 let dur = self.timeout.expect("interval assumes timeout");
469
470 let now = Instant::now();
471 self.idle.retain(|key, values| {
474 values.retain(|entry| {
475 if !entry.value.is_open() {
476 trace!("idle interval evicting closed for {:?}", key);
477 return false;
478 }
479
480 if now.duration_since(entry.idle_at) > dur {
481 trace!("idle interval evicting expired for {:?}", key);
482 return false;
483 }
484
485 true
487 });
488
489 !values.is_empty()
491 });
492 }
493}
494
495impl<T, K: Key> Clone for Pool<T, K> {
496 fn clone(&self) -> Pool<T, K> {
497 Pool {
498 inner: self.inner.clone(),
499 }
500 }
501}
502
503pub struct Pooled<T: Poolable, K: Key> {
506 value: Option<T>,
507 is_reused: bool,
508 key: K,
509 pool: WeakOpt<Mutex<PoolInner<T, K>>>,
510}
511
512impl<T: Poolable, K: Key> Pooled<T, K> {
513 pub fn is_reused(&self) -> bool {
514 self.is_reused
515 }
516
517 pub fn is_pool_enabled(&self) -> bool {
518 self.pool.0.is_some()
519 }
520
521 fn as_ref(&self) -> &T {
522 self.value.as_ref().expect("not dropped")
523 }
524
525 fn as_mut(&mut self) -> &mut T {
526 self.value.as_mut().expect("not dropped")
527 }
528}
529
530impl<T: Poolable, K: Key> Deref for Pooled<T, K> {
531 type Target = T;
532 fn deref(&self) -> &T {
533 self.as_ref()
534 }
535}
536
537impl<T: Poolable, K: Key> DerefMut for Pooled<T, K> {
538 fn deref_mut(&mut self) -> &mut T {
539 self.as_mut()
540 }
541}
542
543impl<T: Poolable, K: Key> Drop for Pooled<T, K> {
544 fn drop(&mut self) {
545 if let Some(value) = self.value.take() {
546 if !value.is_open() {
547 return;
550 }
551
552 if let Some(pool) = self.pool.upgrade() {
553 if let Ok(mut inner) = pool.lock() {
554 inner.put(self.key.clone(), value, &pool);
555 }
556 } else if !value.can_share() {
557 trace!("pool dropped, dropping pooled ({:?})", self.key);
558 }
559 }
562 }
563}
564
565impl<T: Poolable, K: Key> fmt::Debug for Pooled<T, K> {
566 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
567 f.debug_struct("Pooled").field("key", &self.key).finish()
568 }
569}
570
571struct Idle<T> {
572 idle_at: Instant,
573 value: T,
574}
575
576#[allow(missing_debug_implementations)]
578pub struct Checkout<T, K: Key> {
579 key: K,
580 pool: Pool<T, K>,
581 waiter: Option<oneshot::Receiver<T>>,
582}
583
584#[derive(Debug)]
585#[non_exhaustive]
586pub enum Error {
587 PoolDisabled,
588 CheckoutNoLongerWanted,
589 CheckedOutClosedValue,
590}
591
592impl Error {
593 pub(super) fn is_canceled(&self) -> bool {
594 matches!(self, Error::CheckedOutClosedValue)
595 }
596}
597
598impl fmt::Display for Error {
599 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
600 f.write_str(match self {
601 Error::PoolDisabled => "pool is disabled",
602 Error::CheckedOutClosedValue => "checked out connection was closed",
603 Error::CheckoutNoLongerWanted => "request was canceled",
604 })
605 }
606}
607
608impl StdError for Error {}
609
610impl<T: Poolable, K: Key> Checkout<T, K> {
611 fn poll_waiter(
612 &mut self,
613 cx: &mut task::Context<'_>,
614 ) -> Poll<Option<Result<Pooled<T, K>, Error>>> {
615 if let Some(mut rx) = self.waiter.take() {
616 match Pin::new(&mut rx).poll(cx) {
617 Poll::Ready(Ok(value)) => {
618 if value.is_open() {
619 Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
620 } else {
621 Poll::Ready(Some(Err(Error::CheckedOutClosedValue)))
622 }
623 }
624 Poll::Pending => {
625 self.waiter = Some(rx);
626 Poll::Pending
627 }
628 Poll::Ready(Err(_canceled)) => {
629 Poll::Ready(Some(Err(Error::CheckoutNoLongerWanted)))
630 }
631 }
632 } else {
633 Poll::Ready(None)
634 }
635 }
636
637 fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T, K>> {
638 let entry = {
639 let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
640 let expiration = Expiration::new(inner.timeout);
641 let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
642 trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
643 {
646 let popper = IdlePopper {
647 key: &self.key,
648 list,
649 };
650 popper.pop(&expiration)
651 }
652 .map(|e| (e, list.is_empty()))
653 });
654
655 let (entry, empty) = if let Some((e, empty)) = maybe_entry {
656 (Some(e), empty)
657 } else {
658 (None, true)
660 };
661 if empty {
662 inner.idle.remove(&self.key);
664 }
665
666 if entry.is_none() && self.waiter.is_none() {
667 let (tx, mut rx) = oneshot::channel();
668 trace!("checkout waiting for idle connection: {:?}", self.key);
669 inner
670 .waiters
671 .entry(self.key.clone())
672 .or_insert_with(VecDeque::new)
673 .push_back(tx);
674
675 assert!(Pin::new(&mut rx).poll(cx).is_pending());
677 self.waiter = Some(rx);
678 }
679
680 entry
681 };
682
683 entry.map(|e| self.pool.reuse(&self.key, e.value))
684 }
685}
686
687impl<T: Poolable, K: Key> Future for Checkout<T, K> {
688 type Output = Result<Pooled<T, K>, Error>;
689
690 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
691 if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
692 return Poll::Ready(Ok(pooled));
693 }
694
695 if let Some(pooled) = self.checkout(cx) {
696 Poll::Ready(Ok(pooled))
697 } else if !self.pool.is_enabled() {
698 Poll::Ready(Err(Error::PoolDisabled))
699 } else {
700 debug_assert!(self.waiter.is_some());
702 Poll::Pending
703 }
704 }
705}
706
707impl<T, K: Key> Drop for Checkout<T, K> {
708 fn drop(&mut self) {
709 if self.waiter.take().is_some() {
710 trace!("checkout dropped for {:?}", self.key);
711 if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
712 inner.clean_waiters(&self.key);
713 }
714 }
715 }
716}
717
718#[allow(missing_debug_implementations)]
720pub struct Connecting<T: Poolable, K: Key> {
721 key: K,
722 pool: WeakOpt<Mutex<PoolInner<T, K>>>,
723}
724
725impl<T: Poolable, K: Key> Connecting<T, K> {
726 pub fn alpn_h2(self, pool: &Pool<T, K>) -> Option<Self> {
727 debug_assert!(
728 self.pool.0.is_none(),
729 "Connecting::alpn_h2 but already Http2"
730 );
731
732 pool.connecting(&self.key, Ver::Http2)
733 }
734}
735
736impl<T: Poolable, K: Key> Drop for Connecting<T, K> {
737 fn drop(&mut self) {
738 if let Some(pool) = self.pool.upgrade() {
739 if let Ok(mut inner) = pool.lock() {
741 inner.connected(&self.key);
742 }
743 }
744 }
745}
746
747struct Expiration(Option<Duration>);
748
749impl Expiration {
750 fn new(dur: Option<Duration>) -> Expiration {
751 Expiration(dur)
752 }
753
754 fn expires(&self, instant: Instant) -> bool {
755 match self.0 {
756 Some(timeout) => Instant::now().duration_since(instant) > timeout,
757 None => false,
758 }
759 }
760}
761
762pin_project_lite::pin_project! {
763 struct IdleTask<T, K: Key> {
764 timer: Timer,
765 duration: Duration,
766 deadline: Instant,
767 fut: Sleep,
768 pool: WeakOpt<Mutex<PoolInner<T, K>>>,
769 #[pin]
773 pool_drop_notifier: oneshot::Receiver<Infallible>,
774 }
775}
776
777impl<T: Poolable + 'static, K: Key> Future for IdleTask<T, K> {
778 type Output = ();
779
780 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
781 let mut this = self.project();
782 loop {
783 match this.pool_drop_notifier.as_mut().poll(cx) {
784 Poll::Ready(Ok(n)) => match n {},
785 Poll::Pending => (),
786 Poll::Ready(Err(_canceled)) => {
787 trace!("pool closed, canceling idle interval");
788 return Poll::Ready(());
789 }
790 }
791
792 ready!(Pin::new(&mut this.fut).poll(cx));
793 *this.deadline += *this.duration;
797 if *this.deadline < Instant::now() - Duration::from_millis(5) {
798 *this.deadline = Instant::now() + *this.duration;
799 }
800 *this.fut = this.timer.sleep_until(*this.deadline);
801
802 if let Some(inner) = this.pool.upgrade() {
803 if let Ok(mut inner) = inner.lock() {
804 trace!("idle interval checking for expired");
805 inner.clear_expired();
806 continue;
807 }
808 }
809 return Poll::Ready(());
810 }
811 }
812}
813
814impl<T> WeakOpt<T> {
815 fn none() -> Self {
816 WeakOpt(None)
817 }
818
819 fn downgrade(arc: &Arc<T>) -> Self {
820 WeakOpt(Some(Arc::downgrade(arc)))
821 }
822
823 fn upgrade(&self) -> Option<Arc<T>> {
824 self.0.as_ref().and_then(Weak::upgrade)
825 }
826}
827
828#[cfg(all(test, not(miri)))]
829mod tests {
830 use std::fmt::Debug;
831 use std::future::Future;
832 use std::hash::Hash;
833 use std::pin::Pin;
834 use std::task::{self, Poll};
835 use std::time::Duration;
836
837 use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
838 use crate::rt::{TokioExecutor, TokioTimer};
839
840 use crate::common::timer;
841
842 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
843 struct KeyImpl(http::uri::Scheme, http::uri::Authority);
844
845 type KeyTuple = (http::uri::Scheme, http::uri::Authority);
846
847 #[derive(Debug, PartialEq, Eq)]
849 struct Uniq<T>(T);
850
851 impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
852 fn is_open(&self) -> bool {
853 true
854 }
855
856 fn reserve(self) -> Reservation<Self> {
857 Reservation::Unique(self)
858 }
859
860 fn can_share(&self) -> bool {
861 false
862 }
863 }
864
865 fn c<T: Poolable, K: Key>(key: K) -> Connecting<T, K> {
866 Connecting {
867 key,
868 pool: WeakOpt::none(),
869 }
870 }
871
872 fn host_key(s: &str) -> KeyImpl {
873 KeyImpl(http::uri::Scheme::HTTP, s.parse().expect("host key"))
874 }
875
876 fn pool_no_timer<T, K: Key>() -> Pool<T, K> {
877 pool_max_idle_no_timer(::std::usize::MAX)
878 }
879
880 fn pool_max_idle_no_timer<T, K: Key>(max_idle: usize) -> Pool<T, K> {
881 let pool = Pool::new(
882 super::Config {
883 idle_timeout: Some(Duration::from_millis(100)),
884 max_idle_per_host: max_idle,
885 },
886 TokioExecutor::new(),
887 Option::<timer::Timer>::None,
888 );
889 pool.no_timer();
890 pool
891 }
892
893 #[tokio::test]
894 async fn test_pool_checkout_smoke() {
895 let pool = pool_no_timer();
896 let key = host_key("foo");
897 let pooled = pool.pooled(c(key.clone()), Uniq(41));
898
899 drop(pooled);
900
901 match pool.checkout(key).await {
902 Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
903 Err(_) => panic!("not ready"),
904 };
905 }
906
907 struct PollOnce<'a, F>(&'a mut F);
909
910 impl<F, T, U> Future for PollOnce<'_, F>
911 where
912 F: Future<Output = Result<T, U>> + Unpin,
913 {
914 type Output = Option<()>;
915
916 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
917 match Pin::new(&mut self.0).poll(cx) {
918 Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
919 Poll::Ready(Err(_)) => Poll::Ready(Some(())),
920 Poll::Pending => Poll::Ready(None),
921 }
922 }
923 }
924
925 #[tokio::test]
926 async fn test_pool_checkout_returns_none_if_expired() {
927 let pool = pool_no_timer();
928 let key = host_key("foo");
929 let pooled = pool.pooled(c(key.clone()), Uniq(41));
930
931 drop(pooled);
932 tokio::time::sleep(pool.locked().timeout.unwrap()).await;
933 let mut checkout = pool.checkout(key);
934 let poll_once = PollOnce(&mut checkout);
935 let is_not_ready = poll_once.await.is_none();
936 assert!(is_not_ready);
937 }
938
939 #[tokio::test]
940 async fn test_pool_checkout_removes_expired() {
941 let pool = pool_no_timer();
942 let key = host_key("foo");
943
944 pool.pooled(c(key.clone()), Uniq(41));
945 pool.pooled(c(key.clone()), Uniq(5));
946 pool.pooled(c(key.clone()), Uniq(99));
947
948 assert_eq!(
949 pool.locked().idle.get(&key).map(|entries| entries.len()),
950 Some(3)
951 );
952 tokio::time::sleep(pool.locked().timeout.unwrap()).await;
953
954 let mut checkout = pool.checkout(key.clone());
955 let poll_once = PollOnce(&mut checkout);
956 poll_once.await;
958 assert!(pool.locked().idle.get(&key).is_none());
959 }
960
961 #[test]
962 fn test_pool_max_idle_per_host() {
963 let pool = pool_max_idle_no_timer(2);
964 let key = host_key("foo");
965
966 pool.pooled(c(key.clone()), Uniq(41));
967 pool.pooled(c(key.clone()), Uniq(5));
968 pool.pooled(c(key.clone()), Uniq(99));
969
970 assert_eq!(
972 pool.locked().idle.get(&key).map(|entries| entries.len()),
973 Some(2)
974 );
975 }
976
977 #[tokio::test]
978 async fn test_pool_timer_removes_expired() {
979 let pool = Pool::new(
980 super::Config {
981 idle_timeout: Some(Duration::from_millis(10)),
982 max_idle_per_host: std::usize::MAX,
983 },
984 TokioExecutor::new(),
985 Some(TokioTimer::new()),
986 );
987
988 let key = host_key("foo");
989
990 pool.pooled(c(key.clone()), Uniq(41));
991 pool.pooled(c(key.clone()), Uniq(5));
992 pool.pooled(c(key.clone()), Uniq(99));
993
994 assert_eq!(
995 pool.locked().idle.get(&key).map(|entries| entries.len()),
996 Some(3)
997 );
998
999 tokio::time::sleep(Duration::from_millis(30)).await;
1001 tokio::task::yield_now().await;
1003
1004 assert!(pool.locked().idle.get(&key).is_none());
1005 }
1006
1007 #[tokio::test]
1008 async fn test_pool_checkout_task_unparked() {
1009 use futures_util::future::join;
1010 use futures_util::FutureExt;
1011
1012 let pool = pool_no_timer();
1013 let key = host_key("foo");
1014 let pooled = pool.pooled(c(key.clone()), Uniq(41));
1015
1016 let checkout = join(pool.checkout(key), async {
1017 drop(pooled);
1023 })
1024 .map(|(entry, _)| entry);
1025
1026 assert_eq!(*checkout.await.unwrap(), Uniq(41));
1027 }
1028
1029 #[tokio::test]
1030 async fn test_pool_checkout_drop_cleans_up_waiters() {
1031 let pool = pool_no_timer::<Uniq<i32>, KeyImpl>();
1032 let key = host_key("foo");
1033
1034 let mut checkout1 = pool.checkout(key.clone());
1035 let mut checkout2 = pool.checkout(key.clone());
1036
1037 let poll_once1 = PollOnce(&mut checkout1);
1038 let poll_once2 = PollOnce(&mut checkout2);
1039
1040 poll_once1.await;
1042 assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
1043 poll_once2.await;
1044 assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);
1045
1046 drop(checkout1);
1048 assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
1049
1050 drop(checkout2);
1051 assert!(pool.locked().waiters.get(&key).is_none());
1052 }
1053
1054 #[derive(Debug)]
1055 struct CanClose {
1056 #[allow(unused)]
1057 val: i32,
1058 closed: bool,
1059 }
1060
1061 impl Poolable for CanClose {
1062 fn is_open(&self) -> bool {
1063 !self.closed
1064 }
1065
1066 fn reserve(self) -> Reservation<Self> {
1067 Reservation::Unique(self)
1068 }
1069
1070 fn can_share(&self) -> bool {
1071 false
1072 }
1073 }
1074
1075 #[test]
1076 fn pooled_drop_if_closed_doesnt_reinsert() {
1077 let pool = pool_no_timer();
1078 let key = host_key("foo");
1079 pool.pooled(
1080 c(key.clone()),
1081 CanClose {
1082 val: 57,
1083 closed: true,
1084 },
1085 );
1086
1087 assert!(!pool.locked().idle.contains_key(&key));
1088 }
1089}