async_resource/pool/
acquire.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Instant;
5
6use super::{AcquireError, Pool, ResourceResolve, Waiter};
7use crate::resource::Managed;
8
9enum AcquireState<T: Send + 'static, E: 'static> {
10 Init,
11 Resolve(ResourceResolve<T, E>),
12 Waiting(Waiter<ResourceResolve<T, E>>),
13}
14
15pub struct Acquire<T: Send + 'static, E: 'static> {
16 pool: Pool<T, E>,
17 start: Instant,
18 state: Option<AcquireState<T, E>>,
19}
20
21impl<T: Send, E> Acquire<T, E> {
22 pub fn new(pool: Pool<T, E>) -> Self {
23 Self {
24 pool,
25 start: Instant::now(),
26 state: Some(AcquireState::Init),
27 }
28 }
29}
30
31impl<T: Send, E> Future for Acquire<T, E> {
32 type Output = Result<Managed<T>, AcquireError<E>>;
33
34 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
35 let mut state = match self.state.take() {
36 Some(state) => state,
37 None => {
38 return Poll::Ready(Err(AcquireError::Stopped));
40 }
41 };
42
43 loop {
44 state = match state {
45 AcquireState::Init => {
46 let mut resolve = self.pool.inner.try_acquire_idle();
50 if resolve.is_empty() {
51 resolve = self.pool.inner.try_create();
53 }
54
55 if resolve.is_empty() {
56 let waiter = self.pool.inner.try_wait(self.start);
58 AcquireState::Waiting(waiter)
59 } else {
60 AcquireState::Resolve(resolve)
62 }
63 }
64
65 AcquireState::Resolve(mut resolve) => match Pin::new(&mut resolve).poll(cx) {
66 Poll::Pending => {
67 self.state.replace(AcquireState::Resolve(resolve));
69 return Poll::Pending;
70 }
71 Poll::Ready(Some(res)) => {
72 let res = res
73 .map(|guard| Managed::new(guard, self.pool.inner.shared().clone()))
74 .map_err(AcquireError::ResourceError);
75 return Poll::Ready(res);
76 }
77 Poll::Ready(None) => {
78 AcquireState::Init
80 }
81 },
82
83 AcquireState::Waiting(mut waiter) => match Pin::new(&mut *waiter).poll(cx) {
84 Poll::Pending => {
85 self.state.replace(AcquireState::Waiting(waiter));
86 return Poll::Pending;
87 }
88 Poll::Ready(result) => match result {
89 Ok(resolve) => AcquireState::Resolve(resolve),
90 Err(_) => return Poll::Ready(Err(AcquireError::Timeout)),
91 },
92 },
93 };
94 }
95 }
96}