async_resource/pool/
acquire.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::Instant;
5
6use super::{AcquireError, Pool, ResourceResolve, Waiter};
7use crate::resource::Managed;
8
9enum AcquireState<T: Send + 'static, E: 'static> {
10    Init,
11    Resolve(ResourceResolve<T, E>),
12    Waiting(Waiter<ResourceResolve<T, E>>),
13}
14
15pub struct Acquire<T: Send + 'static, E: 'static> {
16    pool: Pool<T, E>,
17    start: Instant,
18    state: Option<AcquireState<T, E>>,
19}
20
21impl<T: Send, E> Acquire<T, E> {
22    pub fn new(pool: Pool<T, E>) -> Self {
23        Self {
24            pool,
25            start: Instant::now(),
26            state: Some(AcquireState::Init),
27        }
28    }
29}
30
31impl<T: Send, E> Future for Acquire<T, E> {
32    type Output = Result<Managed<T>, AcquireError<E>>;
33
34    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
35        let mut state = match self.state.take() {
36            Some(state) => state,
37            None => {
38                // future already completed
39                return Poll::Ready(Err(AcquireError::Stopped));
40            }
41        };
42
43        loop {
44            state = match state {
45                AcquireState::Init => {
46                    // FIXME check timer since we may return here after a failure
47
48                    // Try to acquire from the idle queue
49                    let mut resolve = self.pool.inner.try_acquire_idle();
50                    if resolve.is_empty() {
51                        // Queue is empty, try to create a new resource
52                        resolve = self.pool.inner.try_create();
53                    }
54
55                    if resolve.is_empty() {
56                        // Register a waiter
57                        let waiter = self.pool.inner.try_wait(self.start);
58                        AcquireState::Waiting(waiter)
59                    } else {
60                        // Evaluate any attached future if necessary
61                        AcquireState::Resolve(resolve)
62                    }
63                }
64
65                AcquireState::Resolve(mut resolve) => match Pin::new(&mut resolve).poll(cx) {
66                    Poll::Pending => {
67                        // FIXME check timer (need to register one)
68                        self.state.replace(AcquireState::Resolve(resolve));
69                        return Poll::Pending;
70                    }
71                    Poll::Ready(Some(res)) => {
72                        let res = res
73                            .map(|guard| Managed::new(guard, self.pool.inner.shared().clone()))
74                            .map_err(AcquireError::ResourceError);
75                        return Poll::Ready(res);
76                    }
77                    Poll::Ready(None) => {
78                        // Something went wrong during the resolve, start over
79                        AcquireState::Init
80                    }
81                },
82
83                AcquireState::Waiting(mut waiter) => match Pin::new(&mut *waiter).poll(cx) {
84                    Poll::Pending => {
85                        self.state.replace(AcquireState::Waiting(waiter));
86                        return Poll::Pending;
87                    }
88                    Poll::Ready(result) => match result {
89                        Ok(resolve) => AcquireState::Resolve(resolve),
90                        Err(_) => return Poll::Ready(Err(AcquireError::Timeout)),
91                    },
92                },
93            };
94        }
95    }
96}