mysql_connector/pool/
async_pool.rs

1use {
2    super::{AsyncPoolContent, AsyncPoolGetFuture, AsyncPoolTrait, PoolItem, PoolPut},
3    crossbeam::queue::{ArrayQueue, SegQueue},
4    std::{
5        future::Future,
6        mem::ManuallyDrop,
7        pin::Pin,
8        sync::atomic::{AtomicUsize, Ordering},
9        task::{self, Poll, Waker},
10    },
11};
12
13pub struct AsyncPool<T: AsyncPoolContent<C>, C, const N: usize> {
14    ctx: T::Ctx,
15    items: AtomicUsize,
16    pool: ArrayQueue<T>,
17    wakers: SegQueue<Waker>,
18}
19
20impl<T: AsyncPoolContent<C>, C, const N: usize> PoolPut<T> for AsyncPool<T, C, N> {
21    fn put(&self, value: T) {
22        // As we won't create too many items, the pool won't be full
23        let _ = self.pool.push(value);
24    }
25}
26
27impl<T: AsyncPoolContent<C>, C, const N: usize> AsyncPool<T, C, N> {
28    pub fn new(ctx: T::Ctx) -> Self {
29        Self {
30            ctx,
31            items: AtomicUsize::new(0),
32            pool: ArrayQueue::new(N),
33            wakers: SegQueue::new(),
34        }
35    }
36}
37
38impl<T: AsyncPoolContent<C>, C, const N: usize> AsyncPoolTrait<T> for AsyncPool<T, C, N> {
39    fn get(&self) -> Pin<Box<AsyncPoolGetFuture<'_, T>>> {
40        Box::pin(PoolTake {
41            pool: self,
42            add: None,
43            waker_added: false,
44        })
45    }
46}
47
48#[allow(clippy::type_complexity)]
49#[must_use = "futures do nothing unless you `.await` or poll them"]
50pub struct PoolTake<'a, T: AsyncPoolContent<C>, C, const N: usize> {
51    pool: &'a AsyncPool<T, C, N>,
52    add: Option<Pin<Box<dyn Future<Output = Result<T, T::Error>> + 'a>>>,
53    waker_added: bool,
54}
55
56impl<'a, T: AsyncPoolContent<C>, C, const N: usize> PoolTake<'a, T, C, N> {
57    fn poll_add(&mut self, cx: &mut task::Context<'_>) -> Option<Poll<<Self as Future>::Output>> {
58        self.add.as_mut().map(|add| {
59            add.as_mut().poll(cx).map(|res| {
60                res.map(|item| PoolItem {
61                    item: ManuallyDrop::new(item),
62                    pool: self.pool,
63                })
64            })
65        })
66    }
67}
68
69impl<'a, T: AsyncPoolContent<C>, C, const N: usize> Future for PoolTake<'a, T, C, N> {
70    type Output = Result<PoolItem<'a, T>, T::Error>;
71
72    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
73        let this = self.get_mut();
74        let pool = this.pool;
75        if let Some(res) = this.poll_add(cx) {
76            return res;
77        }
78        match pool.pool.pop() {
79            Some(item) => Poll::Ready(Ok(PoolItem {
80                item: ManuallyDrop::new(item),
81                pool,
82            })),
83            None => {
84                let item_count = pool.items.load(Ordering::Relaxed);
85                if item_count < N {
86                    if pool
87                        .items
88                        .compare_exchange(
89                            item_count,
90                            item_count + 1,
91                            Ordering::Relaxed,
92                            Ordering::Relaxed,
93                        )
94                        .is_ok()
95                    {
96                        this.add = Some(Box::pin(T::new(&pool.ctx)));
97                        if let Some(res) = this.poll_add(cx) {
98                            return res;
99                        }
100                    } else {
101                        cx.waker().wake_by_ref();
102                    }
103                } else if !this.waker_added {
104                    this.waker_added = true;
105                    pool.wakers.push(cx.waker().clone());
106                    // wake for the rare case that an item was added after we tried to get one but before we registered the waker
107                    cx.waker().wake_by_ref();
108                }
109                Poll::Pending
110            }
111        }
112    }
113}